## Data preparation

Preparation of the raw sales data for smooth analysis.

- import the csv data
- clean the data
- export the prepared data in a Parquet format

#### **Import relevant Libraries**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
import warnings

warnings.filterwarnings('ignore')

#### **Create Session**

In [3]:
#create spark session
spark = (SparkSession.builder.appName('SalesAnalytics').getOrCreate())

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/05 01:37:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/05 01:37:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/05 01:37:53 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


Java11 and Spark 3.2 when used together produce these errors. 
The first step is to switch to Java8 until this issue gets resolved, although spark session is still created and runs successfully

### Data Preparation and Cleansing

#### **Data Preparation**

In [4]:
#create schema
schema = StructType([
    StructField('Order ID', StringType(), True),
    StructField('Product', StringType(), True),
    StructField('Quantity Ordered', StringType(), True),
    StructField('Price Each', StringType(), True),
    StructField('Order Date', StringType(), True),
    StructField('Purchase Address', StringType(), True)
])

In [5]:
#read file from file path
sales_data_path = 'data/salesdata'


#create spark dataframe
sales_raw_df = (spark.read.format('csv')
                .option('header', True)
                .schema(schema)
                .load(sales_data_path))

In [6]:
sales_raw_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [7]:
sales_raw_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



#### **Remove Null Row and Bad Records**

In [8]:
from pyspark.sql.functions import col

In [9]:
#check for null values
sales_raw_df.filter(col('Order ID').isNull() == True).show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
+--------+-------+-------

In [10]:
#drop rows with null values
sales_raw_df = sales_raw_df.na.drop('any')

In [11]:
#check for null values
sales_raw_df.filter(col('Order ID').isNull() == True).show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [12]:
#check description
sales_raw_df.describe().show()



+-------+-----------------+------------+------------------+-----------------+--------------+--------------------+
|summary|         Order ID|     Product|  Quantity Ordered|       Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+-----------------+--------------+--------------------+
|  count|           186305|      186305|            186305|           186305|        186305|              186305|
|   mean|230417.5693788653|        null|1.1243828986286637|184.3997347674861|          null|                null|
| stddev| 51512.7371099961|        null|0.4427926240286694|332.7313298843445|          null|                null|
|    min|           141234|20in Monitor|                 1|           109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|         Order ID|      iPhone|  Quantity Ordered|       Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+-----------------+-----------

                                                                                

In [13]:
#filter for anomally
sales_raw_df.filter(col('Order ID') == 'Order ID').show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-------

In [14]:
#select distinct records to reduce anomally
sales_temp_df = sales_raw_df.distinct()

In [15]:
#filter for anomally
sales_temp_df.filter(col('Order ID') == 'Order ID').show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+



In [16]:
#filter to exclude anomally from df
sales_temp_df = sales_temp_df.filter(col('Order ID') != 'Order ID')

In [17]:
#filter for anomally if available
sales_temp_df.filter(col('Order ID') == 'Order ID').show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [18]:
sales_temp_df.show(10, truncate = False)

+--------+--------------------------+----------------+----------+--------------+----------------------------------------+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                        |
+--------+--------------------------+----------------+----------+--------------+----------------------------------------+
|295900  |AA Batteries (4-pack)     |1               |3.84      |12/27/19 18:56|283 Washington St, Boston, MA 02215     |
|295923  |Lightning Charging Cable  |1               |14.95     |12/21/19 13:41|968 8th St, Austin, TX 73301            |
|295991  |Lightning Charging Cable  |1               |14.95     |12/15/19 20:16|857 Center St, Boston, MA 02215         |
|296076  |Macbook Pro Laptop        |1               |1700      |12/03/19 15:19|679 Chestnut St, San Francisco, CA 94016|
|297015  |AAA Batteries (4-pack)    |3               |2.99      |12/13/19 08:43|58 Dogwood St, San Francisco, CA 94016  |
|297237  |Bose SoundSpor

