In [None]:
# Data Set Link
# https://drive.google.com/drive/folders/1VzW2eBDCjOc4zzG-dSObFLyQxBSWVbn2

In [1]:
# mount drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install -U pyspark --quiet

In [3]:
# importing libraries
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession

In [4]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Experiment 5")
    .config("spark.driver.memory", "4g") # Changed to a reasonable value '4g' instead of 'lg'
    .config("spark.executor.instances", "3") # Changed to spark.executor.instances
    .config("spark.executor.memory", "1g") # Changed to spark.executor.memory
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [5]:
spark.read.csv("/content/drive/MyDrive/DataSet/Big Data data Sets/Ecommerce Database/Customers.csv",inferSchema=True, header=True).schema

StructType([StructField('CustomerID', IntegerType(), True), StructField('FirstName', StringType(), True), StructField('LastName', StringType(), True), StructField('Date_of_Birth', StringType(), True), StructField('City', StringType(), True), StructField('State', StringType(), True), StructField('Country', StringType(), True), StructField('PostalCode', IntegerType(), True), StructField('Phone', LongType(), True), StructField('Email', StringType(), True), StructField('DateEntered', StringType(), True)])

In [6]:
from pyspark.sql.types import StructField, StringType, IntegerType, StringType, DateType, StructType

customers_schema = StructType(
    [
        StructField('CustomerID', IntegerType(), True),
        StructField('FirstName', StringType(), True),
        StructField('LastName', StringType(), True),
        StructField('Date_of_Birth', DateType(), True),
        StructField('City', StringType(), True),
        StructField('State', StringType(), True),
        StructField('Country', StringType(), True),
        StructField('PostalCode', StringType(), True),
        StructField('Phone', StringType(), True),
        StructField('Email', StringType(), True),
        StructField('DateEntered', DateType(), True)
    ]
)

orders_schema = """
OrderID INT,
CustomerID INT,
PaymentID INT,
OrderDate DATE,
ShipperID INT,
ShipDate Date,
DeliveryDate Date,
Total_order_amount float
"""

In [7]:
# I am reading loading from hdfs if you don't have hadoop in your system
# download the datasets usign this link

customers = spark.read.csv("/content/drive/MyDrive/DataSet/Big Data data Sets/Ecommerce Database/Customers.csv", schema=customers_schema, header=True)
orders = spark.read.csv("/content/drive/MyDrive/DataSet/Big Data data Sets/Ecommerce Database/Orders.csv", schema=orders_schema, header=True)
payments = spark.read.csv("/content/drive/MyDrive/DataSet/Big Data data Sets/Ecommerce Database/Payments.csv", inferSchema=True, header=True)
products = spark.read.csv("/content/drive/MyDrive/DataSet/Big Data data Sets/Ecommerce Database/Products.csv", inferSchema=True, header=True)
orderdetails = spark.read.csv("/content/drive/MyDrive/DataSet/Big Data data Sets/Ecommerce Database/OrderDetails.csv", inferSchema=True, header=True)

In [8]:
### Create a temporary table which is volatile when saprk session is close it will gone if you will
# create global temporary it will store into Hive metastore

customers.createOrReplaceTempView("Customers")
orders.createOrReplaceTempView("Orders")
payments.createOrReplaceTempView("Payments")
products.createOrReplaceTempView("Products")
orderdetails.createOrReplaceTempView("OrderDetails")

In [9]:
customers.show(2, truncate=False)

+----------+---------+---------+-------------+--------+--------+-------------+----------+----------+--------------------------+-----------+
|CustomerID|FirstName|LastName |Date_of_Birth|City    |State   |Country      |PostalCode|Phone     |Email                     |DateEntered|
+----------+---------+---------+-------------+--------+--------+-------------+----------+----------+--------------------------+-----------+
|57081     |James    |Smith    |1987-03-26   |New York|New York|United States|280862    |9638483934|James.Smith@gmail.com     |2020-01-02 |
|57082     |Robert   |Downey Jr|1973-05-24   |New York|New York|United States|376573    |6588282115|Robert.Downey Jr@gmail.com|2020-01-06 |
+----------+---------+---------+-------------+--------+--------+-------------+----------+----------+--------------------------+-----------+
only showing top 2 rows



In [10]:
orders.show(2, truncate=False)

