In [1]:
from package_template.extract import spark_read
from package_template.load import spark_write
from package_template.preprocessing import ShoppingListTransformer
from package_template.models import AssociationRuleModel
from package_template.postprocessing import ScoreNormalization
from pyspark.ml import Pipeline

import datetime

In [2]:
from pyspark.sql import SparkSession

# Spark session used to exercise the recommendation pipeline end to end.
# - spark.ui.port: serve the Spark UI on 4050 (Spark's documented default is 4040).
# - spark.driver.memory: 15g driver heap. Driver memory is fixed when the driver JVM
#   starts, so it only applies if getOrCreate() has to launch a new session rather
#   than reuse one already running in this kernel.
# - spark.memory.storageFraction: share of unified memory protected from eviction for
#   cached data. The key was previously misspelled as "spark.driver.storeageFraction",
#   which is not a Spark property and was silently ignored.
spark = (
    SparkSession.builder
    .appName("test_pipeline")
    .config("spark.ui.port", "4050")
    .config("spark.driver.memory", "15g")
    .config("spark.memory.storageFraction", "0.3")
    .getOrCreate()
)
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/13 08:01:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


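# Data loading

Read the test transactions (one row per `trans_id`/`item_id` pair). Turning these rows into shopping lists happens in the first pipeline stage (`ShoppingListTransformer`) below.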

In [3]:
# Load the transaction fixture (columns trans_id, item_id) and print it for inspection.
transactions_path = "../../tests/data/test_spark_load_parquet"
df = spark_read(spark=spark, file_path=transactions_path, file_format="parquet")
df.show()

                                                                                

+--------+-------+
|trans_id|item_id|
+--------+-------+
|       3|      A|
|       3|      C|
|       6|      A|
|       6|      B|
|       1|      A|
|       5|      B|
|       4|      D|
|       4|      C|
|       1|      B|
|       2|      B|
|       4|      E|
|       2|      A|
|       5|      A|
|       1|      C|
+--------+-------+



# Make recommendations with an association-rule model

## Build the model

In [6]:
# The first two stages must agree on the name of the intermediate list column.
shopping_list_col = "shopping_list"

# Stage 1: build a shopping-list column from the trans_id / item_id rows.
transformer = ShoppingListTransformer(
    transaction_col="trans_id",
    item_col="item_id",
    output_col=shopping_list_col,
)

# Stage 2: association-rule recommender over the shopping lists, returning up to
# candidate_count scored recommendations in DataFrame form.
# NOTE(review): if min_lift is the usual lift measure, a threshold below 1.0 also keeps
# rules whose items co-occur less often than independence predicts — confirm intent.
model = AssociationRuleModel(
    item_list_col=shopping_list_col,
    min_support=0.2,
    min_confidence=0.2,
    min_lift=0.5,
    output_format="dataframe",
    provide_score=True,
    candidate_count=3,
)

# Stage 3: log-transform the scores, grouped by item_id.
score_transformation = ScoreNormalization(
    group_col="item_id",
    rec_col="recommendations",
    score_col="score",
    output_format="dataframe",
    provide_score=True,
    transformation_method="log",
)

# Pipeline.fit returns a fitted PipelineModel; the name `pipeline` is kept because
# later cells call pipeline.transform on it.
pipeline_stages = [transformer, model, score_transformation]
pipeline = Pipeline(stages=pipeline_stages).fit(df)

## Inspect the model output

In [7]:
# Score every input row with the fitted pipeline and display the recommendations.
# truncate=False: DataFrame.show clips cells longer than 20 characters by default,
# which would hide entries of the array-valued recommendations/score columns.
result = pipeline.transform(df)
result.show(truncate=False)

+-------+---------------+----------+
|item_id|recommendations|     score|
+-------+---------------+----------+
|      A|         [B, C]|[1.0, 0.0]|
|      B|            [A]|     [1.0]|
|      C|            [A]|     [0.0]|
+-------+---------------+----------+



## Save the results

In [8]:
# Persist the scored recommendations as parquet under tests/data, alongside the input fixture.
# NOTE(review): whether re-running this cell overwrites or fails on an existing path depends
# on spark_write's save mode, which is not visible here — confirm before Restart & Run All.
result_path = "../../tests/data/test_spark_write_parquet_result"
spark_write(data=result, file_path=result_path, file_format="parquet")