In [1]:
import findspark
# Locate the local Spark installation so that pyspark can be imported in the next cell.
findspark.init()

In [2]:
import os
from collections import Counter

from pyspark.sql import SparkSession, types as T, functions as F
# Reuse the active SparkSession or create a new one. Transactions, Customers and
# Products below read their input files through this module-level `spark`.
spark = SparkSession.builder.getOrCreate()

In [3]:
class Transactions:
    """Customer transactions loaded from a multi-line JSON file.

    ``self.df`` starts as the raw frame. After ``transform_data()`` it holds
    one row per (customer_id, product_id) pair with the number of times that
    product appears across all of the customer's baskets.
    """

    # Initialise object with json file as parameter
    def __init__(self, file):
        # Keep the raw frame separately so transform_data() can be re-run
        # without losing the `basket` column.
        self.raw_df = spark.read.json(file, multiLine=True)
        self.df = self.raw_df

    # Transform the dataframe to columns: customer_id, product_id, purchase_count_per_product_id
    def transform_data(self):
        """Explode baskets and count purchases per (customer_id, product_id)."""
        # Assumes basket is an array of structs with a product_id field, so
        # basket['product_id'] is an array of ids. explode emits one row per
        # item and skips null/empty baskets.
        items = self.raw_df.select(
            'customer_id',
            F.explode(self.raw_df.basket['product_id']).alias('product_id'),
        )

        # Built-in aggregation instead of a Python UDF. Grouping on the
        # customer (not the transaction row) yields a single row per pair even
        # when a customer has several transactions. The count is cast to int to
        # keep the previous IntegerType schema.
        counts = items.groupBy('customer_id', 'product_id').agg(
            F.count(F.lit(1)).cast(T.IntegerType()).alias('purchase_count_per_product_id')
        )

        # groupBy shuffles rows; sort for a deterministic display order.
        self.df = counts.orderBy('customer_id', 'product_id')

In [4]:
class Customers:
    """Customer reference data read from a CSV file that has a header row."""

    def __init__(self, file):
        # inferSchema is not set, so Spark reads every column as a string.
        reader = spark.read
        self.df = reader.csv(file, header=True)

    def create_map(self):
        """Store a plain dict customer_id -> loyalty_score in ``self.loyalty_score_map``.

        If a customer_id occurs more than once, the last row wins.
        """
        lookup = {}
        for record in self.df.collect():
            lookup[record.customer_id] = record.loyalty_score
        self.loyalty_score_map = lookup

In [5]:
class Products:
    """Product reference data read from a CSV file that has a header row."""

    # Initialise object with csv file as parameter
    def __init__(self, file):
        self.df = spark.read.csv(file, header=True)

    def create_map(self):
        """Store a plain dict product_id -> product_category in ``self.product_map``.

        Only the two needed columns are collected to the driver, so other
        columns (e.g. product_description) are not transferred. If a
        product_id occurs more than once, the last row wins.
        """
        rows = self.df.select('product_id', 'product_category').collect()
        self.product_map = {row.product_id: row.product_category for row in rows}

## Unit tests (pytest, run in-notebook via ipytest)

In [6]:
## Function for unit testing using pytest

import pytest
import ipytest

# Directory containing the sample input files; override with the DATA_DIR
# environment variable instead of editing the notebook.
TEST_DATA_DIR = os.environ.get('DATA_DIR', 'C:/Users/PAVAN/Downloads')


def test_Transactions():
    """Raw JSON columns are as expected and transform_data() produces the output columns."""
    trans = Transactions(f'{TEST_DATA_DIR}/transactions.json')
    assert trans.df.columns == ['basket', 'customer_id', 'date_of_purchase']
    trans.transform_data()
    assert trans.df.columns == ['customer_id', 'product_id', 'purchase_count_per_product_id']


def test_Customers():
    """Customer CSV has the expected columns and create_map() builds a dict."""
    cust = Customers(f'{TEST_DATA_DIR}/customers.csv')
    assert cust.df.columns == ['customer_id', 'loyalty_score']
    cust.create_map()
    assert isinstance(cust.loyalty_score_map, dict)


