In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *

In [0]:
%run "/Users/koladevirevanth@gmail.com/Apple_Data_Analysis/Extract"


In [0]:
%run "/Users/koladevirevanth@gmail.com/Apple_Data_Analysis/spark_session"


In [0]:
# NOTE(review): `SparkSession` here is presumably the helper class defined by the
# spark_session notebook %run above — pyspark's own SparkSession has no
# create_spark_session() method — so it shadows the star-imported pyspark class.
session_factory = SparkSession()
spark_session = session_factory.create_spark_session()

Spark Session Initiated


In [0]:

fetcher_customer = Extractor(filepath='/FileStore/Customer_Updated.csv',spark_session = spark_session)
fetcher_products = Extractor(filepath='/FileStore/Products_Updated.csv',spark_session = spark_session)
fetcher_transaction = Extractor(filepath='/FileStore/Transaction_Updated-1.csv',spark_session = spark_session)
customer_df = fetcher_customer.data_extractor()
products_df = fetcher_products.data_extractor()
transaction_df = fetcher_transaction.data_extractor()

Extactor Object Initiated
Extactor Object Initiated
Extactor Object Initiated
data_extractor initiated for /FileStore/Customer_Updated.csv
data_extractor initiated for /FileStore/Products_Updated.csv
data_extractor initiated for /FileStore/Transaction_Updated-1.csv


Business Transformations: top-selling products, top-spending customers, and the products bought immediately before AirPods

In [0]:
from pyspark.sql.window import Window

class transform:
    """Business transformations over the customers / products / transactions data.

    Each method prints a progress message and builds a Spark query from the
    three input DataFrames. It then either returns the resulting DataFrame or
    renders it with ``display()``. Errors are printed and then re-raised, so a
    failing cell stops at the real cause instead of silently yielding ``None``.

    Relies on names brought into scope by the notebook's imports: ``col``,
    ``sum``, ``rank``, ``lead`` (star-imported from ``pyspark.sql.functions``;
    ``sum`` is Spark's aggregate and shadows the Python builtin) and ``Window``
    (from ``pyspark.sql.window``).
    """

    def __init__(self, customers, products, transactions):
        """Store the three source DataFrames.

        Args:
            customers: DataFrame joined to transactions on ``customer_id``.
            products: DataFrame joined to transactions on ``product_id``.
            transactions: DataFrame of purchases. Both it and ``products`` are
                referenced with a ``product_name`` column, which is why the
                methods qualify that column by DataFrame.
        """
        self.customers: DataFrame = customers
        self.products: DataFrame = products
        self.transactions: DataFrame = transactions

    def sold_products(self):
        """Return products inner-joined with transactions on ``product_id``.

        Yields one row per matching (product, transaction) pair. Products that
        were never sold are dropped.
        """
        try:
            print("Performing Join to get Products that have been Sold")
            return self.products.join(self.transactions, 'product_id', 'inner')
        except Exception as e:
            print(e)
            raise

    def purchase_per_customer(self):
        """Display each customer's total spend, ranked highest first.

        Joins customers -> transactions -> products and sums ``price`` per
        (customer_id, customer_name). It then assigns ``rank()`` by total
        descending, so tied totals share a rank and the next rank is skipped
        (1, 1, 3).
        """
        try:
            print("Fetching per customer purchase")
            customer_totals = (
                self.customers
                .join(self.transactions, 'customer_id', 'inner')
                .join(self.products, 'product_id', 'inner')
                # Group by id as well as name so customers sharing a name stay separate.
                .groupBy(col('customer_id'), col('customer_name'))
                .agg(sum(col('price')).alias('Total'))
                .select(col('customer_name'), col('Total'))
            )
            # No partitionBy: the ranking is global, so Spark evaluates it on a single partition.
            rank_window = Window.orderBy(col('Total').desc())
            ranked_customers = (
                customer_totals
                .withColumn('Rank', rank().over(rank_window))
                # Sort after ranking so the displayed order is guaranteed rather
                # than an artifact of how the window happened to be evaluated.
                .orderBy(col('Rank'))
            )
            ranked_customers.display()
        except Exception as e:
            print(e)
            raise

    def product_selling_ranks(self):
        """Display each sold product's total sales (sum of ``price``), ranked highest first."""
        try:
            print("Fetching Product ranks")
            product_sales = (
                self.sold_products()
                # product_name exists on both joined inputs; take the products table's copy.
                .groupBy(col('product_id'), self.products.product_name)
                .agg(sum(col('price')).alias('Total_Sales'))
                .select(col('product_name'), col('Total_Sales'))
            )
            rank_window = Window.orderBy(col('Total_Sales').desc())
            products_rank = (
                product_sales
                .withColumn('Rank', rank().over(rank_window))
                .orderBy(col('Rank'))
            )
            products_rank.display()
        except Exception as e:
            print(e)
            raise

    def airpods_post_other_purchases(self):
        """Return purchases that were immediately followed by an AirPods purchase.

        Within each customer, rows are ordered by ``transaction_id``, and
        ``lead()`` picks the transactions-side ``product_name`` of the next row.

        Returns:
            DataFrame with ``customer_id``, ``first_purchase`` (the earlier
            purchase's product_name) and ``next_purchase`` (always 'AirPods').
        """
        try:
            print("Fetching if airpods purchased after purchasing any other products")
            sold = self.sold_products()
            # NOTE(review): ordering by transaction_id assumes ids increase with purchase time — confirm.
            window_func = Window.partitionBy(col('customer_id')).orderBy('transaction_id')
            return (
                sold
                .withColumn("next_purchase", lead(self.transactions.product_name).over(window_func))
                # lead() is NULL on a customer's last purchase. No separate null check
                # is needed, because NULL == 'AirPods' is never true, so those rows drop here.
                # NOTE(review): first_purchase may itself be 'AirPods'; exclude it if
                # "other products" is meant literally.
                .where(col('next_purchase') == 'AirPods')
                .select(
                    col('customer_id'),
                    self.transactions.product_name.alias("first_purchase"),
                    col('next_purchase'),
                )
            )
        except Exception as e:
            print(e)
            raise


# Bundle the three extracted DataFrames (positional order: customers,
# products, transactions) into the transformer the following cells call.
transformer = transform(customer_df, products_df, transaction_df)


In [0]:
# Rank sold products by total sales; the method renders the table itself and returns None.
transformer.product_selling_ranks()

Fetching Product ranks
Performing Join to get Products that have been Sold


product_name,Total_Sales,Rank
MacBook Air,2000.0,1
iPhone SE,1800.0,2
AirPods Pro,1000.0,3


Databricks visualization. Run in Databricks to view.

In [0]:
# Rank customers by total spend; rank() lets tied totals share a rank (1, 1, 3, ...).
transformer.purchase_per_customer()

Fetching per customer purchase


customer_name,Total,Rank
Frank,1700.0,1
Eva,1700.0,1
Henry,700.0,3
Grace,700.0,3


Databricks visualization. Run in Databricks to view.

In [0]:
products = transformer.airpods_post_other_purchases()
# Count, per preceding product, how many purchases were immediately followed by AirPods.
airpods_followup_counts = (
    products
    .groupBy('first_purchase')
    .agg(count('next_purchase').alias('next_purchase_airpods'))
)
airpods_followup_counts.display()

Fetching if airpods purchased after purchasing any other products
Performing Join to get Products that have been Sold


first_purchase,next_purchase_airpods
MacBook,1
iPhone,2


Databricks visualization. Run in Databricks to view.