In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [0]:
display(dbutils.fs.ls('dbfs:/FileStore/tables/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/Sales_April_2019-1.csv,Sales_April_2019-1.csv,1595953,1720537125000
dbfs:/FileStore/tables/Sales_April_2019.csv,Sales_April_2019.csv,1595953,1720536860000
dbfs:/FileStore/tables/Sales_August_2019-1.csv,Sales_August_2019-1.csv,1043593,1720537125000
dbfs:/FileStore/tables/Sales_August_2019.csv,Sales_August_2019.csv,1043593,1720536860000
dbfs:/FileStore/tables/Sales_December_2019-1.csv,Sales_December_2019-1.csv,2181642,1720537126000
dbfs:/FileStore/tables/Sales_December_2019.csv,Sales_December_2019.csv,2181642,1720536861000
dbfs:/FileStore/tables/Sales_February_2019-1.csv,Sales_February_2019-1.csv,1046495,1720537126000
dbfs:/FileStore/tables/Sales_February_2019.csv,Sales_February_2019.csv,1046495,1720536861000
dbfs:/FileStore/tables/Sales_January_2019-1.csv,Sales_January_2019-1.csv,843098,1720537127000
dbfs:/FileStore/tables/Sales_January_2019.csv,Sales_January_2019.csv,843098,1720536862000


In [0]:
jan_df = spark.read.csv("/FileStore/tables/Sales_January_2019.csv", header=True)

In [0]:
# Merge 12 months data
sales_ori_df = spark.read.csv("/FileStore/tables/Sales_January_2019.csv", header=True)
month_list = ['February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
for month in month_list: 
    sales_ori_df = sales_ori_df.union(spark.read.csv(f"/FileStore/tables/Sales_{month}_2019.csv", header=True))


In [0]:
sales_ori_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



First convert data types, notice Order Date is in string type. We have to convert it into DateType. 
Check if there's any null values before conversion. 

The steps below are to use regex to strip off time `hh:ss` and then converting to date. These are just to experiment with RegEx. 
A quicker alternative would be to directly converting it to DateType in `mm-dd-yy hh:ss` and then extracting the relevant date components if needed in further processing 

In [0]:
sales_df = sales_ori_df.withColumnRenamed("Order Date", "Order Date Timestamp")

In [0]:
# check if there's any null values 
sales_df.where(F.col("Order Date Timestamp").isNull()).show()

+--------+-------+----------------+----------+--------------------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date Timestamp|Purchase Address|
+--------+-------+----------------+----------+--------------------+----------------+
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                null|            null|
|    null|   null|            null|      null|                nul

In [0]:
sales_df.filter(sales_df["Order Date Timestamp"].isNull() & sales_df["Order ID"].isNotNull()).show()

+--------+-------+----------------+----------+--------------------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date Timestamp|Purchase Address|
+--------+-------+----------------+----------+--------------------+----------------+
+--------+-------+----------------+----------+--------------------+----------------+



In [0]:
# Looks like if Order Date Timestamp is null, the order ID is also null, we can safely drop na rows  
sales_df = sales_df.na.drop()

In [0]:
sales_df.where(F.col("Order Date Timestamp").isNull()).show()

+--------+-------+----------------+----------+--------------------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date Timestamp|Purchase Address|
+--------+-------+----------------+----------+--------------------+----------------+
+--------+-------+----------------+----------+--------------------+----------------+



In [0]:
# Check if there's any Order Date not matching date pattern 

sales_df_odd = sales_df.filter(~F.col("Order Date Timestamp").rlike('\d{2}/\d{2}/\d{2}\s\d{2}:\d{2}'))
sales_df_odd.select("Order Date Timestamp").distinct().show()


+--------------------+
|Order Date Timestamp|
+--------------------+
|          Order Date|
+--------------------+



In [0]:
sales_df_odd.distinct().show()

+--------+-------+----------------+----------+--------------------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date Timestamp|Purchase Address|
+--------+-------+----------------+----------+--------------------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|          Order Date|Purchase Address|
+--------+-------+----------------+----------+--------------------+----------------+



There's only one pattern where the Order Date Timestamp doesn't match the timedate pattern. We can safely remove these rows. 

In [0]:
sales_df = sales_df.filter(F.col("Order Date Timestamp").rlike('\d{2}/\d{2}/\d{2}\s\d{2}:\d{2}'))

In [0]:
sales_df = sales_df.withColumn("Order Date", F.regexp_extract(sales_df['Order Date Timestamp'], r'(\d{2}/\d{2}/\d{2})\s\d{2}:\d{2}', 1))

In [0]:
sales_df.show(5)

+--------+--------------------+----------------+----------+--------------------+--------------------+----------+
|Order ID|             Product|Quantity Ordered|Price Each|Order Date Timestamp|    Purchase Address|Order Date|
+--------+--------------------+----------------+----------+--------------------+--------------------+----------+
|  141234|              iPhone|               1|       700|      01/22/19 21:25|944 Walnut St, Bo...|  01/22/19|
|  141235|Lightning Chargin...|               1|     14.95|      01/28/19 14:15|185 Maple St, Por...|  01/28/19|
|  141236|    Wired Headphones|               2|     11.99|      01/17/19 13:33|538 Adams St, San...|  01/17/19|
|  141237|    27in FHD Monitor|               1|    149.99|      01/05/19 20:33|738 10th St, Los ...|  01/05/19|
|  141238|    Wired Headphones|               1|     11.99|      01/25/19 11:59|387 10th St, Aust...|  01/25/19|
+--------+--------------------+----------------+----------+--------------------+----------------

In [0]:
sales_df = sales_df.withColumn(
    "Order Date",
    F.when(F.col("Order Date").rlike("\d{2}/\d{2}/\d{2}"), F.to_date("Order Date", "MM/dd/yy"))
     .otherwise(None)
)
sales_df.show(5)

+--------+--------------------+----------------+----------+--------------------+--------------------+----------+
|Order ID|             Product|Quantity Ordered|Price Each|Order Date Timestamp|    Purchase Address|Order Date|
+--------+--------------------+----------------+----------+--------------------+--------------------+----------+
|  141234|              iPhone|               1|       700|      01/22/19 21:25|944 Walnut St, Bo...|2019-01-22|
|  141235|Lightning Chargin...|               1|     14.95|      01/28/19 14:15|185 Maple St, Por...|2019-01-28|
|  141236|    Wired Headphones|               2|     11.99|      01/17/19 13:33|538 Adams St, San...|2019-01-17|
|  141237|    27in FHD Monitor|               1|    149.99|      01/05/19 20:33|738 10th St, Los ...|2019-01-05|
|  141238|    Wired Headphones|               1|     11.99|      01/25/19 11:59|387 10th St, Aust...|2019-01-25|
+--------+--------------------+----------------+----------+--------------------+----------------

In [0]:
# convert Quantity Ordered to integer, and Price Each to double
sales_df = sales_df.withColumn('Quantity Ordered', sales_df['Quantity Ordered'].cast(IntegerType()))\
                .withColumn('Price Each', sales_df['Price Each'].cast(DoubleType()))

In [0]:
sales_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date Timestamp: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Order Date: date (nullable = true)



In [0]:
sales_df = sales_df.withColumn("Month", F.month("Order Date"))

In [0]:
sales_df.show(5)

+--------+--------------------+----------------+----------+--------------------+--------------------+----------+-----+
|Order ID|             Product|Quantity Ordered|Price Each|Order Date Timestamp|    Purchase Address|Order Date|Month|
+--------+--------------------+----------------+----------+--------------------+--------------------+----------+-----+
|  141234|              iPhone|               1|     700.0|      01/22/19 21:25|944 Walnut St, Bo...|2019-01-22|    1|
|  141235|Lightning Chargin...|               1|     14.95|      01/28/19 14:15|185 Maple St, Por...|2019-01-28|    1|
|  141236|    Wired Headphones|               2|     11.99|      01/17/19 13:33|538 Adams St, San...|2019-01-17|    1|
|  141237|    27in FHD Monitor|               1|    149.99|      01/05/19 20:33|738 10th St, Los ...|2019-01-05|    1|
|  141238|    Wired Headphones|               1|     11.99|      01/25/19 11:59|387 10th St, Aust...|2019-01-25|    1|
+--------+--------------------+----------------+

In [0]:
sales_df.select("Month").distinct().show()

+-----+
|Month|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
+-----+



In [0]:
sales_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date Timestamp: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Month: integer (nullable = true)



We'll plot the amount of sales against month.

In [0]:
sales_df = sales_df.withColumn("Sales Amount", F.col("Quantity Ordered")*F.col("Price Each"))
sales_df.show(5)

+--------+--------------------+----------------+----------+--------------------+--------------------+----------+-----+------------+
|Order ID|             Product|Quantity Ordered|Price Each|Order Date Timestamp|    Purchase Address|Order Date|Month|Sales Amount|
+--------+--------------------+----------------+----------+--------------------+--------------------+----------+-----+------------+
|  141234|              iPhone|               1|     700.0|      01/22/19 21:25|944 Walnut St, Bo...|2019-01-22|    1|       700.0|
|  141235|Lightning Chargin...|               1|     14.95|      01/28/19 14:15|185 Maple St, Por...|2019-01-28|    1|       14.95|
|  141236|    Wired Headphones|               2|     11.99|      01/17/19 13:33|538 Adams St, San...|2019-01-17|    1|       23.98|
|  141237|    27in FHD Monitor|               1|    149.99|      01/05/19 20:33|738 10th St, Los ...|2019-01-05|    1|      149.99|
|  141238|    Wired Headphones|               1|     11.99|      01/25/19 11

In [0]:
non_double_rows = sales_df.filter(F.col("Sales Amount").cast("double").isNull())
non_double_rows.show(5)

+--------+-------+----------------+----------+--------------------+----------------+----------+-----+------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date Timestamp|Purchase Address|Order Date|Month|Sales Amount|
+--------+-------+----------------+----------+--------------------+----------------+----------+-----+------------+
+--------+-------+----------------+----------+--------------------+----------------+----------+-----+------------+



In [0]:
sales_month_df = sales_df.select("Month", "Sales Amount").groupBy("Month").agg(F.sum("Sales Amount").alias("Monthly Sales"))
#.groupBy("Month").agg(sum("Sales Amount"))
sales_month_df.show()

+-----+------------------+
|Month|     Monthly Sales|
+-----+------------------+
|    1|1822256.7299999138|
|    2| 2202022.419999963|
|    3|2807100.3800003603|
|    4|3390670.2400007015|
|    5| 3152606.750000546|
|    6|2577802.2600001753|
|    7|2647775.7600002354|
|    8| 2244467.879999992|
|    9| 2097560.129999891|
|   10|3736726.8800009675|
|   11|3199603.2000005865|
|   12|  4613443.34000153|
+-----+------------------+



In [0]:
import pandas as pd
import datetime
import matplotlib.pyplot as plt

df = sales_month_df.toPandas()

df['Month'] = df['Month'].apply(lambda x: datetime.datetime.strptime(str(x), "%m").strftime("%B"))

print(type(df))
plt.figure(figsize=(24, 8))

df.plot('Month', 'Monthly Sales',kind='bar', legend=False)

plt.xticks(rotation=45)

plt.ylabel('Monthly Sales in USD($)')

plt.title('Sales Amount by Month')

plt.show()




<class 'pandas.core.frame.DataFrame'>