In [19]:
#description of df
sales_temp_df.describe().show()

+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|  count|            185686|      185686|             185686|            185686|        185686|              185686|
|   mean|230411.37622653297|        null| 1.1245435843305365|184.51925546358413|          null|                null|
| stddev| 51511.71718332079|        null|0.44306873838328764| 332.8438383900525|          null|                null|
|    min|            141234|20in Monitor|                  1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|            319670|      iPhone|                  9|            999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+------------------+------------+-------------------+---

### **Extract city and State from Purchase Address**

In [20]:
from pyspark.sql.functions import split

In [21]:
sales_temp_df.select('Purchase Address').show(10, truncate = False)

+----------------------------------------+
|Purchase Address                        |
+----------------------------------------+
|283 Washington St, Boston, MA 02215     |
|968 8th St, Austin, TX 73301            |
|857 Center St, Boston, MA 02215         |
|679 Chestnut St, San Francisco, CA 94016|
|58 Dogwood St, San Francisco, CA 94016  |
|355 Park St, Boston, MA 02215           |
|542 9th St, New York City, NY 10001     |
|708 Walnut St, New York City, NY 10001  |
|538 Hickory St, San Francisco, CA 94016 |
|199 8th St, San Francisco, CA 94016     |
+----------------------------------------+
only showing top 10 rows



In [22]:
#split convert purchase address into a list for each row
sales_temp_df.select('Purchase Address', split(col('Purchase Address'), ',').alias('split')).show(10, False)

+----------------------------------------+--------------------------------------------+
|Purchase Address                        |split                                       |
+----------------------------------------+--------------------------------------------+
|283 Washington St, Boston, MA 02215     |[283 Washington St,  Boston,  MA 02215]     |
|968 8th St, Austin, TX 73301            |[968 8th St,  Austin,  TX 73301]            |
|857 Center St, Boston, MA 02215         |[857 Center St,  Boston,  MA 02215]         |
|679 Chestnut St, San Francisco, CA 94016|[679 Chestnut St,  San Francisco,  CA 94016]|
|58 Dogwood St, San Francisco, CA 94016  |[58 Dogwood St,  San Francisco,  CA 94016]  |
|355 Park St, Boston, MA 02215           |[355 Park St,  Boston,  MA 02215]           |
|542 9th St, New York City, NY 10001     |[542 9th St,  New York City,  NY 10001]     |
|708 Walnut St, New York City, NY 10001  |[708 Walnut St,  New York City,  NY 10001]  |
|538 Hickory St, San Francisco, 

In [23]:
#use getItem to index the list
sales_temp_df.select('Purchase Address', split(col('Purchase Address'), ',').getItem(1)).show(10, False)

+----------------------------------------+---------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)[1]|
+----------------------------------------+---------------------------------+
|283 Washington St, Boston, MA 02215     | Boston                          |
|968 8th St, Austin, TX 73301            | Austin                          |
|857 Center St, Boston, MA 02215         | Boston                          |
|679 Chestnut St, San Francisco, CA 94016| San Francisco                   |
|58 Dogwood St, San Francisco, CA 94016  | San Francisco                   |
|355 Park St, Boston, MA 02215           | Boston                          |
|542 9th St, New York City, NY 10001     | New York City                   |
|708 Walnut St, New York City, NY 10001  | New York City                   |
|538 Hickory St, San Francisco, CA 94016 | San Francisco                   |
|199 8th St, San Francisco, CA 94016     | San Francisco                   |

In [24]:
sales_temp_df.select('Purchase Address', split(col('Purchase Address'), ',').getItem(2)).show(10, False)

