In [0]:
# load dataset from Mounted Blob Storage

df = spark.read.format("csv")\
    .option("inferSchema", True) \
    .option("header", True) \
    .option("sep", ",") \
    .load("/mnt/TestMountBlobPython/sales_data.csv")

display(df)

_c0,Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address,Month,Sales,City,Hour
0,295665,Macbook Pro Laptop,1,1700.0,2019-12-30T00:01:00.000+0000,"136 Church St, New York City, NY 10001",12,1700.0,New York City,0
1,295666,LG Washing Machine,1,600.0,2019-12-29T07:03:00.000+0000,"562 2nd St, New York City, NY 10001",12,600.0,New York City,7
2,295667,USB-C Charging Cable,1,11.95,2019-12-12T18:21:00.000+0000,"277 Main St, New York City, NY 10001",12,11.95,New York City,18
3,295668,27in FHD Monitor,1,149.99,2019-12-22T15:13:00.000+0000,"410 6th St, San Francisco, CA 94016",12,149.99,San Francisco,15
4,295669,USB-C Charging Cable,1,11.95,2019-12-18T12:38:00.000+0000,"43 Hill St, Atlanta, GA 30301",12,11.95,Atlanta,12
5,295670,AA Batteries (4-pack),1,3.84,2019-12-31T22:58:00.000+0000,"200 Jefferson St, New York City, NY 10001",12,3.84,New York City,22
6,295671,USB-C Charging Cable,1,11.95,2019-12-16T15:10:00.000+0000,"928 12th St, Portland, OR 97035",12,11.95,Portland,15
7,295672,USB-C Charging Cable,2,11.95,2019-12-13T09:29:00.000+0000,"813 Hickory St, Dallas, TX 75001",12,23.9,Dallas,9
8,295673,Bose SoundSport Headphones,1,99.99,2019-12-15T23:26:00.000+0000,"718 Wilson St, Dallas, TX 75001",12,99.99,Dallas,23
9,295674,AAA Batteries (4-pack),4,2.99,2019-12-28T11:51:00.000+0000,"77 7th St, Dallas, TX 75001",12,11.96,Dallas,11