def test_Products():
    """Product CSV has the expected columns and create_map() builds a dict."""
    prod = Products(f'{TEST_DATA_DIR}/products.csv')
    assert prod.df.columns == ['product_id', 'product_description', 'product_category']
    prod.create_map()
    assert isinstance(prod.product_map, dict)

In [7]:
# Collect and run the test_* functions defined in this notebook with pytest.
ipytest.run()

platform win32 -- Python 3.8.8, pytest-6.2.3, py-1.10.0, pluggy-0.13.1
rootdir: C:\Users\PAVAN
plugins: anyio-2.2.0
collected 3 items

tmpieylhwy2.py ...                                                                           [100%]

anaconda3\lib\site-packages\pyreadline\py3k_compat.py:8
    return isinstance(x, collections.Callable)



In [8]:
# Create the input objects and prepare their data / lookup maps.

# Directory containing the input files; override with the DATA_DIR
# environment variable instead of editing the notebook.
DATA_DIR = os.environ.get('DATA_DIR', 'C:/Users/PAVAN/Downloads')

trans = Transactions(f'{DATA_DIR}/transactions.json')
trans.transform_data()
cust = Customers(f'{DATA_DIR}/customers.csv')
cust.create_map()
prod = Products(f'{DATA_DIR}/products.csv')
prod.create_map()

In [9]:
# Display the transformed transactions: customer_id, product_id and purchase count.
trans.df.show()

+-----------+----------+-----------------------------+
|customer_id|product_id|purchase_count_per_product_id|
+-----------+----------+-----------------------------+
|         C1|        P1|                            2|
|         C1|        P3|                            1|
|         C2|        P1|                            1|
|         C2|        P2|                            1|
|         C3|        P1|                            1|
|         C3|        P2|                            1|
|         C4|        P1|                            1|
|         C4|        P2|                            1|
|         C5|        P1|                            1|
|         C5|        P2|                            1|
|         C6|        P1|                            1|
|         C6|        P2|                            1|
|         C7|        P1|                            1|
|         C7|        P2|                            1|
+-----------+----------+-----------------------------+



## Creating the final DataFrame

In [10]:
# Global variable for the final output, starting from the transformed
# transactions. merge() below reads this global and rebinds it.
df = trans.df

In [11]:
# Function to create the final required table

def merge():
    """Attach loyalty_score and product_category to the global ``df``.

    Reads the module-level ``df`` plus the ``cust.loyalty_score_map`` and
    ``prod.product_map`` dicts. Rebinds ``df`` to a frame with columns
    customer_id, loyalty_score, product_id, product_category and
    purchase_count_per_product_id.

    An id that is absent from its lookup dict gets null. (Copying the id
    column and calling ``DataFrame.replace`` would leave the raw id in place
    as a bogus score/category.)
    """
    global df

    def _lookup(mapping, key):
        # Turn the Python dict into a literal MapType column and index it with
        # the key column; a missing key evaluates to null.
        if not mapping:
            return F.lit(None).cast(T.StringType())
        entries = [F.lit(part) for pair in mapping.items() for part in pair]
        return F.create_map(*entries)[F.col(key)]

    df = (
        df.withColumn('loyalty_score', _lookup(cust.loyalty_score_map, 'customer_id'))
        .withColumn('product_category', _lookup(prod.product_map, 'product_id'))
        .select('customer_id', 'loyalty_score', 'product_id', 'product_category',
                'purchase_count_per_product_id')
    )

In [12]:
# Build the final table: rebinds the global df with loyalty_score and
# product_category added.

merge()

In [13]:
# Display the final output table.
df.show()

+-----------+-------------+----------+----------------+-----------------------------+
|customer_id|loyalty_score|product_id|product_category|purchase_count_per_product_id|
+-----------+-------------+----------+----------------+-----------------------------+
|         C1|           10|        P1|               1|                            2|
|         C1|           10|        P3|               3|                            1|
|         C2|          232|        P1|               1|                            1|
|         C2|          232|        P2|               2|                            1|
|         C3|           23|        P1|               1|                            1|
|         C3|           23|        P2|               2|                            1|
|         C4|           14|        P1|               1|                            1|
|         C4|           14|        P2|               2|                            1|
|         C5|           52|        P1|               1