# 01. Setup & Exploration

- This section covers:
  - Importing the necessary libraries
  - Building a Spark session
  - Importing the datasets from the schema
  - Exploring the schema of each table

### Importing Necessary Libraries

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import lit, concat,regexp_replace, col,broadcast, sum, countDistinct

### Building A Spark Session

In [0]:
# Obtain the notebook's Spark session: an already-active session is reused,
# otherwise a new one is created under this application name.
spark = (
    SparkSession.builder
    .appName('MySparkProject_1')
    .getOrCreate()
)

### Import the Datasets from the Schema

In [0]:
# Load every source table as a DataFrame. The targets on the left are bound,
# in order, to the table names listed on the right.
(
    df_transactions,
    df_suppliers,
    df_fr,
    df_customers,
    df_rev,
    df_golds,
) = map(
    spark.table,
    (
        "samples.bakehouse.sales_transactions",
        "samples.bakehouse.sales_suppliers",
        "samples.bakehouse.sales_franchises",
        "samples.bakehouse.sales_customers",
        "samples.bakehouse.media_customer_reviews",
        "samples.bakehouse.media_gold_reviews_chunked",
    ),
)

### Showing the schema and first 5 rows of each dataset

In [0]:
# Print the column names and types of each loaded table, in load order.
for frame in (df_transactions, df_suppliers, df_fr, df_customers, df_rev, df_golds):
    frame.printSchema()

In [0]:
# Preview the first five rows of each loaded table, in load order.
for frame in (df_transactions, df_suppliers, df_fr, df_customers, df_rev, df_golds):
    frame.show(5)

# 02. Cleaning & Preparation

- This section covers:
  - Handling missing values (filling and other operations)
  - Changing data types

In [0]:
# For every table, show a single row holding the number of NULLs per column:
# isNull() yields a boolean, the cast turns it into 0/1, and the sum counts
# the rows where the column is null.
for frame in (df_transactions, df_suppliers, df_fr, df_customers, df_rev, df_golds):
    null_counts = [
        F.sum(F.col(name).isNull().cast("integer")).alias(name)
        for name in frame.columns
    ]
    frame.select(null_counts).show()


- None of the tables contain null values, so no `na.fill()` operations are needed.
- Some tables contain datetime values; we can change their data types.

In [0]:
# Customers: add a "first last" display name and store the zip code and the
# customer id as strings.
df_customers = (
    df_customers
    .withColumn('customer_full_name', concat(col('first_name'), lit(' '), col('last_name')))
    .withColumn('postal_zip_code', col('postal_zip_code').cast('string'))
    .withColumn('customerid', col('customerID').cast('string'))
)

# Transactions: store the identifier columns as strings as well.
df_transactions = (
    df_transactions
    .withColumn('customerid', col('customerID').cast('string'))
    .withColumn('transactionID', col('transactionID').cast('string'))
    .withColumn('franchiseID', col('franchiseID').cast('string'))
)

# Joining & Enrichment

- This section covers:
  - Building the main analysis table with joins
  - Using `broadcast()` to optimize the joins

In [0]:
# Derive the key used for the customer join: a leading "1" in the customer id
# is rewritten to "2"; ids that do not start with "1" are left unchanged.
# NOTE(review): presumably this aligns transaction ids with the ids stored in
# df_customers -- confirm against the data.
df_transactions_fixed = df_transactions.withColumn(
    "join_key_id",
    regexp_replace(col("CustomerID"), "^1", "2"),
)

In [0]:
# Attach franchise attributes to every transaction. The left outer join keeps
# transactions without a matching franchise; df_fr is marked for broadcast.
df_tr_fr = df_transactions_fixed.join(
    broadcast(df_fr),
    on=["franchiseID"],
    how="left_outer",
)

In [0]:
# Attach customer attributes by matching the derived join key against the
# customers' id; transactions without a matching customer are kept.
# NOTE(review): both inputs keep all their columns, so any column name shared
# by the franchise and customer tables would appear twice -- confirm.
df_main_intermediate = df_tr_fr.join(
    df_customers,
    on=df_tr_fr["join_key_id"] == df_customers["customerid"],
    how='left_outer',
)

In [0]:
# Drop the transaction-side customerid so that only the customerid coming
# from df_customers remains in the joined frame.
df_main_fixed = df_main_intermediate.drop(df_tr_fr["customerid"])

In [0]:
# Expose the joined frame to Spark SQL under a session-scoped view name.
df_main_fixed.createOrReplaceTempView(name="sales_data_view")