+----------------------------------------+---------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)[2]|
+----------------------------------------+---------------------------------+
|283 Washington St, Boston, MA 02215     | MA 02215                        |
|968 8th St, Austin, TX 73301            | TX 73301                        |
|857 Center St, Boston, MA 02215         | MA 02215                        |
|679 Chestnut St, San Francisco, CA 94016| CA 94016                        |
|58 Dogwood St, San Francisco, CA 94016  | CA 94016                        |
|355 Park St, Boston, MA 02215           | MA 02215                        |
|542 9th St, New York City, NY 10001     | NY 10001                        |
|708 Walnut St, New York City, NY 10001  | NY 10001                        |
|538 Hickory St, San Francisco, CA 94016 | CA 94016                        |
|199 8th St, San Francisco, CA 94016     | CA 94016                        |

In [25]:
sales_temp_df.select('Purchase Address', split(split(col('Purchase Address'), ',').getItem(2), ' ')).show(10, False)

+----------------------------------------+-----------------------------------------------+
|Purchase Address                        |split(split(Purchase Address, ,, -1)[2],  , -1)|
+----------------------------------------+-----------------------------------------------+
|283 Washington St, Boston, MA 02215     |[, MA, 02215]                                  |
|968 8th St, Austin, TX 73301            |[, TX, 73301]                                  |
|857 Center St, Boston, MA 02215         |[, MA, 02215]                                  |
|679 Chestnut St, San Francisco, CA 94016|[, CA, 94016]                                  |
|58 Dogwood St, San Francisco, CA 94016  |[, CA, 94016]                                  |
|355 Park St, Boston, MA 02215           |[, MA, 02215]                                  |
|542 9th St, New York City, NY 10001     |[, NY, 10001]                                  |
|708 Walnut St, New York City, NY 10001  |[, NY, 10001]                                  |

In [26]:
sales_temp_df = (sales_temp_df
                 .withColumn('City', split(col('Purchase Address'), ',').getItem(1))
                 .withColumn('State', split(split(col('Purchase Address'), ',').getItem(2), ' ').getItem(1)))

