In [1]:
import logging
from pyspark.sql import SparkSession, functions as F, DataFrame
from pyspark.ml.fpm import FPGrowth

from warp.spark.io_adapters.utils.enums import AdapterType
from warp.core.io_adapters import get_adapter
from warp.spark.io import cache


def create_logger(name: str) -> logging.Logger:
    """Return the logger registered under ``name``.

    Before the lookup, ``logging.basicConfig`` is called with level INFO and
    a record format of timestamp, logger name, file name, line number, level
    and message. ``basicConfig`` only has an effect while the root logger has
    no handlers, so repeated calls do not stack handlers.

    Args:
        name: Name passed to ``logging.getLogger``.

    Returns:
        The ``logging.Logger`` instance for ``name``.
    """
    record_format = (
        "[%(asctime)s] {%(name)s:%(filename)s:%(lineno)s}"
        " %(levelname)s - %(message)s"
    )
    logging.basicConfig(format=record_format, level=logging.INFO)

    named_logger = logging.getLogger(name)
    return named_logger


def cache_df(
    df: DataFrame,
    *,
    file_schema: str = "file:/",
    bucket_name: str = "home/burhan/project_data",
) -> DataFrame:
    """Pass ``df`` through ``warp.spark.io.cache`` and return its result.

    The location arguments used to be hard-coded to one developer's home
    directory. They are now keyword-only parameters whose defaults are the
    previous values, so ``cache_df(df)`` behaves exactly as before while
    other machines can point the cache somewhere else.

    Args:
        df: DataFrame handed to ``cache``.
        file_schema: Forwarded as ``cache(file_schema=...)``.
        bucket_name: Forwarded as ``cache(bucket_name=...)``.

    Returns:
        Whatever ``cache`` returns.
        NOTE(review): presumably a DataFrame backed by the materialised
        copy -- confirm against the ``warp.spark.io.cache`` documentation.
    """
    return cache(df, file_schema=file_schema, bucket_name=bucket_name)


# Keep only ERROR-and-above records from the 'py4j' logger.
logger = logging.getLogger('py4j')
logger.setLevel(logging.ERROR)

# Local-mode session; each (key, value) pair below is applied with
# builder.config() in the order listed.
builder = SparkSession.builder.master('local[*]').appName('term-project')
for _conf_key, _conf_value in (
    ('spark.default.parallelism', 8),
    ('spark.sql.shuffle.partitions', 8),
    ('spark.port.maxRetries', 32),
    ('spark.sql.adaptive.enabled', False),
    ('spark.driver.memory', '4g'),
    ('spark.executor.memory', '4g'),
):
    builder = builder.config(_conf_key, _conf_value)
# Drop the loop temporaries so the notebook namespace keeps the same names.
del _conf_key, _conf_value

spark = builder.getOrCreate()

# Parquet adapter bound to this session; later cells read and write through it.
parquet_io = get_adapter(AdapterType.PARQUET)(spark_session=spark)

In [15]:
"""Store Translation ETL module."""
from pyspark.sql import functions as F

from warp.core.logger import log_time

from association import create_logger, delta_io
from association.etl import (
    get_arguments, STORE_PATH, STORE_TRANSLATIONS_PATH
)
from association.etl.utils import translate_from_uk_to_en

# Logger used by etl() below. `create_logger` here is the name imported from
# `association` in this cell, which shadows the notebook-level definition of
# the same name.
LOG = create_logger("store_translation_bootstrap_etl")


@log_time()
def etl(run_date: str) -> None:
    """Build the store translation table and overwrite it on disk.

    Reads the store dataset from ``STORE_PATH`` through the notebook-level
    ``parquet_io`` adapter, derives ``*_uk`` / ``*_en`` columns for the store
    description and the segment via ``translate_from_uk_to_en``, and writes
    the result to ``STORE_TRANSLATIONS_PATH`` with ``mode="overwrite"``.

    Args:
        run_date: Only used in the log message; it does not filter the data.

    Returns:
        None. The result is written to ``STORE_TRANSLATIONS_PATH``.
    """
    LOG.info(f"Running ETL for {run_date}.")

    stores = parquet_io.read(path=STORE_PATH)

    # Per-store columns: original description, its translation, and the raw
    # segment, which is kept as the join key for the lookup below.
    description_uk = F.col("store_description").alias("store_description_uk")
    description_en = translate_from_uk_to_en("store_description").alias(
        "store_description_en"
    )
    per_store = stores.select(
        "store_id", description_uk, description_en, "segment"
    )

    # The translation helper is applied to the distinct segment values only,
    # then joined back, rather than to every store row.
    segment_lookup = (
        stores.select("segment")
        .distinct()
        .withColumn("segment_en", translate_from_uk_to_en("segment"))
    )

    translated = per_store.join(
        F.broadcast(segment_lookup), on="segment", how="left"
    ).select(
        "store_id",
        "store_description_uk",
        "store_description_en",
        F.col("segment").alias("segment_uk"),
        "segment_en",
    )

    parquet_io.write(
        df=translated, path=STORE_TRANSLATIONS_PATH, mode="overwrite"
    )


In [16]:
# Display the path constant that etl() reads the store dataset from.
STORE_PATH

In [17]:
# Display the path constant that etl() writes its output to.
STORE_TRANSLATIONS_PATH

