# Transformer class notebook

In [0]:
#importing required libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import lead,col,broadcast,collect_list,array_contains,size,rank,abs,datediff

In [0]:
# defining transform class
class Transform:
    '''Base class for the transformation steps in this notebook.

    Subclasses override transform() and receive a mapping-like object
    (read with .get) that holds the input DataFrames.
    '''

    def __init__(self):
        pass

    def transform(self,inputDFs=None):
        '''Placeholder meant to be overridden by subclasses.

        inputDFs is accepted and ignored so that the base signature agrees with
        the transform(self, inputDFs) overrides; calling it without an argument
        still works. The base implementation does nothing and returns None.
        '''
        return None

# Subclass for the first transformation
class TransformerAirpodsAfterIphone(Transform):

    def transform(self,inputDFs):
        '''Customers whose purchase directly following an iPhone was AirPods.

        inputDFs is read with .get and must provide:
          'transactionInputDF' - uses customer_id, transaction_date, product_name
          'customerInputDF'    - uses customer_id, customer_name, location

        Returns a DataFrame with customer_id, customer_name and location for the
        matching customers. A customer with several iPhone -> AirPods sequences
        is not repeated in the result.
        '''

        transactionInputDF=inputDFs.get('transactionInputDF')

        # For every transaction, attach the product of the same customer's
        # next transaction when ordered by transaction_date.
        windowSpec=Window.partitionBy('customer_id').orderBy('transaction_date')

        transformedDF=transactionInputDF.withColumn("next_product_name",lead("product_name").over(windowSpec))

        filteredDF=transformedDF.filter((col('product_name')=='iPhone') & (col('next_product_name')=='AirPods'))

        # Keep only the distinct join key: the inner join below would otherwise
        # emit one customer row per matching transaction, and carrying the other
        # transaction columns could clash with customer_name/location in the select.
        matchedCustomersDF=filteredDF.select('customer_id').distinct()

        customerInputDF=inputDFs.get("customerInputDF")

        joinedDF=customerInputDF.join(broadcast(matchedCustomersDF),"customer_id") # broadcast hint on the small key-only side

        joinedDF=joinedDF.select('customer_id','customer_name','location')

        return joinedDF




In [0]:
# Subclass for the second transformation
class TransformerAirpodsAndIphone(Transform):

    def transform(self,inputDFs):
        '''Customers who bought only AirPods and iPhone (and no other product).

        inputDFs is read with .get and must provide:
          'transactionInputDF' - uses customer_id, product_name
          'customerInputDF'    - uses customer_id, customer_name, location

        Returns a DataFrame with customer_id, customer_name and location.
        '''
        # Imported here because the notebook's import cell does not provide it.
        from pyspark.sql.functions import collect_set

        transactionInputDF=inputDFs.get('transactionInputDF')

        # collect_set keeps each product once, so a customer who bought an
        # iPhone or AirPods more than once still has exactly two entries.
        # (collect_list would give size 3+ and wrongly drop that customer.)
        groupedDF=transactionInputDF.groupBy("customer_id").agg(collect_set("product_name").alias("products"))

        filteredDF=groupedDF.filter((array_contains(col('products'),'AirPods'))
                                     & (array_contains(col('products'),'iPhone'))
                                     & (size(col('products'))==2))

        # Only the join key is needed on the broadcast side.
        matchedCustomersDF=filteredDF.select('customer_id')

        customerInputDF=inputDFs.get("customerInputDF")

        joinedDF=customerInputDF.join(broadcast(matchedCustomersDF),"customer_id")

        joinedDF=joinedDF.select('customer_id','customer_name','location')

        return joinedDF

In [0]:
# Subclass for the third transformation
class TransformerProductsAfterInitialPurchase(Transform):

    def transform(self,inputDFs):
        '''List, per customer, the products bought after the first purchase.

        Transactions are ranked per customer_id by transaction_date; every
        transaction whose rank is not 1 contributes its product_name to the
        customer's products_after_first_purchase list. The lists are then
        inner-joined to customerInputDF on customer_id.

        Returns a DataFrame with customer_id, customer_name and
        products_after_first_purchase.
        '''
        purchase_order = Window.partitionBy('customer_id').orderBy('transaction_date')

        # Rank each transaction, drop the rank-1 rows, then gather the
        # remaining product names per customer.
        later_products = (
            inputDFs.get('transactionInputDF')
            .withColumn("rank", rank().over(purchase_order))
            .where(col('rank') != 1)
            .groupBy("customer_id")
            .agg(collect_list("product_name").alias("products_after_first_purchase"))
        )

        customers = inputDFs.get("customerInputDF")

        # Broadcast hint on the aggregated side of the join.
        with_names = customers.join(broadcast(later_products), "customer_id")

        return with_names.select('customer_id', 'customer_name', 'products_after_first_purchase')



In [0]:
# Subclass for the fourth transformation

class TransformerAvgTimeDelay(Transform):

    def transform(self,inputDFs):
        '''Average number of days between a customer's iPhone and AirPods purchases.

        iPhone and AirPods transactions are joined on customer_id, the absolute
        day difference between the two transaction dates is computed for every
        joined pair, and those differences are averaged over all pairs.

        Returns a single-row DataFrame with the column avg_days_between_purchases.
        '''
        transactions = inputDFs.get('transactionInputDF')

        # Purchase dates per customer, one frame per product.
        iphone_dates = (
            transactions
            .where(col('product_name') == 'iPhone')
            .select('customer_id', col('transaction_date').alias('transaction_date_iphone'))
        )
        airpods_dates = (
            transactions
            .where(col('product_name') == 'AirPods')
            .select('customer_id', col('transaction_date').alias('transaction_date_airpods'))
        )

        # abs() makes the gap independent of which product was bought first.
        day_gap = abs(datediff(col('transaction_date_airpods'), col('transaction_date_iphone')))

        paired = (
            iphone_dates
            .join(airpods_dates, on='customer_id', how='inner')
            .withColumn('days_between_purchases', day_gap)
        )

        averaged = paired.agg({'days_between_purchases': 'avg'})

        return averaged.withColumnRenamed('avg(days_between_purchases)', 'avg_days_between_purchases')
