In [2]:
!pip install pyspark py4j
!pip install streamlit

Collecting streamlit
  Downloading streamlit-1.41.1-py2.py3-none-any.whl.metadata (8.5 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.41.1-py2.py3-none-any.whl (9.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.1/9.1 MB[0m [31m72.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m92.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[

In [214]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,year,month,quarter,udf,lit,countDistinct,count,date_format,collect_set,current_date,max,day,expr,sum,ntile,concat
from pyspark.sql.types import *
from pyspark.sql.window import Window
import plotly.express as px
import streamlit as st

# Reuse the active Spark session if one exists; otherwise start a new one.
spark=SparkSession.builder.getOrCreate()

In [215]:
# Explicit schema for the sales file: one row per ordered item.
sales_schema = StructType([
    StructField("S_Product_id", IntegerType(), False),
    StructField("Customer_id", StringType(), False),
    StructField("Order_date", DateType(), True),
    StructField("Location", StringType(), True),
    StructField("Source_order", StringType(), True),
])

# Explicit schema for the menu file: one row per product.
menu_schema = StructType([
    StructField("M_Product_id", IntegerType(), False),
    StructField("Product_name", StringType(), False),
    StructField("Price", FloatType(), True),
])

# `inferSchema=True` was removed: it has no effect when an explicit schema is
# passed through `.schema(...)`, and leaving it in suggests the opposite.
# NOTE(review): product names appear with a leading space in the displayed
# results; consider `ignoreLeadingWhiteSpace=True` after checking the raw files.
sales_df = spark.read.format("csv").schema(sales_schema).load(path="/content/sales.csv.txt")
menu_df = spark.read.format("csv").schema(menu_schema).load(path="/content/menu.csv.txt")

sales_df.show()
menu_df.show()

+------------+-----------+----------+--------+------------+
|S_Product_id|Customer_id|Order_date|Location|Source_order|
+------------+-----------+----------+--------+------------+
|           1|          A|2023-01-01|   India|      Swiggy|
|           2|          A|2022-01-01|   India|      Swiggy|
|           2|          A|2023-01-07|   India|      Swiggy|
|           3|          A|2023-01-10|   India|  Restaurant|
|           3|          A|2022-01-11|   India|      Swiggy|
|           3|          A|2023-01-11|   India|  Restaurant|
|           2|          B|2022-02-01|   India|      Swiggy|
|           2|          B|2023-01-02|   India|      Swiggy|
|           1|          B|2023-01-04|   India|  Restaurant|
|           1|          B|2023-02-11|   India|      Swiggy|
|           3|          B|2023-01-16|   India|      zomato|
|           3|          B|2022-02-01|   India|      zomato|
|           3|          C|2023-01-01|   India|      zomato|
|           1|          C|2023-01-01|   

In [216]:
# Attach product name and price to every sale, then drop the duplicate key column.
product_match = sales_df.S_Product_id == menu_df.M_Product_id
df = (
    sales_df
    .join(menu_df, on=product_match, how="inner")
    .drop("M_product_id")
)
df.show()

+------------+-----------+----------+--------+------------+------------+-----+
|S_Product_id|Customer_id|Order_date|Location|Source_order|Product_name|Price|
+------------+-----------+----------+--------+------------+------------+-----+
|           1|          A|2023-01-01|   India|      Swiggy|       PIZZA|100.0|
|           2|          A|2022-01-01|   India|      Swiggy|     Chowmin|150.0|
|           2|          A|2023-01-07|   India|      Swiggy|     Chowmin|150.0|
|           3|          A|2023-01-10|   India|  Restaurant|    sandwich|120.0|
|           3|          A|2022-01-11|   India|      Swiggy|    sandwich|120.0|
|           3|          A|2023-01-11|   India|  Restaurant|    sandwich|120.0|
|           2|          B|2022-02-01|   India|      Swiggy|     Chowmin|150.0|
|           2|          B|2023-01-02|   India|      Swiggy|     Chowmin|150.0|
|           1|          B|2023-01-04|   India|  Restaurant|       PIZZA|100.0|
|           1|          B|2023-02-11|   India|      

In [217]:
# Derive calendar parts from the order date for the time-based breakdowns.
date_parts = {
    "Year": year("Order_date"),
    "Month": month("Order_date"),
    "Quarter": quarter("Order_date"),
}
df = df.withColumns(date_parts)
df.show(n=5)

+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|S_Product_id|Customer_id|Order_date|Location|Source_order|Product_name|Price|Year|Month|Quarter|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|           1|          A|2023-01-01|   India|      Swiggy|       PIZZA|100.0|2023|    1|      1|
|           2|          A|2022-01-01|   India|      Swiggy|     Chowmin|150.0|2022|    1|      1|
|           2|          A|2023-01-07|   India|      Swiggy|     Chowmin|150.0|2023|    1|      1|
|           3|          A|2023-01-10|   India|  Restaurant|    sandwich|120.0|2023|    1|      1|
|           3|          A|2022-01-11|   India|      Swiggy|    sandwich|120.0|2022|    1|      1|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
only showing top 5 rows



In [219]:
# List each column of the joined frame together with its Spark data type.
df.dtypes

[('S_Product_id', 'int'),
 ('Customer_id', 'string'),
 ('Order_date', 'date'),
 ('Location', 'string'),
 ('Source_order', 'string'),
 ('Product_name', 'string'),
 ('Price', 'float'),
 ('Year', 'int'),
 ('Month', 'int'),
 ('Quarter', 'int')]

**Detect Null Values**

In [220]:
# Count nulls per column: cast the isNull flag to 0/1 and add it up.
# (`sum` here is pyspark.sql.functions.sum, imported at the top of the notebook.)
null_counts = [
    sum(col(name).isNull().cast("integer")).alias(name)
    for name in df.columns
]
df.select(*null_counts).show()

+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|S_Product_id|Customer_id|Order_date|Location|Source_order|Product_name|Price|Year|Month|Quarter|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|           0|          0|         0|       0|           0|           0|    0|   0|    0|      0|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+



**Check Duplicates**

In [226]:
# Total number of rows in the joined frame at this point.
df.count()

117

In [228]:
# exceptAll keeps multiplicity, so removing one copy of each distinct row
# leaves exactly the surplus (duplicated) rows.
duplicate_rows = df.exceptAll(df.dropDuplicates())
print("Number of duplicated records = ", duplicate_rows.count())

Number of duplicated records =  5


In [227]:
# Display the surplus copies of rows that occur more than once.
distinct_rows = df.dropDuplicates()
df.exceptAll(distinct_rows).show()

+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|S_Product_id|Customer_id|Order_date|Location|Source_order|Product_name|Price|Year|Month|Quarter|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|           1|          B|2023-02-11|   India|      Swiggy|       PIZZA|100.0|2023|    2|      1|
|           1|          B|2023-02-11|   India|      Swiggy|       PIZZA|100.0|2023|    2|      1|
|           4|          E|2023-02-07|      UK|  Restaurant|        Dosa|110.0|2023|    2|      1|
|           3|          D|2023-02-16|      UK|  Restaurant|    sandwich|120.0|2023|    2|      1|
|           3|          D|2023-02-16|      UK|  Restaurant|    sandwich|120.0|2023|    2|      1|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+



In [230]:
# Remove exact duplicate rows, then confirm none remain.
df = df.dropDuplicates()
remaining_duplicates = df.exceptAll(df.dropDuplicates()).count()
print("Number of duplicated records = ", remaining_duplicates)
df.count()

Number of duplicated records =  0


112

**Q: Do product prices vary depending on the source or location ?**

In [231]:
# Collect the distinct prices seen for each product per location and source;
# more than one price in a set would mean the price varies.
price_sets = (
    df.groupBy("Location", "Source_order", "Product_name")
    .agg(collect_set(col("Price")).alias("Prices"))
    .sort("Product_name")
)
price_sets.show(truncate=False)

+--------+------------+------------+-------+
|Location|Source_order|Product_name|Prices |
+--------+------------+------------+-------+
|UK      |zomato      | Biryani    |[80.0] |
|USA     |zomato      | Biryani    |[80.0] |
|UK      |zomato      | Chowmin    |[150.0]|
|UK      |Swiggy      | Chowmin    |[150.0]|
|India   |Swiggy      | Chowmin    |[150.0]|
|UK      |Restaurant  | Dosa       |[110.0]|
|USA     |zomato      | Dosa       |[110.0]|
|UK      |Swiggy      | Dosa       |[110.0]|
|India   |Restaurant  | PIZZA      |[100.0]|
|USA     |Swiggy      | PIZZA      |[100.0]|
|UK      |Swiggy      | PIZZA      |[100.0]|
|India   |Swiggy      | PIZZA      |[100.0]|
|UK      |Restaurant  | PIZZA      |[100.0]|
|USA     |zomato      | Pasta      |[180.0]|
|UK      |zomato      | Pasta      |[180.0]|
|USA     |Swiggy      | sandwich   |[120.0]|
|USA     |zomato      | sandwich   |[120.0]|
|India   |zomato      | sandwich   |[120.0]|
|UK      |Swiggy      | sandwich   |[120.0]|
|UK      |

**Answer:** No. Each product has the same price across all sources and all countries.

**Total Sales for each Source Order**

In [232]:
# Revenue per ordering channel, shown as a table and as a pie chart.
sales_by_source = (
    df.groupBy("Source_order")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Sales")
)
sales_by_source.show()

fig = px.pie(
    data_frame=sales_by_source,
    names="Source_order",
    values="Total Sales",
    color="Source_order",
    color_discrete_sequence=["red", "green", "blue"],
    title="Total Sales for each Source",
)
fig.update_traces(textinfo="label+percent", textposition="outside")
fig.show()

+------------+-----------+
|Source_order|Total Sales|
+------------+-----------+
|      zomato|     4920.0|
|      Swiggy|     6130.0|
|  Restaurant|     2740.0|
+------------+-----------+



**Q: Why does Swiggy achieve the highest sales even though product prices are the same across all sources?**

In [233]:
# For every source, list the countries it serves and the products sold through it.
source_coverage = (
    df.select("Location", "Source_order", "Product_name")
    .groupBy("Source_order")
    .agg(
        collect_set(col("Location")).alias("Countries"),
        collect_set(col("Product_name")).alias("Products"),
    )
)
source_coverage.show(truncate=False)

+------------+----------------+----------------------------------------------+
|Source_order|Countries       |Products                                      |
+------------+----------------+----------------------------------------------+
|zomato      |[India, UK, USA]|[ Pasta,  Biryani,  Dosa,  sandwich,  Chowmin]|
|Swiggy      |[India, UK, USA]|[ Dosa,  sandwich,  Chowmin,  PIZZA]          |
|Restaurant  |[India, UK]     |[ Dosa,  sandwich,  PIZZA]                    |
+------------+----------------+----------------------------------------------+



**Answer:** Swiggy achieves the highest sales because it is open in many countries and offers the most popular and most affordable products.

**Total Sales for each Product**

In [234]:
# Revenue per product, highest first.
sales_by_product = (
    df.groupBy("Product_name")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Sales")
    .sort(col("Total Sales").desc())
)
sales_by_product.show()

fig = px.bar(
    data_frame=sales_by_product,
    x="Total Sales",
    y="Product_name",
    color="Product_name",
    color_discrete_sequence=["red", "blue", "green", "yellow", "gray", "orange"],
    text="Total Sales",
    title="Total Sales for each Product",
    orientation="h",
)
fig.show()

+------------+-----------+
|Product_name|Total Sales|
+------------+-----------+
|    sandwich|     5520.0|
|     Chowmin|     3600.0|
|       PIZZA|     1900.0|
|        Dosa|     1210.0|
|       Pasta|     1080.0|
|     Biryani|      480.0|
+------------+-----------+



**Best Selling Product**

In [235]:
# Number of orders per product and year; Year is cast to string so Plotly
# treats it as a discrete category rather than a continuous colour scale.
orders_by_product_year = (
    df.groupBy("Product_name", "Year")
    .count()
    .orderBy("count")
    .withColumn("Year", col("Year").cast("string"))
)
orders_by_product_year.show()

fig = px.bar(
    data_frame=orders_by_product_year,
    x="count",
    y="Product_name",
    color="Year",
    color_discrete_sequence=["red", "blue"],
    text="count",
    title="Best Selling Product",
    orientation="h",
    barmode="relative",
)
fig.show()

+------------+----+-----+
|Product_name|Year|count|
+------------+----+-----+
|       Pasta|2023|    3|
|        Dosa|2022|    3|
|     Biryani|2023|    3|
|     Biryani|2022|    3|
|       Pasta|2022|    3|
|        Dosa|2023|    8|
|    sandwich|2022|   12|
|     Chowmin|2023|   12|
|     Chowmin|2022|   12|
|       PIZZA|2023|   19|
|    sandwich|2023|   34|
+------------+----+-----+



**Note:** Demand increased in 2023 compared to the previous year for **sandwich**, **PIZZA** and **Dosa**, and remained the same for **Chowmin**, **Biryani** and **Pasta**.

**Total Sales for each Country**

In [236]:
# Revenue per country.
sales_by_country = (
    df.groupBy("Location")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Sales")
)
sales_by_country.show()

fig = px.bar(
    data_frame=sales_by_country,
    x="Location",
    y="Total Sales",
    color="Location",
    color_discrete_sequence=["orange", "red", "blue"],
    text="Total Sales",
    title="Total Sales for each Country",
    labels={"Location": "Country"},
)
fig.show()

+--------+-----------+
|Location|Total Sales|
+--------+-----------+
|   India|     4660.0|
|     USA|     2460.0|
|      UK|     6670.0|
+--------+-----------+



**No of Orders in each Country**

In [318]:
# Number of orders per country.
orders_by_country = df.groupBy("Location").count()
orders_by_country.show()

fig = px.bar(
    data_frame=orders_by_country,
    x="Location",
    y="count",
    color="Location",
    color_discrete_sequence=["orange", "red", "blue"],
    text="count",
    title="No of Orders in each Country",
    labels={"Location": "Country"},
)
fig.show()

+--------+-----+
|Location|count|
+--------+-----+
|   India|   37|
|     USA|   21|
|      UK|   54|
+--------+-----+



**Total Payment for each Customer**

In [237]:
# Total amount spent by each customer.
payment_by_customer = (
    df.groupBy("Customer_id")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Payment")
)
payment_by_customer.show()

fig = px.bar(
    data_frame=payment_by_customer,
    x="Total Payment",
    y="Customer_id",
    color="Customer_id",
    color_discrete_sequence=["red", "gray", "blue", "green", "yellow"],
    text="Total Payment",
    title="Total Payment for each Customer",
    orientation="h",
    # Fix the bar order so it does not depend on the groupBy output order.
    category_orders={"Customer_id": ["A", "B", "C", "D", "E"]},
)
fig.show()

+-----------+-------------+
|Customer_id|Total Payment|
+-----------+-------------+
|          E|       1930.0|
|          B|       4240.0|
|          D|        960.0|
|          C|       2400.0|
|          A|       4260.0|
+-----------+-------------+



**No of Orders for each Customer**

**Frequency of Customer Visits**



In [238]:
# Number of orders placed by each customer.
orders_by_customer = (
    df.groupBy("Customer_id")
    .count()
    .sort("Customer_id")
)
orders_by_customer.show()

fig = px.bar(
    data_frame=orders_by_customer,
    x="count",
    y="Customer_id",
    color="Customer_id",
    color_discrete_sequence=["red", "gray", "blue", "green", "yellow"],
    text="count",
    title="No of Orders for each Customer",
    orientation="h",
    category_orders={"Customer_id": ["A", "B", "C", "D", "E"]},
)
fig.show()

+-----------+-----+
|Customer_id|count|
+-----------+-----+
|          A|   33|
|          B|   34|
|          C|   18|
|          D|   10|
|          E|   17|
+-----------+-----+



**Comparison between Customer Visits to their Orders**

In [239]:
# A "visit" is a distinct order date; an "order" is a row. Compare both per customer.
visits_vs_orders = (
    df.groupBy("Customer_id")
    .agg(
        countDistinct("Order_date").alias("Number of Visits"),
        count("Customer_id").alias("Number of Orders"),
    )
    .sort("Customer_id")
)
visits_vs_orders.show()

fig = px.area(
    data_frame=visits_vs_orders,
    x="Customer_id",
    y=["Number of Visits", "Number of Orders"],
    color_discrete_sequence=["red", "green"],
    title="Comparison between Customer Visits to their Orders",
)
fig.show()

+-----------+----------------+----------------+
|Customer_id|Number of Visits|Number of Orders|
+-----------+----------------+----------------+
|          A|              17|              33|
|          B|              22|              34|
|          C|               9|              18|
|          D|              10|              10|
|          E|              14|              17|
+-----------+----------------+----------------+



**Yearly Sales**

In [240]:
# Revenue per year; Year becomes a string so it is plotted as a category.
yearly_sales = (
    df.groupBy("Year")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Sales")
    .withColumn("Year", col("Year").cast("string"))
)
yearly_sales.show()

fig = px.bar(
    data_frame=yearly_sales,
    x="Year",
    y="Total Sales",
    color="Year",
    text="Total Sales",
    title="Yearly Sales",
    category_orders={"Year": ["2022", "2023"]},
    color_discrete_sequence=["blue", "red"],
)
fig.show()

+----+-----------+
|Year|Total Sales|
+----+-----------+
|2023|     9440.0|
|2022|     4350.0|
+----+-----------+



**Quarterly Sales**

In [241]:
# Revenue per calendar quarter (both years combined).
quarterly_sales = (
    df.groupBy("Quarter")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Sales")
    .withColumn("Quarter", col("Quarter").cast("string"))
)
quarterly_sales.show()

fig = px.bar(
    data_frame=quarterly_sales,
    x="Quarter",
    y="Total Sales",
    color="Quarter",
    color_discrete_sequence=["blue", "red", "gray", "green"],
    text="Total Sales",
    category_orders={"Quarter": ["1", "2", "3", "4"]},
    title="Quarterly Sales",
)
fig.show()

+-------+-----------+
|Quarter|Total Sales|
+-------+-----------+
|      1|     6050.0|
|      3|      910.0|
|      4|      910.0|
|      2|     5920.0|
+-------+-----------+



**Monthly Sales**

In [242]:
# Revenue per calendar month (both years combined); the month name is kept
# alongside the month number so it can be shown on hover.
monthly_sales = (
    df.withColumn("Month_name", date_format(col("order_date"), "MMMM"))
    .groupBy("Month", "Month_name")
    .sum("Price")
    .withColumnRenamed("sum(Price)", "Total Sales")
    .sort(col("Month").asc())
)
monthly_sales.show()

fig = px.bar(
    data_frame=monthly_sales,
    x="Month",
    y="Total Sales",
    text="Total Sales",
    hover_name="Month_name",
    title="Monthly Sales",
    color_discrete_sequence=["red"],
)
fig.show()

+-----+----------+-----------+
|Month|Month_name|Total Sales|
+-----+----------+-----------+
|    1|   January|     2960.0|
|    2|  February|     2180.0|
|    3|     March|      910.0|
|    5|       May|     2960.0|
|    6|      June|     2960.0|
|    7|      July|      910.0|
|   11|  November|      910.0|
+-----+----------+-----------+



**Most Visited Source Order**

In [309]:
# Distinct order dates ("visits") per source and year.
visits_by_source_year = (
    df.groupBy("Source_order", "Year")
    .agg(countDistinct("Order_date").alias("count"))
    .orderBy("count")
    .withColumn("Year", col("Year").cast("string"))
)
visits_by_source_year.show()

fig = px.bar(
    data_frame=visits_by_source_year,
    x="count",
    y="Source_order",
    color="Year",
    color_discrete_sequence=["blue", "red"],
    text="count",
    title="Most Visited Source Order",
    orientation="h",
    barmode="relative",
)
fig.show()

+------------+----+-----+
|Source_order|Year|count|
+------------+----+-----+
|      Swiggy|2022|    9|
|      zomato|2022|   12|
|      zomato|2023|   20|
|  Restaurant|2023|   21|
|      Swiggy|2023|   22|
+------------+----+-----+



**Applying RFM Analysis**

In [244]:
# Preview the order data before computing the RFM metrics.
df.show()

+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|S_Product_id|Customer_id|Order_date|Location|Source_order|Product_name|Price|Year|Month|Quarter|
+------------+-----------+----------+--------+------------+------------+-----+----+-----+-------+
|           1|          C|2023-05-05|      UK|      Swiggy|       PIZZA|100.0|2023|    5|      2|
|           3|          C|2023-05-05|      UK|  Restaurant|    sandwich|120.0|2023|    5|      2|
|           3|          E|2023-02-06|      UK|  Restaurant|    sandwich|120.0|2023|    2|      1|
|           3|          D|2022-03-16|     USA|      zomato|    sandwich|120.0|2022|    3|      1|
|           3|          E|2023-02-01|      UK|  Restaurant|    sandwich|120.0|2023|    2|      1|
|           3|          A|2023-06-11|   India|  Restaurant|    sandwich|120.0|2023|    6|      2|
|           3|          C|2023-06-06|      UK|  Restaurant|    sandwich|120.0|2023|    6|      2|
|           3|      

In [245]:
# Fixed "as of" date against which Recency is measured. It is deliberately NOT
# called `current_date`: that name is imported from pyspark.sql.functions and
# rebinding it to a string would silently break any later use of the function.
REFERENCE_DATE = "2024-01-01"

# Per-customer RFM inputs:
#   Recency   - days between REFERENCE_DATE and the customer's latest order
#   Frequency - number of orders
#   Monetory  - total amount spent (column name kept as-is; later cells use it)
# `max` and `sum` are the pyspark.sql.functions aggregates imported above,
# not the Python builtins.
RFM = (
    df.groupBy("Customer_id")
    .agg(
        # Explicit date literal instead of relying on string-to-date coercion.
        (lit(REFERENCE_DATE).cast("date") - max("Order_date")).alias("Recency"),
        count("Customer_id").alias("Frequency"),
        sum("Price").alias("Monetory"),
    )
    .sort("Customer_id")
    # The date difference is converted to a plain integer number of days.
    .withColumn("Recency", col("Recency").cast("integer"))
)
RFM.show()

+-----------+-------+---------+--------+
|Customer_id|Recency|Frequency|Monetory|
+-----------+-------+---------+--------+
|          A|    204|       33|  4260.0|
|          B|     51|       34|  4240.0|
|          C|    204|       18|  2400.0|
|          D|     56|       10|   960.0|
|          E|     51|       17|  1930.0|
+-----------+-------+---------+--------+



In [246]:
# Score each metric 1-3 with ntile(3); 3 is always the "best" bucket:
#   Recency   - ordered descending, so the most recent customers score 3
#   Frequency - ordered ascending, so the most frequent customers score 3
#   Monetory  - ordered ascending, so the biggest spenders score 3
# Customer_id is added as a secondary sort key: ntile splits tied values
# across buckets in whatever order Spark happens to produce them, so without
# a tiebreaker customers with equal metrics can get different scores from one
# run to the next. Tied customers may still land in different buckets, but the
# assignment is now reproducible.
# The windows have no partitionBy, so Spark moves all rows to one partition;
# that is acceptable here because RFM holds one row per customer.
RFM = RFM.withColumns({
    "Recency_Score": ntile(3).over(
        Window.orderBy(col("Recency").desc(), col("Customer_id").asc())
    ),
    "Frequency_Score": ntile(3).over(
        Window.orderBy(col("Frequency").asc(), col("Customer_id").asc())
    ),
    "Monetory_Score": ntile(3).over(
        Window.orderBy(col("Monetory").asc(), col("Customer_id").asc())
    ),
})
RFM = RFM.withColumn(
    "RFM_Score",
    col("Recency_Score") + col("Frequency_Score") + col("Monetory_Score"),
)
RFM.show()

+-----------+-------+---------+--------+-------------+---------------+--------------+---------+
|Customer_id|Recency|Frequency|Monetory|Recency_Score|Frequency_Score|Monetory_Score|RFM_Score|
+-----------+-------+---------+--------+-------------+---------------+--------------+---------+
|          D|     56|       10|   960.0|            2|              1|             1|        4|
|          E|     51|       17|  1930.0|            3|              1|             1|        5|
|          C|    204|       18|  2400.0|            1|              2|             2|        5|
|          B|     51|       34|  4240.0|            2|              3|             2|        7|
|          A|    204|       33|  4260.0|            1|              2|             3|        6|
+-----------+-------+---------+--------+-------------+---------------+--------------+---------+



In [251]:
@udf(returnType=StringType())
def segment(rfm):
    """Map a combined RFM score to a customer segment label.

    Scores below 5 are "Hibernating Customer", scores from 5 up to (but not
    including) 7 are "Need Attention", and everything else is "VIP Customer".
    """
    if rfm < 5:
        return "Hibernating Customer"
    if rfm < 7:
        return "Need Attention"
    return "VIP Customer"


RFM = RFM.withColumn("Segment", segment(col("RFM_Score")))
RFM.show()

+-----------+-------+---------+--------+-------------+---------------+--------------+---------+--------------------+
|Customer_id|Recency|Frequency|Monetory|Recency_Score|Frequency_Score|Monetory_Score|RFM_Score|             Segment|
+-----------+-------+---------+--------+-------------+---------------+--------------+---------+--------------------+
|          D|     56|       10|   960.0|            2|              1|             1|        4|Hibernating Customer|
|          E|     51|       17|  1930.0|            3|              1|             1|        5|      Need Attention|
|          C|    204|       18|  2400.0|            1|              2|             2|        5|      Need Attention|
|          B|     51|       34|  4240.0|            2|              3|             2|        7|        VIP Customer|
|          A|    204|       33|  4260.0|            1|              2|             3|        6|      Need Attention|
+-----------+-------+---------+--------+-------------+----------

In [254]:
# Treemap of customers nested inside their RFM segment.
fig = px.treemap(
    data_frame=RFM,
    path=["Segment", "Customer_id"],
    color="Segment",
    color_discrete_sequence=["gray", "green", "red"],
    title="Customer Segmentation by RFM Analysis",
)
fig.show()

In [153]:
! pip install pyngrok

Collecting pyngrok
  Downloading pyngrok-7.2.2-py3-none-any.whl.metadata (8.4 kB)
Downloading pyngrok-7.2.2-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.2


In [319]:
import os
from getpass import getpass

from pyngrok import ngrok

# SECURITY: the ngrok auth token must never be stored in the notebook.
# It is read from the NGROK_AUTH_TOKEN environment variable, falling back to an
# interactive prompt whose input is not echoed or saved in the cell output.
# The token that was previously hard-coded in this cell has been exposed and
# should be revoked and regenerated in the ngrok dashboard.
NGROK_AUTH_TOKEN = os.environ.get("NGROK_AUTH_TOKEN") or getpass("ngrok auth token: ")
ngrok.set_auth_token(NGROK_AUTH_TOKEN)

In [320]:
# Open an ngrok tunnel that forwards a public URL to local port 8501,
# the port the Streamlit dashboard is served on in this notebook.
ngrok.connect(8501)



<NgrokTunnel: "https://70d1-34-169-24-114.ngrok-free.app" -> "http://localhost:8501">

In [321]:
%%writefile dashboard.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,year,month,quarter,udf,lit,countDistinct,count,date_format,collect_set,current_date,max,day,expr,sum,ntile,concat
from pyspark.sql.types import *
from pyspark.sql.window import Window
import plotly.express as px
import streamlit as st

# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Explicit schema for the sales file: one row per ordered item.
sales_schema = StructType([
    StructField("S_Product_id", IntegerType(), False),
    StructField("Customer_id", StringType(), False),
    StructField("Order_date", DateType(), True),
    StructField("Location", StringType(), True),
    StructField("Source_order", StringType(), True),
])

# Explicit schema for the menu file: one row per product.
menu_schema = StructType([
    StructField("M_Product_id", IntegerType(), False),
    StructField("Product_name", StringType(), False),
    StructField("Price", FloatType(), True),
])

# `inferSchema=True` was removed: it has no effect when an explicit schema is
# passed through `.schema(...)`, and leaving it in suggests the opposite.
sales_df = spark.read.format("csv").schema(sales_schema).load(path="/content/sales.csv.txt")
menu_df = spark.read.format("csv").schema(menu_schema).load(path="/content/menu.csv.txt")

# Attach product name and price to every sale, then drop the duplicate key column.
df = sales_df.join(
    menu_df,
    on=sales_df.S_Product_id == menu_df.M_Product_id,
    how="inner",
).drop("M_product_id")

# Calendar parts used by the yearly / quarterly / monthly charts.
df = df.withColumns({
    "Year": year("Order_date"),
    "Month": month("Order_date"),
    "Quarter": quarter("Order_date"),
})
# Remove exact duplicate rows before any aggregation.
df = df.dropDuplicates()

##############################################################################

# Use the full browser width; this is the first Streamlit call in the script.
st.set_page_config(layout="wide")

# Coloured banner with the dashboard title, rendered as raw HTML.
header_html = """
            <div style="background-color:tomato;padding:10px">
            <h1 style="color:white;text-align:center"> Sales Analysis.</h1>
            </div>
                   """
st.markdown(header_html, unsafe_allow_html=True)


# Two-column layout: revenue charts on the left, count-based charts on the right.
col1, col2 = st.columns(2)

with col1:
    # Revenue share per ordering channel.
    sales_by_source = (
        df.groupBy("Source_order")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Sales")
    )
    fig = px.pie(
        data_frame=sales_by_source,
        names="Source_order",
        values="Total Sales",
        color="Source_order",
        color_discrete_sequence=["red", "green", "blue"],
        title="Total Sales for each Source",
    )
    fig.update_traces(textinfo="label+percent", textposition="outside")
    st.plotly_chart(fig, use_container_width=False)

    # Revenue per product, highest first.
    sales_by_product = (
        df.groupBy("Product_name")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Sales")
        .sort(col("Total Sales").desc())
    )
    fig = px.bar(
        data_frame=sales_by_product,
        x="Total Sales",
        y="Product_name",
        color="Product_name",
        color_discrete_sequence=["red", "blue", "green", "yellow", "gray", "orange"],
        text="Total Sales",
        title="Total Sales for each Product",
        orientation="h",
    )
    st.plotly_chart(fig, use_container_width=True)

    # Total amount spent by each customer.
    payment_by_customer = (
        df.groupBy("Customer_id")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Payment")
    )
    fig = px.bar(
        data_frame=payment_by_customer,
        x="Total Payment",
        y="Customer_id",
        color="Customer_id",
        color_discrete_sequence=["red", "gray", "blue", "green", "yellow"],
        text="Total Payment",
        title="Total Payment for each Customer",
        orientation="h",
        category_orders={"Customer_id": ["A", "B", "C", "D", "E"]},
    )
    st.plotly_chart(fig, use_container_width=True)

    # Revenue per country.
    sales_by_country = (
        df.groupBy("Location")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Sales")
    )
    fig = px.bar(
        data_frame=sales_by_country,
        x="Location",
        y="Total Sales",
        color="Location",
        color_discrete_sequence=["orange", "red", "blue"],
        text="Total Sales",
        title="Total Sales for each Country",
        labels={"Location": "Country"},
    )
    st.plotly_chart(fig, use_container_width=True)


with col2:
    # Distinct order dates ("visits") per source and year.
    visits_by_source_year = (
        df.groupBy("Source_order", "Year")
        .agg(countDistinct("Order_date").alias("count"))
        .orderBy("count")
        .withColumn("Year", col("Year").cast("string"))
    )
    fig = px.bar(
        data_frame=visits_by_source_year,
        x="count",
        y="Source_order",
        color="Year",
        color_discrete_sequence=["blue", "red"],
        text="count",
        title="Most Visited Source Order",
        orientation="h",
        barmode="relative",
    )
    st.plotly_chart(fig, use_container_width=True)

    # Number of orders per product and year.
    orders_by_product_year = (
        df.groupBy("Product_name", "Year")
        .count()
        .orderBy("count")
        .withColumn("Year", col("Year").cast("string"))
    )
    fig = px.bar(
        data_frame=orders_by_product_year,
        x="count",
        y="Product_name",
        color="Year",
        color_discrete_sequence=["red", "blue"],
        text="count",
        title="Best Selling Product",
        orientation="h",
        barmode="relative",
    )
    st.plotly_chart(fig, use_container_width=True)

    # Distinct order dates versus number of orders, per customer.
    visits_vs_orders = (
        df.groupBy("Customer_id")
        .agg(
            countDistinct("Order_date").alias("Number of Visits"),
            count("Customer_id").alias("Number of Orders"),
        )
        .sort("Customer_id")
    )
    fig = px.area(
        data_frame=visits_vs_orders,
        x="Customer_id",
        y=["Number of Visits", "Number of Orders"],
        color_discrete_sequence=["red", "green"],
        title="Comparison between Customer Visits to their Orders",
    )
    st.plotly_chart(fig, use_container_width=True)

    # Number of orders per country.
    orders_by_country = df.groupBy("Location").count()
    fig = px.bar(
        data_frame=orders_by_country,
        x="Location",
        y="count",
        color="Location",
        color_discrete_sequence=["orange", "red", "blue"],
        text="count",
        title="No of Orders in each Country",
        labels={"Location": "Country"},
    )
    st.plotly_chart(fig, use_container_width=True)


# Three-column row for the yearly, quarterly and monthly revenue charts.
c1, c2, c3 = st.columns(3)

with c1:
    yearly_sales = (
        df.groupBy("Year")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Sales")
        .withColumn("Year", col("Year").cast("string"))
    )
    fig = px.bar(
        data_frame=yearly_sales,
        x="Year",
        y="Total Sales",
        color="Year",
        text="Total Sales",
        title="Yearly Sales",
        category_orders={"Year": ["2022", "2023"]},
        color_discrete_sequence=["blue", "red"],
    )
    st.plotly_chart(fig, use_container_width=True)

with c2:
    quarterly_sales = (
        df.groupBy("Quarter")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Sales")
        .withColumn("Quarter", col("Quarter").cast("string"))
    )
    fig = px.bar(
        data_frame=quarterly_sales,
        x="Quarter",
        y="Total Sales",
        color="Quarter",
        color_discrete_sequence=["blue", "red", "gray", "green"],
        text="Total Sales",
        category_orders={"Quarter": ["1", "2", "3", "4"]},
        title="Quarterly Sales",
    )
    st.plotly_chart(fig, use_container_width=True)

with c3:
    # The month name is kept next to the month number so it can be shown on hover.
    monthly_sales = (
        df.withColumn("Month_name", date_format(col("order_date"), "MMMM"))
        .groupBy("Month", "Month_name")
        .sum("Price")
        .withColumnRenamed("sum(Price)", "Total Sales")
        .sort(col("Month").asc())
    )
    fig = px.bar(
        data_frame=monthly_sales,
        x="Month",
        y="Total Sales",
        text="Total Sales",
        hover_name="Month_name",
        title="Monthly Sales",
        color_discrete_sequence=["red"],
    )
    st.plotly_chart(fig, use_container_width=True)

# Fixed "as of" date against which Recency is measured. It is deliberately NOT
# called `current_date`: that name is imported from pyspark.sql.functions and
# rebinding it to a string would shadow the function for the rest of the script.
REFERENCE_DATE = "2024-01-01"

# Per-customer RFM inputs:
#   Recency   - days between REFERENCE_DATE and the customer's latest order
#   Frequency - number of orders
#   Monetory  - total amount spent
# `max` and `sum` are the pyspark.sql.functions aggregates, not the builtins.
RFM = (
    df.groupBy("Customer_id")
    .agg(
        # Explicit date literal instead of relying on string-to-date coercion.
        (lit(REFERENCE_DATE).cast("date") - max("Order_date")).alias("Recency"),
        count("Customer_id").alias("Frequency"),
        sum("Price").alias("Monetory"),
    )
    .sort("Customer_id")
    .withColumn("Recency", col("Recency").cast("integer"))
)

# Score each metric 1-3 with ntile(3); 3 is always the "best" bucket.
# Customer_id is a secondary sort key so that customers with tied metric
# values are assigned to buckets reproducibly instead of in arbitrary order.
RFM = RFM.withColumns({
    "Recency_Score": ntile(3).over(
        Window.orderBy(col("Recency").desc(), col("Customer_id").asc())
    ),
    "Frequency_Score": ntile(3).over(
        Window.orderBy(col("Frequency").asc(), col("Customer_id").asc())
    ),
    "Monetory_Score": ntile(3).over(
        Window.orderBy(col("Monetory").asc(), col("Customer_id").asc())
    ),
})
RFM = RFM.withColumn(
    "RFM_Score",
    col("Recency_Score") + col("Frequency_Score") + col("Monetory_Score"),
)


@udf(returnType=StringType())
def segment(rfm):
    """Map a combined RFM score to a customer segment label.

    Scores below 5 are "Hibernating Customer", scores from 5 up to (but not
    including) 7 are "Need Attention", and everything else is "VIP Customer".
    """
    if rfm < 5:
        return "Hibernating Customer"
    elif rfm < 7:
        return "Need Attention"
    else:
        return "VIP Customer"


RFM = RFM.withColumn("Segment", segment(col("RFM_Score")))

# Treemap of customers nested inside their RFM segment.
fig = px.treemap(
    data_frame=RFM,
    path=["Segment", "Customer_id"],
    color="Segment",
    color_discrete_sequence=["blue", "green", "red"],
    title="Customer Segmentation by RFM Analysis",
)
st.plotly_chart(fig, use_container_width=True)

Overwriting dashboard.py


In [322]:
# Launch the dashboard written by the %%writefile cell above.
# This cell keeps running for as long as the Streamlit server is up.
!streamlit run dashboard.py


Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.169.24.114:8501[0m
[0m
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/01 18:43:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/01 18:43:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/01/01 18:43:45 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/01/01 18:44:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/01 18:44:15 WARN Wi