In [48]:
#Big data analytics using  Pyspark on a Real world sales data by Anduamlak_Y.

#Tasks
    # 1. What is the best month for sales? How much it  was earned ?
    # 2. Which city had the highest number of sales?
    # 3. What time should we display advertisment to maximize the liklihood of customer for buying products?
    # 4. Segregate/Classify all the products with expensive product,mid range product and  cheapest ones?
    # 5. How many times each product purchased? Which product is the top purchased one?
    # 6. Top fiver ordered Products?

In [49]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [50]:
#Creating spark session and defining the app name
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Pyspark_Project_Sales_Data').getOrCreate()

In [51]:
spark

In [52]:
df_pyspark=spark.read.csv('allsalesdata.csv',header=True,inferSchema=True)
df_pyspark.show(5)

+---+--------+--------------------+----------------+----------+--------------+--------------------+
|_c0|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+---+--------+--------------------+----------------+----------+--------------+--------------------+
|  0|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|  1|    NULL|                NULL|            NULL|      NULL|          NULL|                NULL|
|  2|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  3|  176560|        Google Phone|               1|       600|04/12/19 14:38|669 Spruce St, Lo...|
|  4|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
+---+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [53]:
#Data Preprocessing

In [54]:
#Total number of rows and columns
print('Rows: ', df_pyspark.count())
print('Columns:', len(df_pyspark.columns))

Rows:  186850
Columns: 7


In [55]:
#Dropping the colum _c0
df_pyspark=df_pyspark.drop('_c0')
df_pyspark.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    NULL|                NULL|            NULL|      NULL|          NULL|                NULL|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|       600|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [56]:
#If you want you can  fill null values with missing_value or any other strategy you want (mean,mode...)
# df_pyspark.na.fill("Missing_Value").show(3)

In [57]:
df_pyspark.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



In [58]:
#Handling null values 
#If 'any', drop a row if it contains any nulls.
#If 'all', drop a row only if all its values are null.
df_pyspark=df_pyspark.na.drop(how='all')
df_pyspark.count()

186305

In [59]:
#sales dataframe after removing null values 
df_pyspark.show(3)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|       600|04/12/19 14:38|669 Spruce St, Lo...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 3 rows



In [60]:
#Unique Values in 'Product' Column
df_pyspark.select('Product').distinct().collect()

[Row(Product='Wired Headphones'),
 Row(Product='Macbook Pro Laptop'),
 Row(Product='Apple Airpods Headphones'),
 Row(Product='iPhone'),
 Row(Product='Lightning Charging Cable'),
 Row(Product='Bose SoundSport Headphones'),
 Row(Product='USB-C Charging Cable'),
 Row(Product='AAA Batteries (4-pack)'),
 Row(Product='20in Monitor'),
 Row(Product='27in FHD Monitor'),
 Row(Product='Vareebadd Phone'),
 Row(Product='34in Ultrawide Monitor'),
 Row(Product='LG Dryer'),
 Row(Product='AA Batteries (4-pack)'),
 Row(Product='Google Phone'),
 Row(Product='Flatscreen TV'),
 Row(Product='LG Washing Machine'),
 Row(Product='Product'),
 Row(Product='27in 4K Gaming Monitor'),
 Row(Product='ThinkPad Laptop')]

In [61]:
#Renaming columns and store it in new sales_df
sales_df = df_pyspark.withColumnRenamed("Order ID","Order_ID") \
            .withColumnRenamed("Quantity Ordered","Quantity_Ordered") \
            .withColumnRenamed("Price Each","Price_Each") \
            .withColumnRenamed("Order Date","Order_Date") \
            .withColumnRenamed("Purchase Address","Purchase_Address")
sales_df.show(3)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|    Order_Date|    Purchase_Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|       600|04/12/19 14:38|669 Spruce St, Lo...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 3 rows



In [62]:
sales_df=sales_df[~(sales_df['Order_Date']=='Order Date')]

In [63]:
#Changed the datatype to timestamp for order date  to Derive sales_month , sales_year and sales_date
from pyspark.sql import functions as F
sales_df = sales_df.withColumn('Order_Date', F.to_timestamp('Order_Date', 'MM/dd/yy HH:mm'))
sales_df.show(2)

+--------+--------------------+----------------+----------+-------------------+--------------------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|
+--------+--------------------+----------------+----------+-------------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|
+--------+--------------------+----------------+----------+-------------------+--------------------+
only showing top 2 rows



In [64]:
#Casting Quantity_Ordered and Price_Each columns to Int and Float
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType,FloatType
sales_df = sales_df.withColumn("Quantity_Ordered",col("Quantity_Ordered").cast(IntegerType()))  \
           .withColumn("Price_Each",col("Price_Each").cast(FloatType()))
sales_df.printSchema()

root
 |-- Order_ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity_Ordered: integer (nullable = true)
 |-- Price_Each: float (nullable = true)
 |-- Order_Date: timestamp (nullable = true)
 |-- Purchase_Address: string (nullable = true)