+-------+----------+---------+----------+---------+----------+------------+------------------+
|OrderID|CustomerID|PaymentID|OrderDate |ShipperID|ShipDate  |DeliveryDate|Total_order_amount|
+-------+----------+---------+----------+---------+----------+------------+------------------+
|7655500|57083     |2        |0017-07-12|7        |0018-07-13|0024-07-12  |25112.0           |
|7655501|57086     |3        |0025-07-12|2        |0029-07-12|0032-07-12  |22453.0           |
+-------+----------+---------+----------+---------+----------+------------+------------------+
only showing top 2 rows



In [11]:
products.show(2, truncate=False)

+---------+-------------------------------------------+-----------+--------------------+------+----------+------------+-----------------------------+
|ProductID|Product                                    |Category_ID|Sub_Category        |Brand |Sale_Price|Market_Price|Type                         |
+---------+-------------------------------------------+-----------+--------------------+------+----------+------------+-----------------------------+
|1        |Original Disinfectant Toilet Cleaner Liquid|5001       |All Purpose Cleaners|Harpic|489       |534         |Toilet Cleaners              |
|2        |Surface Disinfectant Spray                 |5001       |All Purpose Cleaners|Savlon|257       |318         |Disinfectant Spray & Cleaners|
+---------+-------------------------------------------+-----------+--------------------+------+----------+------------+-----------------------------+
only showing top 2 rows



In [12]:
payments.show(2)

+---------+-----------+-------+
|PaymentID|PaymentType|Allowed|
+---------+-----------+-------+
|        1| Debit Card|    Yes|
|        2|        POD|    Yes|
+---------+-----------+-------+
only showing top 2 rows



In [13]:
products.show(2)

+---------+--------------------+-----------+--------------------+------+----------+------------+--------------------+
|ProductID|             Product|Category_ID|        Sub_Category| Brand|Sale_Price|Market_Price|                Type|
+---------+--------------------+-----------+--------------------+------+----------+------------+--------------------+
|        1|Original Disinfec...|       5001|All Purpose Cleaners|Harpic|       489|         534|     Toilet Cleaners|
|        2|Surface Disinfect...|       5001|All Purpose Cleaners|Savlon|       257|         318|Disinfectant Spra...|
+---------+--------------------+-----------+--------------------+------+----------+------------+--------------------+
only showing top 2 rows



In [14]:
orderdetails.show(2)

+-------------+-------+---------+--------+----------+
|OrderDetailID|OrderID|ProductID|Quantity|SupplierID|
+-------------+-------+---------+--------+----------+
|            1|7655500|    14955|       2|         3|
|            2|7655500|    19946|       4|         2|
+-------------+-------+---------+--------+----------+
only showing top 2 rows



In [15]:
#Find the Monthly revenue and %change in revenue with previous month

query = """
WITH CTE AS
(
    SELECT
        YEAR(to_date(OrderDate, 'dd-MM-yyyy')) as Years,
        MONTH(to_date(OrderDate, 'dd-MM-yyyy')) as Months,
        CAST(SUM(Total_order_amount) AS INT) as Revenue
    FROM Orders
    GROUP BY YEAR(to_date(OrderDate, 'dd-MM-yyyy')), MONTH(to_date(OrderDate, 'dd-MM-yyyy'))
)
SELECT *,
    LAG(Revenue, 1) OVER(PARTITION BY Years ORDER BY Months) AS Previous_month_revenue,
    ROUND(((Revenue - LAG(Revenue, 1) OVER(PARTITION BY Years ORDER BY Months))/Revenue)*100, 2) AS per_change
FROM CTE
"""
spark.sql(query).show()

+-----+------+-------+----------------------+----------+
|Years|Months|Revenue|Previous_month_revenue|per_change|
+-----+------+-------+----------------------+----------+
|    6|     7| 108490|                  NULL|      NULL|
|    6|     8| 100159|                108490|     -8.32|
|    6|     9|  63500|                100159|    -57.73|
|    6|    10| 257233|                 63500|     75.31|
|    6|    11| 319836|                257233|     19.57|
|    6|    12| 158107|                319836|   -102.29|
|    7|     1| 396219|                  NULL|      NULL|
|    7|     2| 290424|                396219|    -36.43|
|    7|     3| 167177|                290424|    -73.72|
|    7|     4| 265502|                167177|     37.03|
|    7|     5| 383441|                265502|     30.76|
|    7|     6| 306854|                383441|    -24.96|
|    7|     7|  34586|                306854|   -787.22|
|    7|     8|  60383|                 34586|     42.72|
|    7|     9|  42780|         

In [16]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
yoy_sales = (
orders.groupBy(
    F.year(F.col('orderdate')).alias("Years"),
    F.month(F.col('OrderDate')).alias("Months")
)
.agg(F.sum(col("Total_order_amount")).cast("int").alias("Revenue"))
)

