In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [0]:
spark = (SparkSession.builder.appName("SalesAnalytics").getOrCreate())

## Data preparation

In [0]:
schema = StructType([
    StructField("Order ID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", StringType(), True),
    StructField("Price Each", StringType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True),
])

In [0]:
sales_data_path = "dbfs:/FileStore/shared_uploads/htb29021996@gmail.com/salesdata/"
sales_raw_df = (spark.read.format("csv")
                .option("header", True)
                .schema(schema)
                .load(sales_data_path)
                )

In [0]:
sales_raw_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [0]:
sales_raw_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



## Data Preparation and Cleansing

### 1. Remove Null Rows and Bad Records

In [0]:
from pyspark.sql.functions import col

In [0]:
sales_raw_df.filter(col("Order ID").isNull() == True).na.drop().show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [0]:
sales_raw_df.describe("Order ID", "Product", "Quantity Ordered", "Price Each", "Order Date", "Purchase Address").show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            186305|      186305|            186305|            186305|        186305|              186305|
|   mean| 230417.5693788653|        NULL|1.1243828986286637|184.39973476747707|          NULL|                NULL|
| stddev|51512.737109995265|        NULL|0.4427926240286704| 332.7313298843439|          NULL|                NULL|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|          Order ID|      iPhone|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------

In [0]:
sales_raw_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-------

In [0]:
sales_temp_df = sales_raw_df.filter(col("Order ID") != "Order ID")

In [0]:
sales_temp_df.describe("Order ID", "Product", "Quantity Ordered", "Price Each", "Order Date", "Purchase Address").show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            185950|      185950|            185950|            185950|        185950|              185950|
|   mean| 230417.5693788653|        NULL|1.1243828986286637|184.39973476747707|          NULL|                NULL|
| stddev|51512.737109995265|        NULL|0.4427926240286704| 332.7313298843439|          NULL|                NULL|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|            319670|      iPhone|                 9|            999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+------------------+------------+------------------+------------

### 2. Extract the city and State frrom Purchase Address

In [0]:
from pyspark.sql.functions import split

In [0]:
sales_temp_df.select("Purchase Address").show(10, False)

+-----------------------------------------+
|Purchase Address                         |
+-----------------------------------------+
|136 Church St, New York City, NY 10001   |
|562 2nd St, New York City, NY 10001      |
|277 Main St, New York City, NY 10001     |
|410 6th St, San Francisco, CA 94016      |
|43 Hill St, Atlanta, GA 30301            |
|200 Jefferson St, New York City, NY 10001|
|928 12th St, Portland, OR 97035          |
|813 Hickory St, Dallas, TX 75001         |
|718 Wilson St, Dallas, TX 75001          |
|77 7th St, Dallas, TX 75001              |
+-----------------------------------------+
only showing top 10 rows



In [0]:
sales_temp_df.select("Purchase Address", split(col("Purchase Address"), ",").alias("City")).show(10, False)

+-----------------------------------------+---------------------------------------------+
|Purchase Address                         |City                                         |
+-----------------------------------------+---------------------------------------------+
|136 Church St, New York City, NY 10001   |[136 Church St,  New York City,  NY 10001]   |
|562 2nd St, New York City, NY 10001      |[562 2nd St,  New York City,  NY 10001]      |
|277 Main St, New York City, NY 10001     |[277 Main St,  New York City,  NY 10001]     |
|410 6th St, San Francisco, CA 94016      |[410 6th St,  San Francisco,  CA 94016]      |
|43 Hill St, Atlanta, GA 30301            |[43 Hill St,  Atlanta,  GA 30301]            |
|200 Jefferson St, New York City, NY 10001|[200 Jefferson St,  New York City,  NY 10001]|
|928 12th St, Portland, OR 97035          |[928 12th St,  Portland,  OR 97035]          |
|813 Hickory St, Dallas, TX 75001         |[813 Hickory St,  Dallas,  TX 75001]         |
|718 Wilso

In [0]:
sales_temp_df.select("Purchase Address", split(col("Purchase Address"), ",").getItem(1).alias("City")).show(10, False)

+-----------------------------------------+--------------+
|Purchase Address                         |City          |
+-----------------------------------------+--------------+
|136 Church St, New York City, NY 10001   | New York City|
|562 2nd St, New York City, NY 10001      | New York City|
|277 Main St, New York City, NY 10001     | New York City|
|410 6th St, San Francisco, CA 94016      | San Francisco|
|43 Hill St, Atlanta, GA 30301            | Atlanta      |
|200 Jefferson St, New York City, NY 10001| New York City|
|928 12th St, Portland, OR 97035          | Portland     |
|813 Hickory St, Dallas, TX 75001         | Dallas       |
|718 Wilson St, Dallas, TX 75001          | Dallas       |
|77 7th St, Dallas, TX 75001              | Dallas       |
+-----------------------------------------+--------------+
only showing top 10 rows



In [0]:
sales_temp_df.select("Purchase Address", split(col("Purchase Address"), ",").getItem(2).alias("City Code")).show(10, False)

+-----------------------------------------+---------+
|Purchase Address                         |City Code|
+-----------------------------------------+---------+
|136 Church St, New York City, NY 10001   | NY 10001|
|562 2nd St, New York City, NY 10001      | NY 10001|
|277 Main St, New York City, NY 10001     | NY 10001|
|410 6th St, San Francisco, CA 94016      | CA 94016|
|43 Hill St, Atlanta, GA 30301            | GA 30301|
|200 Jefferson St, New York City, NY 10001| NY 10001|
|928 12th St, Portland, OR 97035          | OR 97035|
|813 Hickory St, Dallas, TX 75001         | TX 75001|
|718 Wilson St, Dallas, TX 75001          | TX 75001|
|77 7th St, Dallas, TX 75001              | TX 75001|
+-----------------------------------------+---------+
only showing top 10 rows