In [65]:
#Deriving new column sales_year,sales_month,sales_day 
from pyspark.sql.functions import month,year,day
sales_df=sales_df.withColumn('sales_Year',year(sales_df.Order_Date)) \
                .withColumn('sales_Month',month(sales_df.Order_Date)) \
                 .withColumn('sales_Day',day(sales_df.Order_Date)) 
sales_df.show(3)

+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|sales_Year|sales_Month|sales_Day|
+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|      2019|          4|       19|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|      2019|          4|        7|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|      2019|          4|       12|
+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+
only showing top 3 rows



In [66]:
#New column Deriving Total Sales =Quantity_Ordered * Price_Each
#Adding False flag to see column value details well
sales_df=sales_df.withColumn('Total_Sales',(sales_df.Quantity_Ordered) * (sales_df.Price_Each))
sales_df.show(5,False)

+--------+--------------------------+----------------+----------+-------------------+------------------------------------+----------+-----------+---------+-----------+
|Order_ID|Product                   |Quantity_Ordered|Price_Each|Order_Date         |Purchase_Address                    |sales_Year|sales_Month|sales_Day|Total_Sales|
+--------+--------------------------+----------------+----------+-------------------+------------------------------------+----------+-----------+---------+-----------+
|176558  |USB-C Charging Cable      |2               |11.95     |2019-04-19 08:46:00|917 1st St, Dallas, TX 75001        |2019      |4          |19       |23.9       |
|176559  |Bose SoundSport Headphones|1               |99.99     |2019-04-07 22:30:00|682 Chestnut St, Boston, MA 02215   |2019      |4          |7        |99.99      |
|176560  |Google Phone              |1               |600.0     |2019-04-12 14:38:00|669 Spruce St, Los Angeles, CA 90001|2019      |4          |12       |600.0

In [67]:
sales_df.printSchema()

root
 |-- Order_ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity_Ordered: integer (nullable = true)
 |-- Price_Each: float (nullable = true)
 |-- Order_Date: timestamp (nullable = true)
 |-- Purchase_Address: string (nullable = true)
 |-- sales_Year: integer (nullable = true)
 |-- sales_Month: integer (nullable = true)
 |-- sales_Day: integer (nullable = true)
 |-- Total_Sales: float (nullable = true)



In [68]:
#Deriving new city column from purchase address
from pyspark.sql.functions import split
sales_df=sales_df.withColumn('City', split(sales_df['Purchase_Address'], ',').getItem(1))
sales_df.show(5)

+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+-----------+------------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|sales_Year|sales_Month|sales_Day|Total_Sales|        City|
+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+-----------+------------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|      2019|          4|       19|       23.9|      Dallas|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|      2019|          4|        7|      99.99|      Boston|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|      2019|          4|       12|      600.0| Los Angeles|
|  176560|    Wired Headphones|               

# Task 1: What is the best month for sales? How much it  was earned ?

In [69]:
# I'll apply PySpark SQL which is  one of the most used PySpark modules which is used for
# processing structured columnar data format.
# Once you have a DataFrame created, you can interact with the data by using SQL syntax.

In [70]:
#Creating Temporal tables and Access the table using PySpark SQL Query
sales_df.createOrReplaceTempView('sales_temp_view')

In [71]:
Best_month_sales=spark.sql('SELECT sales_Month,round(sum(Total_sales),2) as Total_Sales  \
                  From  sales_temp_view group by 1 order by 2 desc limit 3 ')

In [72]:
Best_month_sales.show()
#Answer 1: October and Decemeber are the best monthes for selling products.

+-----------+-----------+
|sales_Month|Total_Sales|
+-----------+-----------+
|         12| 4613443.32|
|         10| 3736726.86|
|          4| 3390670.22|
+-----------+-----------+



# Task 2. Which city had the highest number of sales?

In [73]:
City_highest_sales=spark.sql('SELECT City,round(sum(Total_sales),2) as Total_Sales  \
                          From  sales_temp_view group by 1 order by 2 desc limit 5 ')

In [74]:
City_highest_sales.show()
# Answer 2: San fransisco and Los Angeles are those cities with highest number of sales.  

+--------------+-----------+
|          City|Total_Sales|
+--------------+-----------+
| San Francisco| 8262203.87|
|   Los Angeles| 5452570.77|
| New York City| 4664317.41|
|        Boston| 3661641.99|
|       Atlanta| 2795498.57|
+--------------+-----------+



# Task 3. What time should we display advertisment to maximize the liklihood of customer for buying products?

In [75]:
#Deriving new column Sales_Hour from order_date to extract the time alone
from pyspark.sql.functions import hour
sales_df=sales_df.withColumn('Sales_Hour',hour(sales_df.Order_Date)) 
sales_df.show(3)