In [0]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: timestamp (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Sales: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Hour: integer (nullable = true)



In [0]:
# Get size of dataframe

print("{} rows".format(df.count()))
print("{} columns".format(len(df.columns)))

185950 rows
11 columns


In [0]:
# Rename columns

df = df.withColumnRenamed("Quantity Ordered", "Quantity_Ordered")
df = df.withColumnRenamed("Price Each", "Price")
df = df.withColumnRenamed('Order Date', 'Order_date')
df = df.withColumnRenamed('Purchase Address', 'Purchase_address')
df = df.withColumnRenamed('Order ID', 'Order_ID')

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity_Ordered: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Order_date: timestamp (nullable = true)
 |-- Purchase_address: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Sales: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Hour: integer (nullable = true)



In [0]:
# Drop Duplicates

df.dropDuplicates(['Order_ID'])

Out[58]: DataFrame[_c0: int, Order_ID: int, Product: string, Quantity_Ordered: int, Price: double, Order_date: timestamp, Purchase_address: string, Month: int, Sales: double, City: string, Hour: int]

In [0]:
# Chnage values in month column to reflect actual month names

from pyspark.sql.functions import col, when

df = df.withColumn(
    "Monthnew",
    when(col("Month") == 12, "Dec")
    .when(col("Month") == 11, "Nov")
    .when(col("Month") == 10, "Oct")
    .when(col("Month") == 9, "Sep")
    .when(col("Month") == 8, "Aug")
    .when(col("Month") == 7, "Jul")
    .when(col("Month") == 6, "Jun")
    .when(col("Month") == 5, "May")
    .when(col("Month") == 4, "Apr")
    .when(col("Month") == 3, "Mar")
    .when(col("Month") == 2, "Feb")
    .when(col("Month") == 1, "Jan"),
)

display(df)

_c0,Order_ID,Product,Quantity_Ordered,Price,Order_date,Purchase_address,Month,Sales,City,Hour,Monthnew
0,295665,Macbook Pro Laptop,1,1700.0,2019-12-30T00:01:00.000+0000,"136 Church St, New York City, NY 10001",12,1700.0,New York City,0,Dec
1,295666,LG Washing Machine,1,600.0,2019-12-29T07:03:00.000+0000,"562 2nd St, New York City, NY 10001",12,600.0,New York City,7,Dec
2,295667,USB-C Charging Cable,1,11.95,2019-12-12T18:21:00.000+0000,"277 Main St, New York City, NY 10001",12,11.95,New York City,18,Dec
3,295668,27in FHD Monitor,1,149.99,2019-12-22T15:13:00.000+0000,"410 6th St, San Francisco, CA 94016",12,149.99,San Francisco,15,Dec
4,295669,USB-C Charging Cable,1,11.95,2019-12-18T12:38:00.000+0000,"43 Hill St, Atlanta, GA 30301",12,11.95,Atlanta,12,Dec
5,295670,AA Batteries (4-pack),1,3.84,2019-12-31T22:58:00.000+0000,"200 Jefferson St, New York City, NY 10001",12,3.84,New York City,22,Dec
6,295671,USB-C Charging Cable,1,11.95,2019-12-16T15:10:00.000+0000,"928 12th St, Portland, OR 97035",12,11.95,Portland,15,Dec
7,295672,USB-C Charging Cable,2,11.95,2019-12-13T09:29:00.000+0000,"813 Hickory St, Dallas, TX 75001",12,23.9,Dallas,9,Dec
8,295673,Bose SoundSport Headphones,1,99.99,2019-12-15T23:26:00.000+0000,"718 Wilson St, Dallas, TX 75001",12,99.99,Dallas,23,Dec
9,295674,AAA Batteries (4-pack),4,2.99,2019-12-28T11:51:00.000+0000,"77 7th St, Dallas, TX 75001",12,11.96,Dallas,11,Dec


In [0]:
# drop unnecessary columns

df = df.drop('_c0','Order_ID','Month')

df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- Quantity_Ordered: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Order_date: timestamp (nullable = true)
 |-- Purchase_address: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Monthnew: string (nullable = true)



In [0]:
# Rename 'Monthnew' column

df = df.withColumnRenamed("Monthnew", "Month")

df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- Quantity_Ordered: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Order_date: timestamp (nullable = true)
 |-- Purchase_address: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Month: string (nullable = true)



In [0]:
df.summary().show()

+-------+------------+-------------------+------------------+--------------------+------------------+--------+------------------+------+
|summary|     Product|   Quantity_Ordered|             Price|    Purchase_address|             Sales|    City|              Hour| Month|
+-------+------------+-------------------+------------------+--------------------+------------------+--------+------------------+------+
|  count|      185950|             185950|            185950|              185950|            185950|  185950|            185950|185950|
|   mean|        null| 1.1243828986286637|184.39973476746866|                null|185.49091675187515|    null|14.413304651788115|  null|
| stddev|        null|0.44279262402867087|332.73132988434406|                null| 332.9197713864784|    null| 5.423415962073379|  null|
|    min|20in Monitor|                  1|              2.99|1 11th St, Atlant...|              2.99| Atlanta|                 0|   Apr|
|    25%|        null|                  1

In [0]:
# Create a view or table

temp_table_name = 'Sales_csv'

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

Select *
FROM Sales_csv

Product,Quantity_Ordered,Price,Order_date,Purchase_address,Sales,City,Hour,Month
Macbook Pro Laptop,1,1700.0,2019-12-30T00:01:00.000+0000,"136 Church St, New York City, NY 10001",1700.0,New York City,0,Dec
LG Washing Machine,1,600.0,2019-12-29T07:03:00.000+0000,"562 2nd St, New York City, NY 10001",600.0,New York City,7,Dec
USB-C Charging Cable,1,11.95,2019-12-12T18:21:00.000+0000,"277 Main St, New York City, NY 10001",11.95,New York City,18,Dec
27in FHD Monitor,1,149.99,2019-12-22T15:13:00.000+0000,"410 6th St, San Francisco, CA 94016",149.99,San Francisco,15,Dec
USB-C Charging Cable,1,11.95,2019-12-18T12:38:00.000+0000,"43 Hill St, Atlanta, GA 30301",11.95,Atlanta,12,Dec
AA Batteries (4-pack),1,3.84,2019-12-31T22:58:00.000+0000,"200 Jefferson St, New York City, NY 10001",3.84,New York City,22,Dec
USB-C Charging Cable,1,11.95,2019-12-16T15:10:00.000+0000,"928 12th St, Portland, OR 97035",11.95,Portland,15,Dec
USB-C Charging Cable,2,11.95,2019-12-13T09:29:00.000+0000,"813 Hickory St, Dallas, TX 75001",23.9,Dallas,9,Dec
Bose SoundSport Headphones,1,99.99,2019-12-15T23:26:00.000+0000,"718 Wilson St, Dallas, TX 75001",99.99,Dallas,23,Dec
AAA Batteries (4-pack),4,2.99,2019-12-28T11:51:00.000+0000,"77 7th St, Dallas, TX 75001",11.96,Dallas,11,Dec


In [0]:
%sql

SELECT Product, sum(Sales) as Total_Sales 
FROM Sales_csv
GROUP BY Product
ORDER BY Total_Sales desc


Product,Total_Sales
Macbook Pro Laptop,8037600.0
iPhone,4794300.0
ThinkPad Laptop,4129958.699999968
Google Phone,3319200.0
27in 4K Gaming Monitor,2435097.5599999577
34in Ultrawide Monitor,2355558.0099999583
Apple Airpods Headphones,2349150.0
Flatscreen TV,1445700.0
Bose SoundSport Headphones,1345565.429999936
27in FHD Monitor,1132424.49999997


In [0]:
display(_sqldf)

Product,Total_Sales
Macbook Pro Laptop,8037600.0
iPhone,4794300.0
ThinkPad Laptop,4129958.699999968
Google Phone,3319200.0
27in 4K Gaming Monitor,2435097.5599999577
34in Ultrawide Monitor,2355558.0099999583
Apple Airpods Headphones,2349150.0
Flatscreen TV,1445700.0
Bose SoundSport Headphones,1345565.429999936
27in FHD Monitor,1132424.49999997


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Sales_csv"

df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
# write output to datalake Gen2 Mount Point

_sqldf.write.format("com.databricks.spark.csv").option("header","true").option("delimiter",",").mode("overwrite").save("/mnt/TestMountBlobPython/Sales_data")