In [27]:
sales_temp_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+--------------+-----+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|          City|State|
+--------+--------------------+----------------+----------+--------------+--------------------+--------------+-----+
|  295900|AA Batteries (4-p...|               1|      3.84|12/27/19 18:56|283 Washington St...|        Boston|   MA|
|  295923|Lightning Chargin...|               1|     14.95|12/21/19 13:41|968 8th St, Austi...|        Austin|   TX|
|  295991|Lightning Chargin...|               1|     14.95|12/15/19 20:16|857 Center St, Bo...|        Boston|   MA|
|  296076|  Macbook Pro Laptop|               1|      1700|12/03/19 15:19|679 Chestnut St, ...| San Francisco|   CA|
|  297015|AAA Batteries (4-...|               3|      2.99|12/13/19 08:43|58 Dogwood St, Sa...| San Francisco|   CA|
|  297237|Bose SoundSport H...|               1|     99.99|12/16

### **Rename and Change Data Types**

In [28]:
from pyspark.sql.functions import to_timestamp, year, month
from pyspark.sql.types import IntegerType, FloatType

In [29]:
#create new columns and cast
#cast function to change datatype
#drop the old ones
sales_temp_df = (sales_temp_df.withColumn('OrderID', col('Order ID').cast(IntegerType()))
                 .withColumn('Quantity', col('Quantity Ordered').cast(IntegerType()))
                 .withColumn('Price', col('Price Each').cast(FloatType()))
                 .withColumn('OrderDate', to_timestamp(col('Order Date'),'MM/dd/yy HH:mm'))
                 .withColumnRenamed('Purchase Address', 'StoreAddress')
                 .drop('Order ID')
                 .drop('Quantity Ordered')
                 .drop('Price Each')
                 .drop('Purchase Address')
                )

In [30]:
sales_temp_df.show(10)

+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+
|             Product|    Order Date|        StoreAddress|          City|State|OrderID|Quantity| Price|          OrderDate|
+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+
|AA Batteries (4-p...|12/27/19 18:56|283 Washington St...|        Boston|   MA| 295900|       1|  3.84|2019-12-27 18:56:00|
|Lightning Chargin...|12/21/19 13:41|968 8th St, Austi...|        Austin|   TX| 295923|       1| 14.95|2019-12-21 13:41:00|
|Lightning Chargin...|12/15/19 20:16|857 Center St, Bo...|        Boston|   MA| 295991|       1| 14.95|2019-12-15 20:16:00|
|  Macbook Pro Laptop|12/03/19 15:19|679 Chestnut St, ...| San Francisco|   CA| 296076|       1|1700.0|2019-12-03 15:19:00|
|AAA Batteries (4-...|12/13/19 08:43|58 Dogwood St, Sa...| San Francisco|   CA| 297015|       3|  2.99|2019-12-13 08:43:00|
|Bose So

In [31]:
sales_temp_df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)



#### **Add new columns, Month and Year**

In [32]:
#create new column year and month
sales_temp_df = (sales_temp_df.withColumn('ReportYear', year(col('OrderDate')))
                 .withColumn('Month', month(col('OrderDate')))
                )

In [33]:
sales_temp_df.show(5)

+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+----------+-----+
|             Product|    Order Date|        StoreAddress|          City|State|OrderID|Quantity| Price|          OrderDate|ReportYear|Month|
+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+----------+-----+
|AA Batteries (4-p...|12/27/19 18:56|283 Washington St...|        Boston|   MA| 295900|       1|  3.84|2019-12-27 18:56:00|      2019|   12|
|Lightning Chargin...|12/21/19 13:41|968 8th St, Austi...|        Austin|   TX| 295923|       1| 14.95|2019-12-21 13:41:00|      2019|   12|
|Lightning Chargin...|12/15/19 20:16|857 Center St, Bo...|        Boston|   MA| 295991|       1| 14.95|2019-12-15 20:16:00|      2019|   12|
|  Macbook Pro Laptop|12/03/19 15:19|679 Chestnut St, ...| San Francisco|   CA| 296076|       1|1700.0|2019-12-03 15:19:00|      2019|   12|
|AAA Batterie

### **Write df to Parquet**

In [34]:
#rearrange the columns 
sales_final_df = sales_temp_df.select('OrderID', 'Product', 'Quantity', 'Price', 'OrderDate', 'StoreAddress', 'City', 'State', 'ReportYear', 'Month')

In [35]:
sales_final_df.show(5)

+-------+--------------------+--------+------+-------------------+--------------------+--------------+-----+----------+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        StoreAddress|          City|State|ReportYear|Month|
+-------+--------------------+--------+------+-------------------+--------------------+--------------+-----+----------+-----+
| 295900|AA Batteries (4-p...|       1|  3.84|2019-12-27 18:56:00|283 Washington St...|        Boston|   MA|      2019|   12|
| 295923|Lightning Chargin...|       1| 14.95|2019-12-21 13:41:00|968 8th St, Austi...|        Austin|   TX|      2019|   12|
| 295991|Lightning Chargin...|       1| 14.95|2019-12-15 20:16:00|857 Center St, Bo...|        Boston|   MA|      2019|   12|
| 296076|  Macbook Pro Laptop|       1|1700.0|2019-12-03 15:19:00|679 Chestnut St, ...| San Francisco|   CA|      2019|   12|
| 297015|AAA Batteries (4-...|       3|  2.99|2019-12-13 08:43:00|58 Dogwood St, Sa...| San Francisco|   CA|      2019

In [36]:
#write dataframe to parquet and partition by year and month
output_path = 'data/output/sales'
sales_final_df.write.mode('overwrite').partitionBy('ReportYear', 'Month').parquet(output_path)

22/06/05 01:38:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/06/05 03:39:53 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 231269 ms exceeds timeout 120000 ms
22/06/05 03:39:53 WARN SparkContext: Killing executors is not supported by current scheduler.