+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+-----------+------------+----------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|sales_Year|sales_Month|sales_Day|Total_Sales|        City|Sales_Hour|
+--------+--------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+-----------+------------+----------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|      2019|          4|       19|       23.9|      Dallas|         8|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|      2019|          4|        7|      99.99|      Boston|        22|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|      2019|          4|       12|      600.0| Los 

In [76]:
#Creating Temporal tables and Access the table using PySpark SQL Query
sales_df.createOrReplaceTempView('sales_temp_view')

In [77]:
Time_Display_Advertizement=spark.sql('SELECT Sales_Hour, count(Order_ID) as Total_Orders  \
                                     From  sales_temp_view group by 1 order by 2 desc limit 5 ')

In [78]:
Time_Display_Advertizement.show()
#Answer : @ 19 and 12 are the right times to display advertisment to maximize the liklihood of customer for buying products

+----------+------------+
|Sales_Hour|Total_Orders|
+----------+------------+
|        19|       12905|
|        12|       12587|
|        11|       12411|
|        18|       12280|
|        20|       12228|
+----------+------------+



# Task 4: Segregate/Classify all the products with expensive product,mid range product and the cheaper ones?

In [79]:
Classify_Product=spark.sql('SELECT *,ntile(3) over(order by Price_Each desc) as segregation  \
             From sales_temp_view ')

In [80]:
Classify_Product.show(3)

+--------+------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+-----------+--------------+----------+-----------+
|Order_ID|           Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|sales_Year|sales_Month|sales_Day|Total_Sales|          City|Sales_Hour|segregation|
+--------+------------------+----------------+----------+-------------------+--------------------+----------+-----------+---------+-----------+--------------+----------+-----------+
|  176565|Macbook Pro Laptop|               1|    1700.0|2019-04-24 10:38:00|915 Willow St, Sa...|      2019|          4|       24|     1700.0| San Francisco|        10|          1|
|  176639|Macbook Pro Laptop|               1|    1700.0|2019-04-28 16:14:00|853 Cedar St, San...|      2019|          4|       28|     1700.0| San Francisco|        16|          1|
|  176643|Macbook Pro Laptop|               1|    1700.0|2019-04-27 21:32:00|373 Adams St,

In [81]:
#Segregation result 1 means : highly expensive product, 2 means :medium level and 3 means cheaper one.
expensive_product=Classify_Product.filter(Classify_Product['segregation']==1).select('Product','Price_Each').orderBy('Price_Each',ascending=0)
expensive_product.show(1)
# Macbook Pro Laptop  is expensive one.

+------------------+----------+
|           Product|Price_Each|
+------------------+----------+
|Macbook Pro Laptop|    1700.0|
+------------------+----------+
only showing top 1 row



In [82]:
Cheaper_product=Classify_Product.filter(Classify_Product['segregation']==3).select('Product','Price_Each')
Cheaper_product.createOrReplaceTempView('cheaper_product')
spark.sql('SELECT product,price_each From cheaper_product  where \
        Price_Each=(select min(Price_Each) from cheaper_product) limit 1 ').show()
# AAA Batteries (4-pack) is the cheapest one.

+--------------------+----------+
|             product|price_each|
+--------------------+----------+
|AAA Batteries (4-...|      2.99|
+--------------------+----------+



# 5. How many times each product purchased? Which product is the top purchased one?

In [83]:
Product_purchase_count=spark.sql('SELECT product,count(order_ID) as Total_Purchased_Count  \
                                From sales_temp_view  group by 1 order by 2 desc')
Product_purchase_count.show()

#Top Purchased product : USB-C Charging Cable and Total purchased count 21903

+--------------------+---------------------+
|             product|Total_Purchased_Count|
+--------------------+---------------------+
|USB-C Charging Cable|                21903|
|Lightning Chargin...|                21658|
|AAA Batteries (4-...|                20641|
|AA Batteries (4-p...|                20577|
|    Wired Headphones|                18882|
|Apple Airpods Hea...|                15549|
|Bose SoundSport H...|                13325|
|    27in FHD Monitor|                 7507|
|              iPhone|                 6842|
|27in 4K Gaming Mo...|                 6230|
|34in Ultrawide Mo...|                 6181|
|        Google Phone|                 5525|
|       Flatscreen TV|                 4800|
|  Macbook Pro Laptop|                 4724|
|     ThinkPad Laptop|                 4128|
|        20in Monitor|                 4101|
|     Vareebadd Phone|                 2065|
|  LG Washing Machine|                  666|
|            LG Dryer|                  646|
+---------

# 6. Top fiver ordered Products?

In [84]:
Product_purchase_count.show(5,False)

+------------------------+---------------------+
|product                 |Total_Purchased_Count|
+------------------------+---------------------+
|USB-C Charging Cable    |21903                |
|Lightning Charging Cable|21658                |
|AAA Batteries (4-pack)  |20641                |
|AA Batteries (4-pack)   |20577                |
|Wired Headphones        |18882                |
+------------------------+---------------------+
only showing top 5 rows



In [86]:
#Thank you!