In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [2]:
# Single SparkSession for the notebook; getOrCreate() returns an existing one if present.
spark = SparkSession.builder \
    .appName("Sales Challenge") \
    .getOrCreate()
# Optional: spark.conf.set('spark.sql.parquet.compression.codec', 'gzip')

## Data Preparation

In [3]:
# Raw sales CSV schema: every column is read as a nullable string.
_sales_columns = ['Order ID', 'Product', 'Quantity Ordered', 'Price Each',
                  'Order Date', 'Purchase Address']
schema = StructType([StructField(name, StringType(), True) for name in _sales_columns])


In [4]:
# Load a single month (April 2019) using the all-string schema.
sales_data_path = './data/salesdata/Sales_April_2019.csv'
sales_raw_df = spark.read.csv(sales_data_path, schema=schema, header=True)

In [5]:
sales_raw_df.show(n=20)  # first 20 rows of the raw April data

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    null|                null|            null|      null|          null|                null|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|       600|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
|  176562|USB-C Charging Cable|               1|     11.95|04/29/19 13:03|381 Wilson St, Sa...|
|  176563|Bose SoundSport H...|         

In [6]:
# Confirm the explicit schema was applied: every column should be a string.
sales_raw_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



## Reading a JSON file

In [7]:
# Explicit schema for persons.json so types (int id, float salary, date of birth,
# boolean active, array of movie titles) are applied at read time.
persons_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('fav_movies', ArrayType(StringType()), True),
    StructField('salary', FloatType(), True),
    StructField('image_url', StringType(), True),
    StructField('date_of_birth', DateType(), True),
    StructField('active', BooleanType(), True)])


json_path = './data/person/persons.json'
# Pass the schema by keyword and multiLine as a real boolean (not the string 'True');
# multiLine=True allows a single JSON record to span several lines.
persons_df = spark.read.json(json_path, schema=persons_schema, multiLine=True)
persons_df.show(10, truncate=False)

+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name|fav_movies                                                   |salary |image_url                                      |date_of_birth|active|
+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy    |[I giorni contati]                                           |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|2  |Emelyne   |Blaza    |[Musketeer, The, Topralli]                                   |3006.04|http://dummyimage.com/158x106.bmp/cc0000/ffffff|1991-11-02   |false |
|3  |Max       |Rettie   |[The Forgotten Space, Make It Happen]                        |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|4  

## Columns and Expressions

In [8]:
from pyspark.sql.functions import col, expr

In [9]:
persons_df.select('first_name', 'last_name', 'salary').show(5)  # columns selected by name

+----------+---------+-------+
|first_name|last_name| salary|
+----------+---------+-------+
|     Drucy|    Poppy|1463.36|
|   Emelyne|    Blaza|3006.04|
|       Max|   Rettie|1422.88|
|    Ilario|     Kean|3561.36|
|     Toddy|   Drexel|4934.87|
+----------+---------+-------+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import concat_ws

In [11]:
# Join first and last name with a single space.
full_name_col = concat_ws(' ', col('first_name'), col('last_name')).alias('full_name')
persons_df.select(full_name_col).show(10)

+----------------+
|       full_name|
+----------------+
|     Drucy Poppy|
|   Emelyne Blaza|
|      Max Rettie|
|     Ilario Kean|
|    Toddy Drexel|
| Oswald Petrolli|
|   Adrian Clarey|
|Dominica Goodnow|
|   Emory Slocomb|
|   Jeremias Bode|
+----------------+
only showing top 10 rows



In [12]:
# Show each person's salary next to the salary raised by 10% (Column arithmetic).
salary_col = col('salary')
full_name_col = concat_ws(' ', col('first_name'), col('last_name')).alias('full_name')
persons_df.select(full_name_col,
                  salary_col,
                  (salary_col * .10 + salary_col).alias('salary increase')).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36|1609.6959838867188|
