In [None]:
pip install pyspark




In [None]:
pip install numpy




In [None]:
pip install setuptools



In [None]:
pip install pandas



In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.fpm import FPGrowth
from datetime import timedelta
from itertools import chain, combinations
from pyspark.sql.functions import concat_ws

# Initialize SparkSession
# Builds (or reuses, via getOrCreate) a session named "RecommendationSystem"
# with Hive support enabled. This runs at import time, as a module-level
# side effect.
spark = SparkSession.builder \
    .appName("RecommendationSystem") \
    .enableHiveSupport() \
    .getOrCreate()

# Set several S3 ACL-related configuration keys to a "bucket owner full
# control" value.
# NOTE(review): presumably only some of these keys are read by the S3
# connector actually deployed -- confirm which ones are needed.
spark.conf.set("fs.s3a.acl.default", "BucketOwnerFullControl")
spark.conf.set("fs.s3.canned.acl", "BucketOwnerFullControl")
spark.conf.set("fs.s3.acl.default", "BucketOwnerFullControl")
spark.conf.set("fs.s3a.acl", "bucket-owner-full-control")
# Session time zone for Spark SQL, given as a fixed +05:00 offset from UTC.
spark.conf.set("spark.sql.session.timeZone", "UTC+05:00")

# Destination table paths
# NOTE(review): in the visible code, dst_table is referenced only by a
# commented-out write inside main(), and dst_table_csv is not referenced.
dst_table = "recommendations/"
dst_table_csv = "recommendations_csv/recommendations_national_"


def days_finder(df_order_temp, n_days, max_similar_items, min_items_confidence, pattern_proportion):
    """Estimate how many days of history are needed to reach a target row volume.

    The rows of the last ``n_days`` days (by ``created_at``) are used to
    measure the average number of rows per day and the number of distinct
    ``item_id`` values. The target volume is
    ``distinct_items * max_similar_items * min_items_confidence / pattern_proportion``.

    Args:
        df_order_temp: Spark DataFrame with ``created_at`` and ``item_id`` columns.
        n_days: Size, in days, of the look-back window used for the estimate.
        max_similar_items: Multiplier applied to the distinct item count.
        min_items_confidence: Multiplier applied to the distinct item count.
        pattern_proportion: Divisor applied to the target volume.

    Returns:
        ``0`` when the window contains no rows, otherwise the target volume
        divided by the average number of rows per day (a float).
    """
    recent = df_order_temp.filter(F.col('created_at') >= F.date_sub(F.current_date(), n_days))
    # Compute both statistics in a single aggregation so the filtered data is
    # scanned once instead of running two separate Spark jobs.
    stats = recent.agg(
        F.count(F.lit(1)).alias('row_count'),
        F.countDistinct('item_id').alias('distinct_items'),
    ).collect()[0]
    order_days_count = stats['row_count']
    distinct_items = stats['distinct_items']
    orders_required = distinct_items * max_similar_items * min_items_confidence / pattern_proportion
    orders_single_day = order_days_count / n_days
    return 0 if orders_single_day == 0 else orders_required / orders_single_day


def trim_min_days(df_order, n_days):
    """Keep only rows whose ``created_at`` is within the last ``n_days`` days.

    Args:
        df_order: Spark DataFrame with a ``created_at`` column.
        n_days: Number of days subtracted from the current date to get the cutoff.

    Returns:
        The filtered DataFrame (rows with ``created_at`` on or after the cutoff).
    """
    cutoff = F.date_sub(F.current_date(), n_days)
    is_recent = F.col('created_at') >= cutoff
    return df_order.filter(is_recent)


class Encoder:
    """Map a column to numeric indices with a fitted ``StringIndexer`` and back.

    ``train_encoder`` must be called before ``encode_features`` or
    ``decode_features``; otherwise those methods raise ``RuntimeError``.
    """

    def __init__(self):
        # Fitted indexer model; stays None until train_encoder() has run.
        self.model = None

    def _fitted_model(self):
        """Return the fitted model, or fail with a clear message if untrained."""
        if self.model is None:
            raise RuntimeError("Encoder is not trained; call train_encoder() first.")
        return self.model

    def train_encoder(self, df, col='id'):
        """Fit a frequency-descending ``StringIndexer`` on ``df[col]``.

        The indexer always writes its output to a column named ``"id"``.

        NOTE(review): with the default ``col='id'`` the input and output
        column names are identical; callers should pass a different column
        name -- confirm whether Spark accepts identical names.

        Args:
            df: Spark DataFrame containing the column ``col``.
            col: Name of the column to index.
        """
        stringIndexer = StringIndexer(inputCol=col, outputCol="id", stringOrderType="frequencyDesc")
        self.model = stringIndexer.fit(df)

    def encode_features(self, df):
        """Return ``df`` transformed by the fitted indexer model.

        Raises:
            RuntimeError: If ``train_encoder`` has not been called yet.
        """
        return self._fitted_model().transform(df)

    def decode_features(self, df, inputCol, outputCol):
        """Map indices in ``inputCol`` back to labels, written to ``outputCol``.

        Args:
            df: Spark DataFrame containing ``inputCol``.
            inputCol: Name of the column holding indices.
            outputCol: Name of the column that receives the original labels.

        Raises:
            RuntimeError: If ``train_encoder`` has not been called yet.
        """
        # Resolve the labels first so an untrained encoder fails before any
        # transformer is constructed.
        labels = self._fitted_model().labels
        inverter = IndexToString(inputCol=inputCol, outputCol=outputCol, labels=labels)
        return inverter.transform(df)