In [29]:
# Run the store-translation ETL. The date string only appears in etl()'s log
# message; it does not select or filter any data.
etl("2024-01-01")

In [20]:
# Load the store-translation dataset from a machine-specific absolute path.
# NOTE(review): presumably the same location as STORE_TRANSLATIONS_PATH --
# confirm, and prefer the constant over the literal if so.
df = parquet_io.read("/home/burhan/datastore/association/etl/store_translations_local")

In [22]:
# Row count of the frame currently bound to `df`.
df.count()

In [21]:
# Preview the frame currently bound to `df` (show() prints 20 rows by default).
df.show()

In [30]:
# Rebind `df` to the products dataset (machine-specific absolute path).
# This replaces whatever frame `df` referred to before this cell.
df = parquet_io.read("/home/burhan/datastore/association/etl/products_local")

In [32]:
# Export 10 product ids as strings, spread over 3 partitions, as text.
# limit(10) runs before orderBy, so the sort only orders the 10 rows that
# limit returned.
# NOTE(review): "delimiter" does not appear to be an option of the text
# writer -- confirm whether it has any effect here.
(
    df.limit(10)
    .select("product_id")
    .orderBy("product_id")
    .withColumn("product_code", F.col("product_id").cast("string"))
    .select("product_code")
    .repartition(3)
    .write.format("text")
    .option("delimiter", "|")
    .save(
        "/home/burhan/datastore/association/etl/products_deneme_4",
        mode="overwrite",
    )
)

In [27]:
# Export every product_description as text from a single partition.
# NOTE(review): "delimiter" does not appear to be an option of the text
# writer -- confirm whether it has any effect here.
(
    df.select("product_description")
    .repartition(1)
    .write.format("text")
    .option("delimiter", "|")
    .save(
        "/home/burhan/datastore/association/etl/products_deneme_2",
        mode="overwrite",
    )
)

In [35]:
# Export the distinct product_description values as text over 3 partitions.
# NOTE(review): "delimiter" does not appear to be an option of the text
# writer -- confirm whether it has any effect here.
(
    df.select("product_description")
    .distinct()
    .repartition(3)
    .write.format("text")
    .option("delimiter", "|")
    .save(
        "/home/burhan/datastore/association/etl/products_deneme_5",
        mode="overwrite",
    )
)

In [28]:
# Export 1000 product_description rows as text from a single partition.
# There is no ordering before limit(), so which rows are taken is up to Spark.
# NOTE(review): "delimiter" does not appear to be an option of the text
# writer -- confirm whether it has any effect here.
(
    df.select("product_description")
    .limit(1000)
    .repartition(1)
    .write.format("text")
    .option("delimiter", "|")
    .save(
        "/home/burhan/datastore/association/etl/products_deneme_3",
        mode="overwrite",
    )
)

In [5]:
# Build one row per sales_id holding the set of product ids in that sale,
# after dropping two specific product ids.
# NOTE(review): this needs `df` to carry sales_id and product_id, but the
# cells above last bind `df` to the products dataset. Presumably a
# transactions frame was loaded in a cell that is no longer in the
# notebook -- confirm and load it explicitly. The reason for excluding
# 890999 and 888376 is not recorded here either.
df = (
    df
    .where(F.col("product_id") != F.lit(890999))
    .where(F.col("product_id") != F.lit(888376))
    .groupby("sales_id")
    .agg(F.collect_set(F.col("product_id")).alias("basket"))
)

In [6]:
# Pass the basket frame through cache_df and continue with the frame it returns.
df = cache_df(df)

In [34]:
# Preview the frame currently bound to `df` (show() prints 20 rows by default).
df.show()

In [7]:
# FP-Growth over the `basket` column. Both thresholds are 0.001: itemsets
# need at least that support, and rules at least that confidence.
fp = FPGrowth(
    itemsCol='basket',
    predictionCol='prediction',
    minSupport=0.001,
    minConfidence=0.001,
)
model = fp.fit(df)

In [None]:
# Show up to 20 frequent itemsets with column truncation turned off.
model.freqItemsets.show(20, False)

In [None]:
# Number of frequent itemsets the model found.
model.freqItemsets.count()

In [37]:
# Show up to 20 association rules with column truncation turned off.
model.associationRules.show(20, False)

In [None]:
# Number of association rules the model produced.
model.associationRules.count()

In [8]:
# Pass the association rules through cache_df.
# NOTE(review): the returned frame is not bound to a name, so later cells
# that read model.associationRules do not use it -- confirm that only the
# side effect of cache_df is wanted here.
cache_df(model.associationRules)

In [None]:
# Pass the frequent itemsets through cache_df.
# NOTE(review): the returned frame is not bound to a name, so any later use
# of model.freqItemsets does not go through it -- confirm that only the
# side effect of cache_df is wanted here.
cache_df(model.freqItemsets)

In [18]:
# Association rules sorted by ascending lift, untruncated.
# NOTE(review): 211 is presumably the rule count seen in an earlier run --
# confirm, or derive it from model.associationRules.count().
(
    model.associationRules
    .orderBy("lift")
    .show(n=211, truncate=False)
)

In [28]:
# Look up the serializer setting on the running session.
# NOTE(review): the builder in this notebook never sets spark.serializer and
# no default is passed to get() -- confirm this lookup succeeds on a fresh
# kernel.
spark.conf.get("spark.serializer")