|   Emelyne Blaza|3006.04|  3306.64404296875|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36|3917.4961181640624|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|  3819.89287109375|
+----------------+-------+------------------+
only showing top 10 rows



In [13]:
# expr() takes a SQL expression string: 'salary * .10 + salary' produces the same
# values as the Column arithmetic col('salary') * .10 + col('salary').
raise_expr = expr('salary * .10 + salary').alias('salary increase')
persons_df.select(
    concat_ws(' ', col('first_name'), col('last_name')).alias('full_name'),
    col('salary'),
    raise_expr,
).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36|1609.6959838867188|
|   Emelyne Blaza|3006.04|  3306.64404296875|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36|3917.4961181640624|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|  3819.89287109375|
+----------------+-------+------------------+
only showing top 10 rows



## Filter and where -- `where()` is an alias for `filter()`, so both do the same thing

In [14]:
persons_df.filter(col('salary') <= 3000).show(10)  # Column condition; same as the SQL string 'salary <= 3000'

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|
|  8|  Dominica|    Goodnow|    [Hearts Divided]|1147.76|http://dummyimage...|   1973-08-27| false|
|  9|     Emory|    Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 11|   Timothy|     Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|


In [15]:
# Active people earning at most 3000; a boolean column can be used directly as a condition.
is_low_paid = col('salary') <= 3000
persons_df.where(is_low_paid & col('active')).show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  9|     Emory|  Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 16|   Margaux| Archbold|[And Now a Word f...|1013.75|http://dummyimage...|   1988-07-29|  true|
| 26|     Clive|      Lax|             [Rabid]|2126.87|http://dummyimage...|   1981-10-26|  true|
| 33|  Sherline|  Primett|   [Jungle Fighters]|2309.39|http://dummyimage...|   1972-07-23|  true|
| 34|     Davis|    Pinks|          [Hounddog]|1337.14|http://dummyimage...|   1989-07-27|  true|
| 37|    Carlen|  Sharply|[Dr. Jekyll and M...|2051.85|http://dummyimage...|   2002-06-01|  true|
| 40|    Jordan|   L

In [16]:
from pyspark.sql.functions import year

In [17]:
persons_df.filter(year("date_of_birth").isin(1989, 2000)).show()  # born in 1989 or 2000

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| 14|   Ambrosi|   Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 15|    Feodor|Nancekivell|   [Monsoon Wedding]|2218.46|http://dummyimage...|   2000-10-07| false|
| 18|     Alfie|   Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 25|     Kelcy|     Wogdon|    [Iron Mask, The]|4512.51|http://dummyimage...|   2000-10-20|  true|
| 32|      Redd|   Akenhead|[Century of the D...| 2470.9|http://dummyimage...|   2000-06-05| false|
| 34|     Davis|      Pinks|          [Hounddog]|1337.14|http://dummyimage...|   1989-07-27|  true|
| 61|    Shanna|    Samples|[Thomas in Love (...| 2703.0|http://dummyimage...|   1989-07-07| false|


In [18]:
from pyspark.sql.functions import array_contains

# People whose favourite-movie array includes the exact title below.
persons_df.filter(array_contains(col('fav_movies'), "Land of the Lost")).show()

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| 11|   Timothy|   Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



## Distinct, Drop Duplicates, Order By

In [19]:
from pyspark.sql.functions import count, desc

persons_df.select(col("active")).show(n=10)

+------+
|active|
+------+
|  true|
| false|
| false|
|  true|
|  true|
| false|
| false|
| false|
|  true|
|  true|
+------+
only showing top 10 rows



In [20]:
persons_df.select(col("active")).distinct().show()  # unique values of the active flag

+------+
|active|
+------+
|  true|
| false|
+------+



In [21]:
birth_year_col = year(col("date_of_birth")).alias("year")
(persons_df
    .select(col("first_name"), birth_year_col, col("active"))
    .orderBy(col("year").asc(), col("first_name").asc())
    .show(10))

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|       Sky|1971| false|
|   Timothy|1971| false|
|    Lucita|1972|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|     Toddy|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
+----------+----+------+
only showing top 10 rows



In [22]:
# Keep one row per (year, active) pair, then sort for display.
# NOTE: which first_name survives for each pair is not guaranteed by dropDuplicates.
dropped_df = (
    persons_df
    .select(col('first_name'), year(col('date_of_birth')).alias('year'), col('active'))
    .dropDuplicates(['year', 'active'])
    .orderBy('year', 'first_name')
)

In [23]:
dropped_df.show(20)  # 20 is also show()'s default row count

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
|   Balduin|1974| false|
|     Emory|1974|  true|
|    Janean|1975|  true|
|       Bev|1976|  true|
| Franciska|1976| false|
|     Johny|1977| false|
|    Daveta|1978| false|
|   Guthrie|1978|  true|
|      Maxi|1979| false|
|   Melinda|1979|  true|
|    Carter|1980| false|
|   Loralyn|1980|  true|
|     Clive|1981|  true|
|   Leanora|1981| false|
+----------+----+------+
only showing top 20 rows



In [24]:
# Newest birth years first; the order among rows sharing a year is not specified.
(persons_df
    .select(col("first_name"), year(col("date_of_birth")).alias("year"), col("active"))
    .orderBy(col("year").desc())
    .show(10))

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|     Daron|2002|  true|
|    Virgie|2002|  true|
|    Carlen|2002|  true|
|   Lorilee|2002| false|
|    Maxine|2001| false|
|    Feodor|2000| false|
|     Kelcy|2000|  true|
|  Annabell|2000|  true|
|      Redd|2000| false|
|     Jobie|2000| false|
+----------+----+------+
only showing top 10 rows



## Rows and Union

In [25]:
from pyspark.sql import Row

In [26]:
# Extra people to add to persons_df. Values are positional and follow persons_df's
# column order: id, first_name, last_name, fav_movies, salary, image_url,
# date_of_birth, active.
# NOTE(review): all three rows share id 101 — confirm that is intended.
def _extra_person(first_name, last_name):
    return Row(101, first_name, last_name, ['Men in Black III', 'Home Alone'], 4300.64,
               'http://someimage.com', '1964-09-18', True)


person_row = _extra_person('Robert', 'Owens')
persons_row_list = [_extra_person('Kenny', 'Bobien'),
                    _extra_person('Sara', 'Devine')]

In [27]:
# Append Robert only once, so re-running this cell does not add a duplicate row.
if person_row not in persons_row_list:
    persons_row_list.append(person_row)

In [28]:
# Rows print positionally because they were built without field names.
print(persons_row_list)

[<Row(101, 'Kenny', 'Bobien', ['Men in Black III', 'Home Alone'], 4300.64, 'http://someimage.com', '1964-09-18', True)>, <Row(101, 'Sara', 'Devine', ['Men in Black III', 'Home Alone'], 4300.64, 'http://someimage.com', '1964-09-18', True)>, <Row(101, 'Robert', 'Owens', ['Men in Black III', 'Home Alone'], 4300.64, 'http://someimage.com', '1964-09-18', True)>]


In [29]:
# Row supports positional indexing; index 1 holds the first name ('Robert').
person_row[1]

'Robert'

In [30]:
# One name per positional value in each Row (8 values). Without 'last_name',
# every name after first_name would be attached to the wrong data.
new_persons_df = spark.createDataFrame(persons_row_list,
                                       ['id', 'first_name', 'last_name', 'fav_movies', 'salary',
                                        'image_url', 'date_of_birth', 'active'])

In [31]:
# union() matches columns by position, so new_persons_df must follow persons_df's column order.
add_person_df = persons_df.union(new_persons_df)
add_person_df.orderBy(col('id').desc()).show(10)

+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies|            salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
|101|      Sara|   Devine|[Men in Black III...|           4300.64|http://someimage.com|   1964-09-18|  true|
|101|     Kenny|   Bobien|[Men in Black III...|           4300.64|http://someimage.com|   1964-09-18|  true|
|101|    Robert|    Owens|[Men in Black III...|           4300.64|http://someimage.com|   1964-09-18|  true|
|100|    Virgie| Domanski|[Horseman, The, S...| 2165.929931640625|http://dummyimage...|   2002-01-05|  true|
| 99|   Rozalie|   Wannop|[Suddenly, The No...|1259.6400146484375|http://dummyimage...|   1997-03-25| false|
| 98|     Davin|     Labb|[Viva Riva!, Kill...| 1452.739990234375|http://dummyimage...|   1988-01-27|  true|
| 97|      Rodi|   

## Adding, Renaming, and Dropping Columns

In [32]:
from pyspark.sql.functions import round

In [33]:
# Append a column holding salary raised by 10% (salary * .10 + salary).
aug_persons_df1 = persons_df.select('*', expr('salary * .10 + salary').alias('salary_increase'))
aug_persons_df1.show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+------------------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|   salary_increase|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+------------------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|1609.6959838867188|
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|  3306.64404296875|
|  3|       Max|   Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|1565.1680053710938|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|3917.4961181640624|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|  5428.35712890625|
|  6|    Oswald| Petrolli|[Wing and the Thi...|1153.23|http://du

In [34]:
# Column names, confirming salary_increase was appended at the end.
aug_persons_df1.columns

['id',
 'first_name',
 'last_name',
 'fav_movies',
 'salary',
 'image_url',
 'date_of_birth',
 'active',
 'salary_increase']

In [35]:
from pyspark.sql import functions as F

# Add birth_year, rename fav_movies -> movies, and replace salary_increase with a
# 2-decimal rounded copy. F.round is used explicitly so this cell does not depend on
# another cell having shadowed the builtin round().
# NOTE: despite its name, salary_x10 holds (salary * .10 + salary) rounded to 2
# decimals, i.e. a 10% raise, not salary * 10.
aug_persons_df2 = (aug_persons_df1
                   .withColumn('birth_year', year('date_of_birth'))
                   .withColumnRenamed('fav_movies', 'movies')
                   .withColumn('salary_x10', F.round(col('salary_increase'), 2))
                   .drop('salary_increase'))

In [36]:
aug_persons_df2.show(n=10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+----------+----------+
| id|first_name|last_name|              movies| salary|           image_url|date_of_birth|active|birth_year|salary_x10|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+----------+----------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|      1991|    1609.7|
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|      1991|   3306.64|
|  3|       Max|   Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|      1990|   1565.17|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|      1987|    3917.5|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|      1992|   5428.36|
|  6|    Oswald| Petrolli|[Wing and the 

## Working with missing or bad data

In [37]:
# Small dataset with nulls scattered across every column, for the null-handling examples.
bad_movies_columns = ['actor_name', 'movie_title', 'produced_year']
bad_movies_list = [Row(*values) for values in [
    (None, None, None),
    (None, None, 2020),
    ("John Doe", "Awesome Movie", None),
    (None, "Awesome Movie", 2021),
    ("Mary Jane", None, 2019),
    ("Vikter Duplaix", "Not another teen movie", 2001),
]]

bad_movies_df = spark.createDataFrame(bad_movies_list, bad_movies_columns)
bad_movies_df.show(10)

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|          null|                null|         null|
|          null|                null|         2020|
|      John Doe|       Awesome Movie|         null|
|          null|       Awesome Movie|         2021|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [38]:
bad_movies_df.dropna().show()  # default how='any': drop rows containing at least one null

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [39]:
bad_movies_df.dropna(how='any').show()  # explicit form of the default shown above

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [40]:
bad_movies_df.dropna(how='all').show()  # drop only rows where every column is null

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|          null|                null|         2020|
|      John Doe|       Awesome Movie|         null|
|          null|       Awesome Movie|         2021|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [41]:
# Keep only rows that have an actor_name (drops rows where actor_name is null).
bad_movies_df.filter(col('actor_name').isNotNull()).show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|      John Doe|       Awesome Movie|         null|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [42]:
# Keep only rows whose actor_name is null (drops rows that have an actor).
bad_movies_df.filter(col('actor_name').isNull()).show()

+----------+-------------+-------------+
|actor_name|  movie_title|produced_year|
+----------+-------------+-------------+
|      null|         null|         null|
|      null|         null|         2020|
|      null|Awesome Movie|         2021|
+----------+-------------+-------------+



In [43]:
bad_movies_df.describe(['produced_year']).show()  # count/mean/stddev/min/max; nulls are not counted

+-------+-----------------+
|summary|    produced_year|
+-------+-----------------+
|  count|                4|
|   mean|          2015.25|
| stddev|9.535023160258536|
|    min|             2001|
|    max|             2021|
+-------+-----------------+



## Working with user-defined functions (UDFs) -- prefer built-in functions where possible: Spark cannot optimize Python UDFs, and every row must be sent to a Python worker

In [44]:
from pyspark.sql.functions import udf
# (name, numeric score) pairs used to demonstrate a Python UDF.
grades = [
    ('John', 90),
    ('George', 60),
    ('Mary', 80),
]

In [45]:
# Column names for grades_df, matching the (name, score) tuple order.
grades_columns = [
    'names',
    'scores',
]

In [46]:
grades_df = spark.createDataFrame(grades, grades_columns)  # column names given as a list
grades_df.show()

+------+------+
| names|scores|
+------+------+
|  John|    90|
|George|    60|
|  Mary|    80|
+------+------+



In [47]:
def letter_grade(grade_num):
    """Map a numeric score to a letter grade.

    Thresholds: >= 90 -> 'A', >= 80 -> 'B', >= 70 -> 'C', < 70 -> 'F'.

    Args:
        grade_num: numeric score, or None (how a Spark null arrives in a Python UDF).

    Returns:
        The letter grade as a string; None when grade_num is None, so a null score
        stays null instead of raising TypeError inside the UDF. Values that compare
        False against every threshold (e.g. NaN) still yield ''.
    """
    if grade_num is None:
        return None
    if grade_num >= 90:
        return 'A'
    if grade_num >= 80:
        return 'B'
    if grade_num >= 70:
        return 'C'
    if grade_num < 70:
        return 'F'
    return ''

In [48]:
letter_gradeudf = udf(letter_grade, returnType=StringType())  # string is also udf's default return type

In [49]:
grades_df.select(col('names'), col('scores'), letter_gradeudf('scores').alias('grade')).show()


+------+------+-----+
| names|scores|grade|
+------+------+-----+
|  John|    90|    A|
|George|    60|    F|
|  Mary|    80|    B|
+------+------+-----+



## Sales Challenge Part 2

## Getting the sales data and reading it into sales_df

In [50]:
# The twelve monthly 2019 sales files, in alphabetical month order.
sales_months = ['April', 'August', 'December', 'February', 'January', 'July',
                'June', 'March', 'May', 'November', 'October', 'September']
path_to_sales = [f'data/salesdata/Sales_{month}_2019.csv' for month in sales_months]

sales_df = spark.read.csv(path_to_sales, header=True)

## Dropping rows that contain null values

In [51]:
# na.drop() uses how='any': it removes every row that has at least one null column.
sales_df = sales_df.na.drop()
# Sanity check: expected to be 0 after the drop.
sales_df.filter(col('Product').isNull()).count()

0

## Filtering out the repeated header rows and adding City and State columns

In [52]:

from pyspark.sql.functions import split, substring

# Rows whose 'Order ID' is the literal text 'Order ID' are repeated header lines; drop them.
sales_df = sales_df.filter(col('Order ID') != 'Order ID')

# Purchase Address looks like '<street>, <city>, <state> <zip>'; items 1 and 2 of the
# comma split are the city and 'state zip' (each still with a leading space).
address_parts = split(col('Purchase Address'), ',')
sales_df = (sales_df
            .withColumn('City', address_parts.getItem(1))
            .withColumn('State', address_parts.getItem(2)))
sales_df.show()

+--------+--------------------+----------------+----------+--------------+--------------------+--------------+---------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|          City|    State|
+--------+--------------------+----------------+----------+--------------+--------------------+--------------+---------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...| New York City| NY 10001|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...| New York City| NY 10001|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...| New York City| NY 10001|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...| San Francisco| CA 94016|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|       Atlanta| GA 30301|
|  295670|AA Batteries (4-p...| 

## Setting the State column to the state abbreviation and renaming columns

In [53]:
from pyspark.sql.functions import trim

# City (' Dallas') and State (' TX 75001') carry a leading space from the comma split.
# Trim both, and keep only the 2-letter abbreviation from State. A fixed
# substring(State, 2, 3) would keep a trailing space ('NY '), and substring(City, 2, 20)
# would cut off city names longer than 20 characters.
sales_df = (sales_df.withColumn('State', substring(trim(sales_df.State), 1, 2))
            .withColumn('City', trim(sales_df.City))
            .withColumnRenamed('Order ID', 'OrderID')
            .withColumnRenamed('Quantity Ordered', 'Quantity')
            .withColumnRenamed('Order Date', 'OrderDate')
            .withColumnRenamed('Purchase Address', 'StoreAddress')
            .withColumnRenamed('Price Each', 'Price'))

sales_df.show()

+-------+--------------------+--------+------+--------------+--------------------+-------------+-----+
|OrderID|             Product|Quantity| Price|     OrderDate|        StoreAddress|         City|State|
+-------+--------------------+--------+------+--------------+--------------------+-------------+-----+
| 295665|  Macbook Pro Laptop|       1|  1700|12/30/19 00:01|136 Church St, Ne...|New York City|  NY |
| 295666|  LG Washing Machine|       1| 600.0|12/29/19 07:03|562 2nd St, New Y...|New York City|  NY |
| 295667|USB-C Charging Cable|       1| 11.95|12/12/19 18:21|277 Main St, New ...|New York City|  NY |
| 295668|    27in FHD Monitor|       1|149.99|12/22/19 15:13|410 6th St, San F...|San Francisco|  CA |
| 295669|USB-C Charging Cable|       1| 11.95|12/18/19 12:38|43 Hill St, Atlan...|      Atlanta|  GA |
| 295670|AA Batteries (4-p...|       1|  3.84|12/31/19 22:58|200 Jefferson St,...|New York City|  NY |
| 295671|USB-C Charging Cable|       1| 11.95|12/16/19 15:10|928 12th St,

In [54]:
from pyspark.sql.functions import trim, ltrim, rtrim

# DataFrames are immutable: withColumn returns a new DataFrame, so the result must be
# assigned back or the trim is silently discarded.
sales_df = (sales_df.withColumn('City', trim(sales_df.City))
                    .withColumn('State', trim(sales_df.State)))

sales_df.show()

+-------+--------------------+--------+------+--------------+--------------------+-------------+-----+
|OrderID|             Product|Quantity| Price|     OrderDate|        StoreAddress|         City|State|
+-------+--------------------+--------+------+--------------+--------------------+-------------+-----+
| 295665|  Macbook Pro Laptop|       1|  1700|12/30/19 00:01|136 Church St, Ne...|New York City|  NY |
| 295666|  LG Washing Machine|       1| 600.0|12/29/19 07:03|562 2nd St, New Y...|New York City|  NY |
| 295667|USB-C Charging Cable|       1| 11.95|12/12/19 18:21|277 Main St, New ...|New York City|  NY |
| 295668|    27in FHD Monitor|       1|149.99|12/22/19 15:13|410 6th St, San F...|San Francisco|  CA |
| 295669|USB-C Charging Cable|       1| 11.95|12/18/19 12:38|43 Hill St, Atlan...|      Atlanta|  GA |
| 295670|AA Batteries (4-p...|       1|  3.84|12/31/19 22:58|200 Jefferson St,...|New York City|  NY |
| 295671|USB-C Charging Cable|       1| 11.95|12/16/19 15:10|928 12th St,

## Changing the data types of columns

In [55]:
from pyspark.sql.types import StringType, BooleanType, IntegerType, FloatType
from pyspark.sql.functions import to_timestamp

# Cast the string columns to real types; OrderDate strings look like '12/30/19 00:01'.
target_types = {'OrderID': IntegerType(), 'Quantity': IntegerType(), 'Price': FloatType()}
for column_name, data_type in target_types.items():
    sales_df = sales_df.withColumn(column_name, col(column_name).cast(data_type))
sales_df = sales_df.withColumn('OrderDate', to_timestamp('OrderDate', 'MM/dd/yy HH:mm'))
sales_df.show()

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        StoreAddress|         City|State|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+
| 295665|  Macbook Pro Laptop|       1|1700.0|2019-12-30 00:01:00|136 Church St, Ne...|New York City|  NY |
| 295666|  LG Washing Machine|       1| 600.0|2019-12-29 07:03:00|562 2nd St, New Y...|New York City|  NY |
| 295667|USB-C Charging Cable|       1| 11.95|2019-12-12 18:21:00|277 Main St, New ...|New York City|  NY |
| 295668|    27in FHD Monitor|       1|149.99|2019-12-22 15:13:00|410 6th St, San F...|San Francisco|  CA |
| 295669|USB-C Charging Cable|       1| 11.95|2019-12-18 12:38:00|43 Hill St, Atlan...|      Atlanta|  GA |
| 295670|AA Batteries (4-p...|       1|  3.84|2019-12-31 22:58:00|200 Jefferson St,...|New York City|  NY |
| 295671|USB-C Charging Cabl

## Adding new columns ReportYear and Month

In [56]:
from pyspark.sql.functions import date_format

# Derive the partition columns directly from the OrderDate timestamp, not by splitting
# its implicit string cast. 'yyyy' and 'MM' produce the same zero-padded strings
# (e.g. '2019', '12'), so the partition directory names written below are unchanged.
sales_df = (sales_df.withColumn('ReportYear', date_format(col('OrderDate'), 'yyyy'))
            .withColumn('Month', date_format(col('OrderDate'), 'MM')))
sales_df.show()

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        StoreAddress|         City|State|ReportYear|Month|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----+
| 295665|  Macbook Pro Laptop|       1|1700.0|2019-12-30 00:01:00|136 Church St, Ne...|New York City|  NY |      2019|   12|
| 295666|  LG Washing Machine|       1| 600.0|2019-12-29 07:03:00|562 2nd St, New Y...|New York City|  NY |      2019|   12|
| 295667|USB-C Charging Cable|       1| 11.95|2019-12-12 18:21:00|277 Main St, New ...|New York City|  NY |      2019|   12|
| 295668|    27in FHD Monitor|       1|149.99|2019-12-22 15:13:00|410 6th St, San F...|San Francisco|  CA |      2019|   12|
| 295669|USB-C Charging Cable|       1| 11.95|2019-12-18 12:38:00|43 Hill St, Atlan...|      Atlanta|  GA |      2019|   12|


## Writing final dataframe to parquet

In [57]:
# Persist the cleaned sales data as Parquet with one directory per ReportYear/Month,
# replacing any previous output at 'sales.parquet'.
(sales_df.write
    .mode('overwrite')
    .partitionBy('ReportYear', 'Month')
    .parquet('sales.parquet'))

In [58]:
# Total number of cleaned sales rows (the same DataFrame that was written to Parquet).
sales_df.count()

185950