In [0]:
sales_temp_df = (sales_temp_df.withColumn("City", split(col("Purchase Address"), ",").getItem(1))
                              .withColumn("State", split(split(col("Purchase Address"), ",").getItem(2), ' ').getItem(1)))

In [0]:
sales_temp_df.show(10, False)

+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+--------------+-----+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                         |City          |State|
+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+--------------+-----+
|295665  |Macbook Pro Laptop        |1               |1700      |12/30/19 00:01|136 Church St, New York City, NY 10001   | New York City|NY   |
|295666  |LG Washing Machine        |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001      | New York City|NY   |
|295667  |USB-C Charging Cable      |1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001     | New York City|NY   |
|295668  |27in FHD Monitor          |1               |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016      | San Francisco

### 3. Rename and Change DataTypes

In [0]:
from pyspark.sql.functions import to_timestamp, year, month
from pyspark.sql.types import IntegerType, FloatType

In [0]:
sales_temp_df = (sales_temp_df.withColumn("OrderID", col("Order ID").cast(IntegerType()))
                                .withColumn("Quantity", col("Quantity Ordered").cast(IntegerType()))
                                .withColumn("Price", col("Price Each").cast(FloatType()))
                                .withColumn("OrderDate", to_timestamp(col("Order Date"),"MM/dd/yy HH:mm"))
                                .withColumnRenamed("Purchase Address", "StoreAddress")
                                .drop("Order ID")
                                .drop("Quantity Ordered")
                                .drop("Price Each")
                                .drop("Order Date"))

In [0]:
sales_temp_df.show(10, False)

+--------------------------+-----------------------------------------+--------------+-----+-------+--------+------+-------------------+
|Product                   |StoreAddress                             |City          |State|OrderID|Quantity|Price |OrderDate          |
+--------------------------+-----------------------------------------+--------------+-----+-------+--------+------+-------------------+
|Macbook Pro Laptop        |136 Church St, New York City, NY 10001   | New York City|NY   |295665 |1       |1700.0|2019-12-30 00:01:00|
|LG Washing Machine        |562 2nd St, New York City, NY 10001      | New York City|NY   |295666 |1       |600.0 |2019-12-29 07:03:00|
|USB-C Charging Cable      |277 Main St, New York City, NY 10001     | New York City|NY   |295667 |1       |11.95 |2019-12-12 18:21:00|
|27in FHD Monitor          |410 6th St, San Francisco, CA 94016      | San Francisco|CA   |295668 |1       |149.99|2019-12-22 15:13:00|
|USB-C Charging Cable      |43 Hill St, Atlanta,

### 4. Add New Column: Month and Year

In [0]:
sales_temp_df = (sales_temp_df.withColumn("ReportedYear", year(col("OrderDate")))
                                .withColumn("Month", month(col("OrderDate"))))

In [0]:
sales_temp_df.show(10, False)

+--------------------------+-----------------------------------------+--------------+-----+-------+--------+------+-------------------+------------+-----+
|Product                   |StoreAddress                             |City          |State|OrderID|Quantity|Price |OrderDate          |ReportedYear|Month|
+--------------------------+-----------------------------------------+--------------+-----+-------+--------+------+-------------------+------------+-----+
|Macbook Pro Laptop        |136 Church St, New York City, NY 10001   | New York City|NY   |295665 |1       |1700.0|2019-12-30 00:01:00|2019        |12   |
|LG Washing Machine        |562 2nd St, New York City, NY 10001      | New York City|NY   |295666 |1       |600.0 |2019-12-29 07:03:00|2019        |12   |
|USB-C Charging Cable      |277 Main St, New York City, NY 10001     | New York City|NY   |295667 |1       |11.95 |2019-12-12 18:21:00|2019        |12   |
|27in FHD Monitor          |410 6th St, San Francisco, CA 94016      |

### 5. Write Final DataFrame to Parquet

In [0]:
sales_final_df = sales_temp_df.select("OrderID", "Product", "Quantity", "Price", "OrderDate", "StoreAddress", "City", "ReportedYear", "Month")

In [0]:
sales_final_df.show(10, False)

+-------+--------------------------+--------+------+-------------------+-----------------------------------------+--------------+------------+-----+
|OrderID|Product                   |Quantity|Price |OrderDate          |StoreAddress                             |City          |ReportedYear|Month|
+-------+--------------------------+--------+------+-------------------+-----------------------------------------+--------------+------------+-----+
|295665 |Macbook Pro Laptop        |1       |1700.0|2019-12-30 00:01:00|136 Church St, New York City, NY 10001   | New York City|2019        |12   |
|295666 |LG Washing Machine        |1       |600.0 |2019-12-29 07:03:00|562 2nd St, New York City, NY 10001      | New York City|2019        |12   |
|295667 |USB-C Charging Cable      |1       |11.95 |2019-12-12 18:21:00|277 Main St, New York City, NY 10001     | New York City|2019        |12   |
|295668 |27in FHD Monitor          |1       |149.99|2019-12-22 15:13:00|410 6th St, San Francisco, CA 9401

In [0]:
output_path = "dbfs:/FileStore/shared_uploads/htb29021996@gmail.com/salesdata/ParquetFile/"
sales_final_df.write.mode("overwrite").partitionBy("ReportedYear", "Month").parquet(output_path)