windowSpec = Window.partitionBy("Years").orderBy("Months")

yoy_sales.withColumns(
    {"previos_moth_sale" : F.lag(col("Revenue"), 1).over(windowSpec),
    "percent_channge": F.round((col("Revenue") - F.lag(col("Revenue"), 1).over(windowSpec))/col("Revenue") * 100, 2)}
).show()

+-----+------+-------+-----------------+---------------+
|Years|Months|Revenue|previos_moth_sale|percent_channge|
+-----+------+-------+-----------------+---------------+
|    6|     7| 108490|             NULL|           NULL|
|    6|     8| 100159|           108490|          -8.32|
|    6|     9|  63500|           100159|         -57.73|
|    6|    10| 257233|            63500|          75.31|
|    6|    11| 319836|           257233|          19.57|
|    6|    12| 158107|           319836|        -102.29|
|    7|     1| 396219|             NULL|           NULL|
|    7|     2| 290424|           396219|         -36.43|
|    7|     3| 167177|           290424|         -73.72|
|    7|     4| 265502|           167177|          37.03|
|    7|     5| 383441|           265502|          30.76|
|    7|     6| 306854|           383441|         -24.96|
|    7|     7|  34586|           306854|        -787.22|
|    7|     8|  60383|            34586|          42.72|
|    7|     9|  42780|         

In [17]:
### Find the year monthly wise revenue and commulative revenue
import seaborn as sns
query = """

WITH CTE AS
(
SELECT
    YEAR(OrderDate) AS Years,
    MONTH(OrderDate) AS Months,
    CAST(SUM(Total_order_amount) AS INT) AS Revenue
FROM Orders
GROUP BY YEAR(OrderDate), MONTH(OrderDate)
)
SELECT *,
    SUM(Revenue) OVER(PARTITION BY Years ORDER BY Months) AS Commulative_Revenue
FROM CTE
"""
spark.sql(query).show()

+-----+------+-------+-------------------+
|Years|Months|Revenue|Commulative_Revenue|
+-----+------+-------+-------------------+
|    6|     7| 108490|             108490|
|    6|     8| 100159|             208649|
|    6|     9|  63500|             272149|
|    6|    10| 257233|             529382|
|    6|    11| 319836|             849218|
|    6|    12| 158107|            1007325|
|    7|     1| 396219|             396219|
|    7|     2| 290424|             686643|
|    7|     3| 167177|             853820|
|    7|     4| 265502|            1119322|
|    7|     5| 383441|            1502763|
|    7|     6| 306854|            1809617|
|    7|     7|  34586|            1844203|
|    7|     8|  60383|            1904586|
|    7|     9|  42780|            1947366|
|    7|    10|  83268|            2030634|
|    7|    11|  79095|            2109729|
|    7|    12| 385201|            2494930|
|    8|     1| 132418|             132418|
|    8|     2| 306376|             438794|
+-----+----

In [18]:
query = """
WITH CTE1 AS
(
SELECT
    c.ProductID,
    c.Product,
    SUM(a.Total_order_amount) AS Revenue,
    SUM(a.Total_order_amount)/(SELECT SUM(Total_order_amount) FROM Orders) * 100 AS percent_revenue
FROM Orders AS a
JOIN OrderDetails AS b
ON a.OrderID = b.OrderID
JOIN Products AS c
ON b.ProductID = c.ProductID
GROUP BY c.ProductID, c.Product
),
CTE2 AS
(
SELECT *,
    SUM(percent_revenue) OVER(ORDER BY Revenue DESC) AS commulative_percent
FROM CTE1
)
SELECT
    ProductID,
    Product,
    Revenue
FROM CTE2
WHERE commulative_percent <= 10
"""

spark.sql(query).show()


+---------+--------------------+------------------+
|ProductID|             Product|           Revenue|
+---------+--------------------+------------------+
|    12576|Raw Manuka Honey ...|         309849.75|
|    18730|Futura Hard Anodi...|          282753.0|
|     4711|AQVA Divina Body ...|         275639.75|
|     4638|Be Delicious Eau ...|    271510.8515625|
|    19320|Lily Ville Square...|          257149.0|
|    20295|Quantum Max Dishw...|     247793.953125|
|     4139|New Imperial 1000...|          236096.0|
|    12290|Chunky Orange Mar...|   234557.80078125|
|     6927|Hot Water Rubber ...|234380.35009765625|
|     4817|Voyage Sport Eau ...|     233160.078125|
|    14645|       Bhringraj Oil| 231098.7998046875|
|    15164|Healthy Khakhra -...|          229811.0|
|    18568|Stainless Steel C...|          223229.0|
|    20326|   Laundry Detergent|    220203.4765625|
|    10939|Liver Tonic & App...|   220038.69921875|
|      833|Baby Bottle & Foo...|          216971.0|
|    10759|A

