In [0]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)

# Read the customer CSV from the /Volumes path: the first row supplies the
# column names and the column types are inferred from the data.
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load("/Volumes/apple_analysis/default/apple_record_csv/Customer_Updated.csv")
)

# Preview the rows, then print the inferred schema.
df.show()
df.printSchema()

+-----------+-------------+----------+--------+
|customer_id|customer_name| join_date|location|
+-----------+-------------+----------+--------+
|        105|          Eva|2022-01-01|    Ohio|
|        106|        Frank|2022-02-01|  Nevada|
|        107|        Grace|2022-03-01|Colorado|
|        108|        Henry|2022-04-01|    Utah|
+-----------+-------------+----------+--------+

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- join_date: date (nullable = true)
 |-- location: string (nullable = true)



In [0]:
%run "/Workspace/apple_record/dataReader"


In [0]:
class WorkFlow:
    """Quick check of the data reader: load the transaction CSV and print it."""

    def __init__(self):
        """The workflow keeps no state."""

    def runner(self):
        """Show the transactions ordered by customer and transaction id, then print their schema.

        ``get_data_source`` is not defined in this notebook; presumably it comes
        from the ``dataReader`` notebook pulled in with ``%run`` (confirm there).
        """
        source = get_data_source(
            data_type="csv",
            file_path="/Volumes/apple_analysis/default/apple_record_csv/Transaction_Updated.csv",
        )
        transactions = source.get_data_frame()

        # Sorting only affects the preview; the schema is printed from the
        # unsorted frame.
        ordered = transactions.orderBy('customer_id', 'transaction_id')
        ordered.show()
        transactions.printSchema()
       

# Build the workflow and run it once; runner() prints the transaction rows and schema.
workflow = WorkFlow()
workflow.runner()
# DBTITLE 1
    