def transform_data_fp(df, b_col, p_col, d_col):
    """Group rows into one record per ``b_col`` value with list-valued columns.

    For each group the values of ``p_col``, ``d_col`` and ``id`` are collected
    into lists (keeping their original column names); the ``id`` list is then
    de-duplicated.

    Args:
        df: Spark DataFrame containing ``b_col``, ``p_col``, ``d_col`` and ``id``.
        b_col: Name of the grouping column.
        p_col: Name of the first column to collect.
        d_col: Name of the second column to collect.

    Returns:
        The grouped DataFrame with array columns ``p_col``, ``d_col`` and ``id``.
    """
    collected = [
        F.collect_list(p_col).alias(p_col),
        F.collect_list(d_col).alias(d_col),
        F.collect_list('id').alias('id'),
    ]
    grouped = df.groupby(b_col).agg(*collected)
    return grouped.withColumn("id", F.array_distinct("id"))


def min_date(date_time_obj, behaviour_days):
    """Return ``date_time_obj`` moved back by ``behaviour_days`` days."""
    look_back = timedelta(days=behaviour_days)
    return date_time_obj - look_back


def merge(pred_ids_arr, scores_arr):
    """Sum the scores per item across all lists and rank items by total score.

    Args:
        pred_ids_arr: Iterable of item-id sequences.
        scores_arr: Iterable of score sequences, paired positionally with
            ``pred_ids_arr`` (pairing stops at the shorter sequence).

    Returns:
        The distinct item ids ordered by descending summed score; ties keep
        the order in which the items were first seen.
    """
    paired = chain.from_iterable(
        zip(ids_row, scores_row) for ids_row, scores_row in zip(pred_ids_arr, scores_arr)
    )
    totals = {}
    for item, score in paired:
        totals[item] = totals.get(item, 0) + score
    # Iterating the dict yields keys in insertion order and the sort is
    # stable, so equal totals keep first-seen order.
    return sorted(totals, key=totals.get, reverse=True)


def all_subsets(ss):
    """Return an iterator over every subset of ``ss`` as tuples.

    Subsets are produced in order of increasing size, starting with the
    empty tuple and ending with the full sequence.
    """
    # Build one combinations iterator per subset size (0 .. len(ss)) up front,
    # then chain them together.
    by_size = [combinations(ss, size) for size in range(0, len(ss) + 1)]
    return chain.from_iterable(by_size)


def make_combs(item_list):
    """Return every non-empty subset of ``item_list`` as a list of lists."""
    # filter(None, ...) drops the single empty subset produced by all_subsets.
    non_empty = filter(None, all_subsets(item_list))
    return list(map(list, non_empty))


def top_10_items(item_list):
    """Return the first ten elements of ``item_list`` (fewer if it is shorter)."""
    limit = 10
    return item_list[:limit]


def in_range(min_date, max_date, date):
    """Tell whether ``date`` lies in the closed interval [``min_date``, ``max_date``].

    Inside this function the ``min_date`` parameter shadows the module-level
    ``min_date`` function.
    """
    at_or_after_start = min_date <= date
    return at_or_after_start and date <= max_date


