In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Load the sales CSV. No 'header' option is set, so the reader assigns its own
# column names; column types are inferred from the data.
Df_Sales_No_Header = (
    spark.read
    .format('csv')
    .option('inferSchema', 'true')
    .load('/Volumes/workspace/pyspark_practise/pyspark_practise/sales.csv.txt')
)
Df_Sales_No_Header.display()

In [0]:
# Column names for the header-less sales data, in positional order.
SalesHeaders = [
    "Product_id",
    "Customer_id",
    "Order_Date",
    "Location",
    "Source_Order",
]

In [0]:
# Rename the positional columns to the names in SalesHeaders, then convert
# Order_Date to a date using the 'MM/dd/yyyy' pattern.
Df_Sales_With_Header = (
    Df_Sales_No_Header
    .toDF(*SalesHeaders)
    .withColumn(
        'Order_Date',
        to_date(col('Order_Date'), 'MM/dd/yyyy'),
    )
)
Df_Sales_With_Header.display()

In [0]:
# Derive calendar attributes (year, quarter label, month) from Order_Date and
# keep only the reporting columns.
Df_Customer = (
    Df_Sales_With_Header
    .withColumn('Order_Year', year(col('Order_Date')))
    # Numeric quarter is only a helper for the label; the select below omits it.
    .withColumn('Order_Quarter_num', quarter(col('Order_Date')))
    .withColumn(
        'Order_Quarter',
        when(col('Order_Quarter_num') == 1, 'Q1')
        .when(col('Order_Quarter_num') == 2, 'Q2')
        .when(col('Order_Quarter_num') == 3, 'Q3')
        .when(col('Order_Quarter_num') == 4, 'Q4')
        # Anything that is not 1-4 is labelled 'Unknown'.
        .otherwise('Unknown'),
    )
    .withColumn('Order_Month', month(col('Order_Date')))
    .select(
        'Product_id', 'Customer_id', 'Order_Date', 'Order_Year',
        'Order_Quarter', 'Order_Month', 'Location', 'Source_Order',
    )
)
# Display in its own statement: with .display() chained onto the assignment,
# Df_Customer was bound to the return value of .display() rather than to the
# DataFrame, so the name could not be used as a DataFrame afterwards.
Df_Customer.display()

In [0]:
# Explicit schema for the menu file: three nullable fields, in file order.
Schema_Menu = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Product_id", IntegerType()),
        ("Product_Name", StringType()),
        ("Price", DoubleType()),
    )
])

In [0]:
# Read the menu CSV with the explicit Schema_Menu instead of inferring types.
Df_Product_Data = (
    spark.read
    .format('csv')
    .schema(Schema_Menu)
    .load('/Volumes/workspace/pyspark_practise/pyspark_practise/menu.csv.txt')
)

display(Df_Product_Data)

In [0]:
# Load the BigMart sales CSV: the first row supplies the column names and the
# column types are inferred from the data.
df_read_from_file = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('/Volumes/workspace/pyspark_practise/pyspark_practise/BigMartSales.csv')
)
display(df_read_from_file)

    

In [0]:
# Write the BigMart frame to the table 'workspace.pyspark_practise.BigSales'
# using write mode 'overwrite'.
(
    df_read_from_file.write
    .mode('overwrite')
    .saveAsTable('workspace.pyspark_practise.BigSales')
)


### Customer Data

In [0]:
# Build the customer-order frame: add year, quarter label and month derived
# from Order_Date, then keep only the columns used by the reports below.
Df_Customer_Data = (
    Df_Sales_With_Header
    .withColumn('Order_Year', year(col('Order_Date')))
    # Helper column for the label below; it is not part of the final select.
    .withColumn('Order_Quarter_num', quarter(col('Order_Date')))
    .withColumn(
        'Order_Quarter',
        when(col('Order_Quarter_num') == 1, 'Q1')
        .when(col('Order_Quarter_num') == 2, 'Q2')
        .when(col('Order_Quarter_num') == 3, 'Q3')
        .when(col('Order_Quarter_num') == 4, 'Q4')
        .otherwise('Unknown'),
    )
    .withColumn('Order_Month', month(col('Order_Date')))
    .select(
        'Product_id', 'Customer_id', 'Order_Date', 'Order_Year',
        'Order_Quarter', 'Order_Month', 'Location', 'Source_Order',
    )
)
Df_Customer_Data.display()
    

### Product Data

In [0]:
# Product (menu) data, read with the explicit Schema_Menu.
# NOTE(review): the same file is read into the same name earlier in this
# notebook; one of the two reads is likely redundant.
Df_Product_Data = (
    spark.read
    .format('csv')
    .schema(Schema_Menu)
    .load('/Volumes/workspace/pyspark_practise/pyspark_practise/menu.csv.txt')
)