+--------------+-----------+------------+----------------+
|transaction_id|customer_id|product_name|transaction_date|
+--------------+-----------+------------+----------------+
|            11|        105|      iPhone|      2022-02-01|
|            14|        105|     AirPods|      2022-02-04|
|            18|        105|     MacBook|      2022-02-08|
|            12|        106|      iPhone|      2022-02-02|
|            16|        106|     MacBook|      2022-02-06|
|            20|        106|     AirPods|      2022-02-10|
|            13|        107|     AirPods|      2022-02-03|
|            17|        107|      iPhone|      2022-02-07|
|            15|        108|      iPhone|      2022-02-05|
|            19|        108|     AirPods|      2022-02-09|
+--------------+-----------+------------+----------------+

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- transaction_date: string (null

In [0]:
%run "/Workspace/apple_record/dataTransform"

In [0]:
class FirstTransform:
    """Customers who have bought Airpods after buying the iPhone.

    Trial run of ``AirpodsAfterIphoneTransformer.transform`` using only the
    transaction CSV as input.
    """

    def __init__(self):
        pass

    def dataTransform(self):
        """Load the transaction CSV and hand it to the transformer.

        Returns:
            Whatever ``AirpodsAfterIphoneTransformer().transform`` returns
            (presumably the transformed DataFrame; confirm in the
            ``dataTransform`` notebook).
        """
        transcationInputDf = get_data_source(
            data_type="csv",
            file_path="/Volumes/apple_analysis/default/apple_record_csv/Transaction_Updated.csv",
        ).get_data_frame()

        # Inputs are passed as a name -> DataFrame mapping. The key keeps its
        # existing spelling because the transformer presumably looks it up by
        # this exact name.
        inputDfs = {
            "transcationInputDf": transcationInputDf
        }
        transcationInputDf.printSchema()

        # Return the result instead of dropping it in an unused local, so the
        # caller's variable is no longer always None.
        return AirpodsAfterIphoneTransformer().transform(inputDfs)

# Run the first transformation; firstTransform receives whatever dataTransform() returns.
firstTransform = FirstTransform().dataTransform()


root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- transaction_date: string (nullable = true)

{'transcationInputDf': DataFrame[transaction_id: string, customer_id: string, product_name: string, transaction_date: string]}
transcationInputDF in transform data
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- transaction_date: string (nullable = true)

+--------------+-----------+------------+----------------+
|transaction_id|customer_id|product_name|transaction_date|
+--------------+-----------+------------+----------------+
|            11|        105|      iPhone|      2022-02-01|
|            12|        106|      iPhone|      2022-02-02|
|            13|        107|     AirPods|      2022-02-03|
|            14|        105|     AirPods|      2022-02-04|
|            15|        108|      iPhone|      20

In [0]:
%sql 
-- Preview every row of the customer_updated table.
SELECT * FROM apple_analysis.default.customer_updated

customer_id,customer_name,join_date,location
105,Eva,2022-01-01,Ohio
106,Frank,2022-02-01,Nevada
107,Grace,2022-03-01,Colorado
108,Henry,2022-04-01,Utah


In [0]:
class SecondTransform:
    """Trial run of the AirPods-after-iPhone transformation with the customer table.

    Feeds both the transaction CSV and the ``customer_updated`` table to
    ``AirpodsAfterIphoneTransformer.transformWithCustomerTable``.
    """

    def __init__(self):
        pass

    def dataTransform(self):
        """Load both inputs and hand them to the transformer.

        Returns:
            Whatever ``transformWithCustomerTable`` returns (presumably the
            transformed DataFrame; confirm in the ``dataTransform`` notebook).
        """
        transcationInputDf = get_data_source(
            data_type="csv",
            file_path="/Volumes/apple_analysis/default/apple_record_csv/Transaction_Updated.csv",
        ).get_data_frame()

        # For the "delta" source the file_path argument carries a table name
        # rather than a filesystem path.
        customerInputDf = get_data_source(
            data_type="delta",
            file_path="apple_analysis.default.customer_updated",
        ).get_data_frame()

        # Inputs are passed as a name -> DataFrame mapping. The keys keep their
        # existing spelling because the transformer presumably looks them up by
        # these exact names.
        inputDfs = {
            "transcationInputDf": transcationInputDf,
            "customerInputDf": customerInputDf
        }

        # Return the result instead of dropping it in an unused local, so the
        # caller's variable is no longer always None.
        return AirpodsAfterIphoneTransformer().transformWithCustomerTable(inputDfs)

# Run the second transformation; secondTransform receives whatever dataTransform() returns.
secondTransform = SecondTransform().dataTransform()

transcationInputDF in transform data
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- transaction_date: string (nullable = true)

+--------------+-----------+------------+----------------+
|transaction_id|customer_id|product_name|transaction_date|
+--------------+-----------+------------+----------------+
|            11|        105|      iPhone|      2022-02-01|
|            12|        106|      iPhone|      2022-02-02|
|            13|        107|     AirPods|      2022-02-03|
|            14|        105|     AirPods|      2022-02-04|
|            15|        108|      iPhone|      2022-02-05|
|            16|        106|     MacBook|      2022-02-06|
|            17|        107|      iPhone|      2022-02-07|
|            18|        105|     MacBook|      2022-02-08|
|            19|        108|     AirPods|      2022-02-09|
|            20|        106|     AirPods|      2022-02-10|
+--------

In [0]:
%run "/Workspace/apple_record/dataExtractor"

In [0]:
%run "/Workspace/apple_record/dataLoader"

In [0]:
class FirstWorkFlowDesign:
    """
    ETL pipeline that produces the customers who bought AirPods just after buying an iPhone.
    """

    def __init__(self):
        """The pipeline keeps no state."""

    def workFlowRunner(self):
        """Run the extract -> transform -> load steps once; nothing is returned."""
        # Extract: collect the input DataFrames from their sources.
        extractor = AirpodsAfterIphoneExtractor()
        sourceFrames = extractor.extract()

        # Transform: customers who bought AirPods after buying the iPhone,
        # using the customer-table variant of the transformer.
        transformer = AirpodsAfterIphoneTransformer()
        airpodsAfterIphoneDF = transformer.transformWithCustomerTable(sourceFrames)

        # Load: write the transformed data to its sink(s).
        loader = AirPodsAfterIphoneLoader(airpodsAfterIphoneDF)
        loader.sink()

# Run the first ETL pipeline end to end. workFlowRunner() has no return
# statement, so this rebinds firstTransform (also used by an earlier cell) to None.
firstTransform = FirstWorkFlowDesign().workFlowRunner()




transcationInputDF in transform data
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- transaction_date: string (nullable = true)

+--------------+-----------+------------+----------------+
|transaction_id|customer_id|product_name|transaction_date|
+--------------+-----------+------------+----------------+
|            11|        105|      iPhone|      2022-02-01|
|            12|        106|      iPhone|      2022-02-02|
|            13|        107|     AirPods|      2022-02-03|
|            14|        105|     AirPods|      2022-02-04|
|            15|        108|      iPhone|      2022-02-05|
|            16|        106|     MacBook|      2022-02-06|
|            17|        107|      iPhone|      2022-02-07|
|            18|        105|     MacBook|      2022-02-08|
|            19|        108|     AirPods|      2022-02-09|
|            20|        106|     AirPods|      2022-02-10|
+--------

In [0]:
# Read the Delta data stored under the airpods_after_iphone path.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/apple_analysis/default/apple_record_csv/airpods_after_iphone")
)