In [19]:
from pyspark.sql.window import Window
customer_order = (
customers.join(
          orders,
          on="CustomerID")
        .select("CustomerID", "FirstName", "LastName", "OrderID", "OrderDate")
)

windowspec = Window.partitionBy(F.col("CustomerID")).orderBy(F.col("OrderDate"))
difference = customer_order.withColumns(
    {
     "previous_order": F.lag(F.col("OrderDate")).over(windowspec),
     "difference": F.date_diff(F.col("OrderDate"), F.lag(F.col("OrderDate")).over(windowspec))
    }
)
difference.groupBy(
    F.col("CustomerID"),
    F.col("FirstName"),
    F.col("LastName")).agg(
    F.round(F.avg(F.col("difference")), 2).alias("avg_difference")
).orderBy(F.col("avg_difference")).filter(F.col("avg_difference").isNotNull()).show()

+----------+---------+--------+--------------+
|CustomerID|FirstName|LastName|avg_difference|
+----------+---------+--------+--------------+
|     57605|  Malcolm|  Julian|           0.0|
|     57393|   Jayden|  Connor|        261.33|
|     57496|     Joel|    Zayn|        365.25|
|     57604|    Jesse| Francis|        365.33|
|     57603|    Idris|   Colby|        417.43|
|     57517|    Jakub| Stephen|         444.4|
|     57583|   Conall|   David|         474.8|
|     57346|    Blair| Melissa|         514.0|
|     57267|  Marilyn| Johnson|         527.5|
|     57587|   Lawson|   Sonny|        547.83|
|     57594|    River|   Louie|         548.0|
|     57486|     Cruz|  Duncan|        564.94|
|     57392|  Lochlan| Ruaridh|        570.27|
|     57569| Abdullah| Lochlan|        575.67|
|     57256|   Janice| Vazquez|         576.0|
|     57513|    Billy|  Harley|        578.33|
|     57383|    Roman|  Hamish|        586.06|
|     57213|  Shirley|  Taylor|        589.81|
|     57597| 

In [20]:
query = """
WITH CTE AS
(
SELECT
    a.CustomerID,
    FirstName,
    LastName,
    OrderID,
    OrderDate,
    LAG(OrderDate, 1) OVER(PARTITION BY a.CustomerID ORDER BY OrderDate) AS previous_orderdate,
    DATEDIFF(OrderDate, LAG(OrderDate, 1) OVER(PARTITION BY a.CustomerID ORDER BY OrderDate)) AS difference
FROM Customers AS a
JOIN Orders AS b
ON a.CustomerID = b.CustomerID
)
SELECT
    CustomerID,
    FirstName,
    LastName,
    ROUND(AVG(difference), 2) as avg_difference
FROM CTE
GROUP BY CustomerID, FirstName, LastName
HAVING ROUND(AVG(difference), 2) IS NOT NULL
ORDER BY avg_difference
"""

spark.sql(query).show()

+----------+---------+--------+--------------+
|CustomerID|FirstName|LastName|avg_difference|
+----------+---------+--------+--------------+
|     57605|  Malcolm|  Julian|           0.0|
|     57393|   Jayden|  Connor|        261.33|
|     57496|     Joel|    Zayn|        365.25|
|     57604|    Jesse| Francis|        365.33|
|     57603|    Idris|   Colby|        417.43|
|     57517|    Jakub| Stephen|         444.4|
|     57583|   Conall|   David|         474.8|
|     57346|    Blair| Melissa|         514.0|
|     57267|  Marilyn| Johnson|         527.5|
|     57587|   Lawson|   Sonny|        547.83|
|     57594|    River|   Louie|         548.0|
|     57486|     Cruz|  Duncan|        564.94|
|     57392|  Lochlan| Ruaridh|        570.27|
|     57569| Abdullah| Lochlan|        575.67|
|     57256|   Janice| Vazquez|         576.0|
|     57513|    Billy|  Harley|        578.33|
|     57383|    Roman|  Hamish|        586.06|
|     57213|  Shirley|  Taylor|        589.81|
|     57597| 