display(Df_Product_Data)

### Total Amount Spent by Customers on Each Food Category

In [0]:
# Match each order row to its product on Product_id, then add up the product
# prices per Product_Name and list the products alphabetically.
Total_Amount_Spent_By_Each_Customer_on_Food_Cateogry = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy('Product_Name')
    .agg(sum(Df_Product_Data.Price).alias('Total_Amount_Spent_Product'))
    .orderBy("Product_Name")
)
Total_Amount_Spent_By_Each_Customer_on_Food_Cateogry.show()

### Total Amount Spent By Each Customer

In [0]:
# Match each order row to its product on Product_id, then add up the product
# prices per Customer_id. No ordering is applied to the result.
Total_Amount_Spent_By_Each_Customer = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy('Customer_id')
    .agg(sum(Df_Product_Data.Price).alias('Total_Amount_Spent_Customer'))
)
Total_Amount_Spent_By_Each_Customer.show()

### Total Amount of Sales Each Month

In [0]:
# Match each order row to its product on Product_id, then add up the product
# prices per Order_Month, sorted by month number.
Total_Amount_Spent_Each_Month = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy('Order_Month')
    .agg(sum(Df_Product_Data.Price).alias('Monthly_Amount_Spent'))
    .orderBy('Order_Month')
)
Total_Amount_Spent_Each_Month.show()

### Total Amount of Sales Each Year

In [0]:
# Match each order row to its product on Product_id, then add up the product
# prices per Order_Year, sorted by year.
Total_Amount_Spent_Each_Year = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy('Order_Year')
    .agg(sum(Df_Product_Data.Price).alias('Yearly_Amount_Spent'))
    .orderBy('Order_Year')
)
Total_Amount_Spent_Each_Year.show()

### Total Item Sales Count

In [0]:
# Count joined order rows per product (id and name), highest count first.
Total_Count_Of_Each_Product_Sales = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy(Df_Customer_Data.Product_id, Df_Product_Data.Product_Name)
    .agg(count(Df_Product_Data.Product_Name).alias('Product_Sales_Count'))
    # Descending sort on the count column.
    .orderBy('Product_Sales_Count', ascending=False)
)
Total_Count_Of_Each_Product_Sales.show()

In [0]:
# Same per-product count as the previous report, reduced to the single product
# with the highest count and without the Product_id column.
Top_Product_Sales = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy(Df_Customer_Data.Product_id, Df_Product_Data.Product_Name)
    .agg(count(Df_Product_Data.Product_Name).alias('Product_Sales_Count'))
    .drop(Df_Customer_Data.Product_id)
    # Descending sort, then keep only the first row.
    .orderBy('Product_Sales_Count', ascending=False)
    .limit(1)
)
Top_Product_Sales.show()

### Customer Frequent Visits to Restaurant

In [0]:
# For orders whose Source_Order is 'Restaurant', count the distinct order dates
# per customer (one visit per distinct date).
# NOTE(review): the sort is ascending, so the least frequent visitors are listed
# first - confirm this is the intended order for a "frequent visits" report.
Customer_Frequent_Visits = (
    Df_Customer_Data
    .filter(col('Source_Order') == 'Restaurant')
    .groupBy('Customer_id')
    .agg(countDistinct('Order_Date').alias('Total_Visits'))
    .orderBy('Total_Visits')
)
Customer_Frequent_Visits.show()


### Total Sales By Each Country

In [0]:
# Match each order row to its product on Product_id, then add up the product
# prices per Location, largest total first.
Total_Amount_Spent_By_Each_Country = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy('Location')
    .agg(sum(Df_Product_Data.Price).alias('Total_Amount_Spent_Country'))
    .orderBy("Total_Amount_Spent_Country", ascending=False)
)
Total_Amount_Spent_By_Each_Country.show()

In [0]:
# Total of product prices per Location, largest total first.
# NOTE(review): this cell repeats the preceding per-country cell and rebinds
# the same name to an equivalent result; it looks like an accidental duplicate.
Total_Amount_Spent_By_Each_Country = (
    Df_Customer_Data
    .join(
        Df_Product_Data,
        on=Df_Customer_Data.Product_id == Df_Product_Data.Product_id,
        how='inner',
    )
    .groupBy('Location')
    .agg(sum(Df_Product_Data.Price).alias('Total_Amount_Spent_Country'))
    .orderBy("Total_Amount_Spent_Country", ascending=False)
)
Total_Amount_Spent_By_Each_Country.show()