In [0]:
# Combine customer reviews with the chunked gold reviews per franchise,
# keeping every row of df_rev.
# NOTE(review): the join key is franchiseID only, so rows may multiply if both
# sides hold several rows per franchise; df_media is also not used in the
# remaining visible cells -- confirm intent.
df_media = df_rev.join(
    broadcast(df_golds),
    on=['franchiseID'],
    how='left_outer',
)

# DataFrame API

- This section covers:
  - Building aggregations by customer, product, franchise, etc.

In [0]:
# Give the measure columns numeric types before aggregating them.
df_main = (
    df_main_fixed
    .withColumn("totalPrice", col("totalPrice").cast("double"))
    .withColumn("quantity", col("quantity").cast("integer"))
)

In [0]:
# Top 10 customers by revenue: total amount, total items and number of
# distinct transactions per customer name.
(
    df_main
    .groupBy('customer_full_name')
    .agg(
        F.sum('totalPrice').alias('total_amount'),
        F.sum('quantity').alias('total_items'),
        F.countDistinct('transactionID').alias('total_transactions'),
    )
    .orderBy(F.col('total_amount').desc())
    .show(10)
)

In [0]:
# Top 10 products by revenue: total amount, total items and number of
# distinct transactions per product.
(
    df_main
    .groupBy('product')
    .agg(
        F.sum('totalPrice').alias('total_amount'),
        F.sum('quantity').alias('total_items'),
        F.countDistinct('transactionID').alias('total_transactions'),
    )
    .orderBy(F.col('total_amount').desc())
    .show(10)
)

In [0]:
# Revenue, item count and distinct transaction count per payment method,
# highest revenue first (at most 10 rows shown).
(
    df_main
    .groupBy('paymentMethod')
    .agg(
        F.sum('totalPrice').alias('total_amount'),
        F.sum('quantity').alias('total_items'),
        F.countDistinct('transactionID').alias('total_transactions'),
    )
    .orderBy(F.col('total_amount').desc())
    .show(10)
)

In [0]:
# Top 10 franchises by revenue: total amount, total items and number of
# distinct transactions per franchise id.
(
    df_main
    .groupBy('franchiseID')
    .agg(
        F.sum('totalPrice').alias('total_amount'),
        F.sum('quantity').alias('total_items'),
        F.countDistinct('transactionID').alias('total_transactions'),
    )
    .orderBy(F.col('total_amount').desc())
    .show(10)
)

# Spark SQL

- This section covers:
  - Spark SQL methods

In [0]:
# Point the "sales_data_view" name at df_main (the frame with the numeric
# casts applied), replacing the earlier registration of df_main_fixed.
df_main.createOrReplaceTempView(name="sales_data_view")

# NOTE(review): `display` is not defined in this file; presumably it is the
# notebook environment's table renderer -- confirm.
display(df_main)

In [0]:
# Per-product totals: units sold, revenue and number of distinct transactions.
products_query = """SELECT product, 
                                SUM(quantity) quantity,
                                SUM(totalPrice) totalAmount,
                                COUNT(DISTINCT(transactionID)) totalTransactions
                                
                                FROM sales_data_view 
                                GROUP BY product"""

exp_products = spark.sql(products_query)
exp_products.show()

In [0]:
# Monthly figures per franchise: revenue, distinct transactions and distinct
# customers. `Ay` is the alias given to MONTH(dateTime). Rows are ordered by
# month ascending, then by revenue descending (ORDER BY positions 1 and 3).
monthly_franchise_query = """
                SELECT
                    MONTH(dateTime) Ay,
                    franchiseID, 
                    SUM(totalPrice) totalAmount,
                    COUNT(DISTINCT transactionID) totalTransactions,
                    COUNT(DISTINCT customerid) totalCustomerCount  

                FROM sales_data_view 
                GROUP BY 1,2 
                ORDER BY 1,3 DESC

          
          """

ch_month_fr = spark.sql(monthly_franchise_query)

ch_month_fr.show(10)

In [0]:
# Ten highest-revenue customers with their revenue and number of distinct
# transactions (grouped by customer name, ordered by revenue descending).
top_customers_query = """
                        SELECT 
                            customer_full_name,
                            SUM(totalPrice) totalAmount,
                            COUNT(DISTINCT transactionID) totalTransactions

                        FROM sales_data_view 
                        GROUP BY 1
                        ORDER BY 2 DESC
                        LIMIT 10
                     """

ch_custs = spark.sql(top_customers_query)

ch_custs.show()

# Optimization & Results

In [0]:
# Print the physical plan Spark will use for the top-customers query
# (non-extended output, i.e. the same plan a bare explain() prints).
ch_custs.explain(extended=False)