def main():
    """Mine FP-Growth association rules from the order data and write them as CSV.

    Reads ``item_mappings.csv`` and ``dataset.csv``, registers the dataset as
    the temp view ``dataset``, and for every entry in ``cities``:

    1. selects the order columns and parses ``created_at`` as a timestamp,
    2. trims the orders to the number of days returned by ``days_finder``,
    3. index-encodes the item ids and groups the rows per order,
    4. fits FP-Growth and collects each antecedent's consequents,
    5. maps the indices back to item ids and names, and
    6. writes the (input names, predicted names) pairs to ``output``.
    """
    item_mapping = spark.read.csv("item_mappings.csv", header=True, inferSchema=True)
    dataset = spark.read.csv("dataset.csv", header=True, inferSchema=True)
    dataset.createOrReplaceTempView("dataset")
    print(dataset)
    cities = ['KARACHI']  # Add more cities as needed
    repartition = 10

    # Hyperparameters: identical for every city, so they are defined once.
    n_days = 40
    min_confidence = 0.1
    max_similar_items = 10
    min_items_confidence = 10
    pattern_proportion = 1 / 4
    sorting_metric = 'confidence'

    b_col = 'order_number'
    p_col = 'item_id'
    d_col = 'item_name'

    for city in cities:
        print(f"Processing City: {city}")

        # NOTE(review): the query does not filter on `city`, so every city
        # currently processes the full dataset -- confirm whether a city
        # predicate is intended.
        dataset_query = """
            SELECT
                order_number,
                item_id,
                item_name,
                core_category,
                store_id,
                created_at
            FROM dataset
        """
        df_order_sql = spark.sql(dataset_query)
        df_order_sql.show()

        # Item mapping processing
        df_items = item_mapping

        df_order_sql = df_order_sql.withColumn('created_at', F.to_timestamp('created_at'))
        # Keep the computed window in its own variable so the n_days
        # hyperparameter stays at its initial value for the next city.
        required_days = days_finder(df_order_sql, n_days, max_similar_items, min_items_confidence, pattern_proportion)
        df_order = trim_min_days(df_order_sql, int(required_days))

        # The encoder is fitted on the item mapping and applied to the orders.
        enc = Encoder()
        enc.train_encoder(df_items, 'item_id')
        df_encoded = enc.encode_features(df_order)
        df_fp = transform_data_fp(df_encoded, b_col, p_col, d_col)

        fpGrowth = FPGrowth(itemsCol="id", minSupport=0.001, minConfidence=min_confidence, numPartitions=64)
        model = fpGrowth.fit(df_fp.repartition(repartition))

        # Association rules: one row per antecedent, with its consequents and
        # their metric values collected into lists.
        # NOTE(review): the lists are collected, not ordered by
        # `sorting_metric` -- confirm whether sorting was intended.
        model_associations_df = model.associationRules \
            .withColumn('consequent', F.explode('consequent')) \
            .groupby('antecedent') \
            .agg(F.collect_list('consequent').alias('consequent'),
                 F.collect_list(sorting_metric).alias(sorting_metric))

        # Collect the encoded item mapping once and build both lookup tables
        # (index -> item_id, item_id -> item_name) from the same rows.
        item_rows = enc.encode_features(df_items).select('id', 'item_id', 'item_name').collect()
        id_itemId_dic = {row['id']: row['item_id'] for row in item_rows}
        id_name_dic = {row['item_id']: row['item_name'] for row in item_rows}

        id_itemId_udf = F.udf(lambda ids: [id_itemId_dic[i] for i in ids], ArrayType(StringType()))
        id_name_udf = F.udf(lambda ids: [id_name_dic[i] for i in ids], ArrayType(StringType()))
        model_associations_df = model_associations_df \
            .withColumn('input_ids', id_itemId_udf('antecedent')) \
            .withColumn('pred_ids', id_itemId_udf('consequent')) \
            .withColumn('input_names', id_name_udf('input_ids')) \
            .withColumn('pred_names', id_name_udf('pred_ids')) \
            .select('input_names', 'pred_names', sorting_metric, 'input_ids', 'pred_ids')

        model_associations_df.show()

        # Output recommendations
        recommendations_prd = model_associations_df.select('input_names', 'pred_names').dropDuplicates()
        recommendations_prd.show()
        # Flatten the name arrays to comma-separated strings for the CSV output.
        recommendations_prd = recommendations_prd.withColumn("input_names", concat_ws(", ", "input_names"))
        recommendations_prd = recommendations_prd.withColumn("pred_names", concat_ws(", ", "pred_names"))
        # NOTE(review): every city writes to the same "output" path with
        # mode="overwrite", so with several cities only the last one survives.
        recommendations_prd.write.csv("output", header=True, mode="overwrite")

        # Uncomment to save recommendations
        # recommendations_prd.repartition(repartition).write.csv(f"{dst_table}{city}", mode="overwrite", header=True)


# Run the pipeline only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()


DataFrame[order_number: bigint, item_id: string, item_name: string, core_category: string, store_id: string, created_at: date]
Processing City: KARACHI
+------------+--------------------+--------------------+-------------------+--------------------+----------+
|order_number|             item_id|           item_name|      core_category|            store_id|created_at|
+------------+--------------------+--------------------+-------------------+--------------------+----------+
| 24091140039|  495406729127576347|Head & Shoulders ...|      Personal Care|19TE0fJ8dDAIHobnu...|2024-10-12|
| 24091140039|  510597221483309850|Shan Bombay Birya...|Masala & Condiments|19TE0fJ8dDAIHobnu...|2024-10-12|
| 24091140039|  850744270230871589|Shan Nihari Sache...|Masala & Condiments|19TE0fJ8dDAIHobnu...|2024-10-12|
| 24091140039|  858265587021175186|Ariel Washing Pow...|              Safai|19TE0fJ8dDAIHobnu...|2024-10-12|
| 24091140039|  515035393486259426|Shan Korma Sachet...|Masala & Condiments|19TE0fJ8d