In [21]:
# Most selling Product in terms of quantity in each country
windowspec = Window.partitionBy(F.col("Country")).orderBy(F.col("total_quantity").desc())
(
customers.join(orders,
               on="CustomerID")
         .join(orderdetails, on="OrderID")
         .join(products, on="ProductID")
         .select("Country", "ProductID", "Product","Quantity")
         .groupBy("Country", "ProductID", "Product").agg(
             F.sum(F.col("Quantity")).alias("total_quantity")
         ).withColumn("rank", F.dense_rank().over(windowspec))
         .filter(F.col("rank") == 1)
).show()

+----------------+---------+--------------------+--------------+----+
|         Country|ProductID|             Product|total_quantity|rank|
+----------------+---------+--------------------+--------------+----+
|       Australia|    21319|BBPopular Almond/...|            35|   1|
|         Austria|    13902|    Skin Therapy Oil|            33|   1|
|         Austria|    11912|           Pav Bhaji|            33|   1|
|         Belgium|    14069|Honey & Rose Body...|            39|   1|
|          France|     2184|Milk Shakti Biscu...|            42|   1|
|         Germany|    12058|Rice - Ponni, Boiled|            37|   1|
|          Greece|    15203|Kochi Masala Bana...|            53|   1|
|           India|     3947|Liquid Food Colou...|            57|   1|
|         Ireland|    17531|Wooden Printed Ir...|            39|   1|
|           Italy|     1237|Glycerin Germ Pro...|            35|   1|
|     Netherlands|      510|Alkaline Battery ...|            36|   1|
|     New Zealand|  

In [22]:
# check is there any trend every month some different product on peak

window_spec = Window.partitionBy(F.col("months")).orderBy(F.col("quantity_sold").desc())
(
    orders.join(orderdetails, on="OrderID")
    .join(products, on="ProductID")
    .groupby(F.month(F.col("OrderDate")).alias("months"),
             F.col("ProductID"),
             F.col("Product"))
    .agg(
        F.sum(F.col("Quantity")).alias("quantity_sold")
    )
    .withColumn("rank", F.dense_rank().over(window_spec))
    .filter(F.col("rank") == 1)
).show()

+------+---------+--------------------+-------------+----+
|months|ProductID|             Product|quantity_sold|rank|
+------+---------+--------------------+-------------+----+
|     1|    19599|        Pepper Gouda|           38|   1|
|     2|     7965|Kashmiri Chilly P...|           40|   1|
|     3|     3558|Dark Chocolate - ...|           45|   1|
|     4|    20538|Ultra Soft Premiu...|           48|   1|
|     5|     9422|Cleaning Brush - ...|           49|   1|
|     5|    16108|Peanut Butter - C...|           49|   1|
|     6|     6226|NHP 8103 Silky Sh...|           51|   1|
|     7|    10481|Party Eye Mask - ...|           30|   1|
|     8|      638|Orthodontic Sooth...|           34|   1|
|     9|     6195|Taft Classic Hair...|           34|   1|
|     9|    16921|Copper Steel Wate...|           34|   1|
|    10|    17623|Square Pet water ...|           37|   1|
|    11|    20009|Moong Dal/Hesaru ...|           39|   1|
|    12|     6962|     Hot Water - Bag|           38|   

In [23]:
# avg deleivery time for each country rank them in descending orders

windowspec = Window.partitionBy("Country").orderBy(F.col("avg_lead_time"))
(
 customers.join(orders, on="CustomerID")
 .groupBy("Country", "City").agg(
     F.avg(F.date_diff(F.col("DeliveryDate"), F.col("OrderDate"))).alias("avg_lead_time"))
     .withColumn("rank", F.row_number().over(windowspec))
).show()

+---------+--------------+-------------------+----+
|  Country|          City|      avg_lead_time|rank|
+---------+--------------+-------------------+----+
|Australia|        Sydney|            -2982.5|   1|
|Australia|      Ballarat|-2870.3076923076924|   2|
|Australia|     Newcastle|          -2766.125|   3|
|Australia|      Canberra|-2197.3333333333335|   4|
|Australia|      Adelaide|            -1867.0|   5|
|Australia|Sunshine Coast|-1567.6666666666667|   6|
|Australia|    Wollongong|            -1001.4|   7|
|Australia|         Perth|             -821.8|   8|
|Australia|      Brisbane| -325.2903225806452|   9|
|Australia|        Hobart|-114.45454545454545|  10|
|Australia|        Cairns| -100.6842105263158|  11|
|Australia|        Darwin| -49.72727272727273|  12|
|Australia|    Townsville| 112.15789473684211|  13|
|Australia|     Melbourne|  738.8181818181819|  14|
|Australia| Central Coast|           1517.875|  15|
|Australia|       Geelong|  3187.285714285714|  16|
|  Austria| 