# Keep only the rows whose location is Ohio and preview them.
df_ohio = df.where(df["location"] == "Ohio")
df_ohio.show()

+-----------+-------------+----------+--------+
|customer_id|customer_name| join_date|location|
+-----------+-------------+----------+--------+
|        105|          Eva|2022-01-01|    Ohio|
+-----------+-------------+----------+--------+



In [0]:
%sql
-- List the volumes in the apple_analysis.default schema.
SHOW VOLUMES IN apple_analysis.default;
-- Not executed; kept for reference: statement that would create an airpods_after_iphone volume.
-- CREATE VOLUME apple_analysis.default.airpods_after_iphone;


database,volume_name
default,apple_record_csv


In [0]:
%sql 
-- Preview every row of the airpods_after_iphone table.
SELECT * FROM apple_analysis.default.airpods_after_iphone

customer_id,customer_name,join_date,location
105,Eva,2022-01-01,Ohio
108,Henry,2022-04-01,Utah


In [0]:
# List the entries at the DBFS root and render them as a table.
display(dbutils.fs.ls("dbfs:/"))


path,name,size,modificationTime
dbfs:/Volumes/,Volumes/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0


In [0]:
class SecondWorkFlowDesign:
    """
    ETL pipeline built around the OnlyAirpodsAndIphone transformer.

    Presumably yields the customers whose purchases consist of only an iPhone
    and AirPods; confirm against the transformer in the dataTransform notebook.
    """

    def __init__(self):
        """The pipeline keeps no state."""

    def workFlowRunner(self):
        """Run the extract -> transform -> load steps once; nothing is returned."""
        # Extract: the inputs come from the same extractor the first pipeline uses.
        extractor = AirpodsAfterIphoneExtractor()
        sourceFrames = extractor.extract()

        # Transform: apply the OnlyAirpodsAndIphone logic to the extracted inputs.
        transformer = OnlyAirpodsAndIphone()
        airpodsAndIphoneOnlyDF = transformer.transform(sourceFrames)

        # Load: write the transformed data to its sink(s).
        loader = OnlyAirpodsAndIPhoneLoader(airpodsAndIphoneOnlyDF)
        loader.sink()

# Run the second ETL pipeline end to end. workFlowRunner() has no return
# statement, so this rebinds secondTransform (also used by an earlier cell) to None.
secondTransform = SecondWorkFlowDesign().workFlowRunner()



transcationInputDf in transform data
+--------------+-----------+------------+----------------+
|transaction_id|customer_id|product_name|transaction_date|
+--------------+-----------+------------+----------------+
|            11|        105|      iPhone|      2022-02-01|
|            12|        106|      iPhone|      2022-02-02|
|            13|        107|     AirPods|      2022-02-03|
|            14|        105|     AirPods|      2022-02-04|
|            15|        108|      iPhone|      2022-02-05|
|            16|        106|     MacBook|      2022-02-06|
|            17|        107|      iPhone|      2022-02-07|
|            18|        105|     MacBook|      2022-02-08|
|            19|        108|     AirPods|      2022-02-09|
|            20|        106|     AirPods|      2022-02-10|
+--------------+-----------+------------+----------------+

Grouped DF
+-----------+--------------------+
|customer_id|            products|
+-----------+--------------------+
|        108|   [iPh

In [0]:
class WorkFlowRunner:
    """Selects one of the ETL pipelines by name and runs it."""

    def __init__(self, name):
        # Pipeline identifier: "firstWorkFlow" or "secondWorkFlow".
        self.name = name

    def runner(self):
        """Run the pipeline matching ``self.name`` and return its result.

        Raises:
            ValueError: if ``self.name`` is not a known pipeline name.
        """
        requested = self.name
        if requested == "firstWorkFlow":
            return FirstWorkFlowDesign().workFlowRunner()
        if requested == "secondWorkFlow":
            return SecondWorkFlowDesign().workFlowRunner()
        # Anything else is unsupported.
        raise ValueError(f"Not Implemented for {self.name}")

# Name of the pipeline to run; WorkFlowRunner.runner() accepts "firstWorkFlow" or "secondWorkFlow".
name = "secondWorkFlow" 

# Dispatch to the selected pipeline; workFlowrunner receives runner()'s return value.
workFlowrunner = WorkFlowRunner(name).runner()

