# Challenge 2

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, to_date, year, month
import pyspark.sql.types as types

In [2]:
spark = (
    SparkSession.
    builder.
    appName('spark_challenge_2').
    getOrCreate()
)

In [3]:
sales_df = (
    spark.
    read.
    option('header', True).
    csv('./data/salesdata')
)

sales_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [4]:
sales_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



### Remove Bad Records

In [5]:
sales_df_clean = sales_df.filter(col("Order ID").isNull() == False)

In [6]:
sales_df_clean.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [7]:
sales_df.count()

186850

In [8]:
sales_df_clean.count()

186305

### Extract City and State from Address into New Columns

In [9]:
sales_df = (
    sales_df_clean.
    select(
        '*', 
        split(col('Purchase Address'), ', ').getItem(1).alias('City'),
        split(split(col('Purchase Address'), ', ').getItem(2), ' ').getItem(0).alias('State')
    )   
)

In [10]:
sales_df.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+-------------+-----+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|         City|State|
+--------+--------------------+----------------+----------+--------------+--------------------+-------------+-----+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|New York City|   NY|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|New York City|   NY|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|New York City|   NY|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|San Francisco|   CA|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|      Atlanta|   GA|
+--------+--------------------+----------------+----------+-------------

### Rename the columns and change the datatype of a columns

In [11]:
sales_df = (
    sales_df.
    select(
        col('Order ID').cast('int').alias('OrderID'),
        col('Product').cast('string'),
        col('Quantity Ordered').cast('int').alias('QuantityOrdered'),
        col('Price Each').cast(types.FloatType()).alias('Price'),
        to_date(col('Order Date'), 'MM/dd/yy HH:mm').alias('OrderDate'),
        col('Purchase Address').cast(types.StringType()).alias('StoreAddress'),
        col('City').cast('string'),
        col('State').cast('string')
    )
)

In [12]:
sales_df.show(5)

+-------+--------------------+---------------+------+----------+--------------------+-------------+-----+
|OrderID|             Product|QuantityOrdered| Price| OrderDate|        StoreAddress|         City|State|
+-------+--------------------+---------------+------+----------+--------------------+-------------+-----+
| 295665|  Macbook Pro Laptop|              1|1700.0|2019-12-30|136 Church St, Ne...|New York City|   NY|
| 295666|  LG Washing Machine|              1| 600.0|2019-12-29|562 2nd St, New Y...|New York City|   NY|
| 295667|USB-C Charging Cable|              1| 11.95|2019-12-12|277 Main St, New ...|New York City|   NY|
| 295668|    27in FHD Monitor|              1|149.99|2019-12-22|410 6th St, San F...|San Francisco|   CA|
| 295669|USB-C Charging Cable|              1| 11.95|2019-12-18|43 Hill St, Atlan...|      Atlanta|   GA|
+-------+--------------------+---------------+------+----------+--------------------+-------------+-----+
only showing top 5 rows



### Adding the column Year and the column Month

In [13]:
sales_df = (
    sales_df.
    withColumn('Year', year('OrderDate')).
    withColumn('Month', month('OrderDate'))
)

### Dropping any row that contains null values

In [14]:
sales_df.count()

186305

In [15]:
sales_df = sales_df.dropna()

In [16]:
sales_df.count()

185950

### Writting the records in parquet file

In [17]:
(
    sales_df.
    write.
    mode('overwrite').
    partitionBy('Year', 'Month').
    parquet('data/sales_cleaned_and_transformed.parquet')
)