In [67]:
%pip install delta_spark nltk

Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting regex>=2021.8.3 (from nltk)
  Obtaining dependency information for regex>=2021.8.3 from https://files.pythonhosted.org/packages/2c/8d/3a99825e156744b85b031c1ea966051b85422d13972ed7cd2cd440e0c6c4/regex-2023.8.8-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading regex-2023.8.8-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.9/40.9 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
Downloading regex-2023.8.8-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (782 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m782.4/782.4 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: regex, nltk
S

In [3]:
from pathlib import Path

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, BooleanType, IntegerType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.pip_utils import configure_spark_with_delta_pip

In [5]:
JAR_PACKAGES = ",".join([str(x) for x in Path("../jars").glob("*.jar")])

In [6]:
builder = SparkSession.builder.appName("delta").master("spark://spark:7077") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.jars", JAR_PACKAGES) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key","datalake") \
    .config("spark.hadoop.fs.s3a.secret.key","datalake") \
    .config("spark.hadoop.fs.s3a.endpoint","http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

In [7]:
spark = configure_spark_with_delta_pip(builder).enableHiveSupport().getOrCreate()

In [8]:
ecommerce_schema = StructType([
    StructField("unique_id", StringType(), False),
    StructField("crawl_timestamp", StringType()),
    StructField("product_url", StringType()),
    StructField("product_name", StringType()),
    StructField("product_category_tree", StringType()),
    StructField("product_id", StringType()),
    StructField("retail_price", FloatType()),
    StructField("discounted_price", FloatType()),
    StructField("image", StringType()),
    StructField("is_fk_advantage_product", StringType()),
    StructField("description", StringType()),
    StructField("product_rating", StringType()),
    StructField("overall_rating", StringType()),
    StructField("brand", StringType()),
    StructField("product_specifications", StringType())
])
dict_specs_schema = ArrayType(StructType([
    StructField("key", StringType()),
    StructField("value", StringType()),
]))

In [9]:
bronze_container_path = "s3a://bronze"

df = spark.read.format("csv") \
    .option("escape", '"') \
    .option("multiLine", True) \
    .option("header", True) \
    .schema(ecommerce_schema) \
    .load(f"{bronze_container_path}/flipkart_ecommerce.csv")

### Product Category Tree

In [10]:
def convert_category_tree_to_list(category_df):
    category_tree_column = F.from_json(F.col("product_category_tree"), ArrayType(elementType=StringType())).getItem(0)
    return category_df.withColumn("product_category_tree", F.split(category_tree_column, ">>"))


modified_category_df = convert_category_tree_to_list(category_df=df)

main_category_window_spec = Window.orderBy("main_category")
category_window_spec = Window.orderBy("category")
sub_category_window_spec = Window.orderBy("sub_category")

modified_category_df = modified_category_df.select(
    F.col("*"),
    F.col("product_category_tree").getItem(0).alias("main_category"),
    F.col("product_category_tree").getItem(1).alias("category"),
    F.col("product_category_tree").getItem(2).alias("sub_category")
).select(
    F.col("*"),
    F.dense_rank().over(main_category_window_spec).alias("main_category_id"),
    F.dense_rank().over(category_window_spec).alias("category_id"),
    F.dense_rank().over(sub_category_window_spec).alias("sub_category_id")
)

modified_category_df.count()

20000

#### dim_main_category

In [11]:
dim_main_category_df = modified_category_df.select(F.col("main_category_id"),
    F.col("main_category").alias("name")
).distinct().select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("*")
)
dim_main_category_df.printSchema()
dim_main_category_df.count()


root
 |-- id: integer (nullable = false)
 |-- main_category_id: integer (nullable = false)
 |-- name: string (nullable = true)



266

#### dim_category

In [12]:
dim_category_df = modified_category_df.select(
    F.col("category_id"),
    F.col("category").alias("name")
).distinct().select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("*")
)
dim_category_df.printSchema()
dim_category_df.count()

root
 |-- id: integer (nullable = false)
 |-- category_id: integer (nullable = false)
 |-- name: string (nullable = true)



223

#### dim_sub_category

In [13]:
dim_sub_category_df = modified_category_df.select(
    F.col("sub_category_id"),
    F.col("sub_category").alias("name")
).distinct().select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("*")
)
dim_sub_category_df.printSchema()
dim_sub_category_df.count()

root
 |-- id: integer (nullable = false)
 |-- sub_category_id: integer (nullable = false)
 |-- name: string (nullable = true)



899

### Product Specifications

In [14]:
def get_product_specs_item(index, key):
    product_specs_json_column = F.regexp_replace(F.col("product_specifications"), pattern="=>", replacement=":")
    product_specs_arr_column = F.from_json(
        col=F.get_json_object(product_specs_json_column, "$.product_specification"),
        schema=dict_specs_schema
    )
    return product_specs_arr_column.getItem(index).getItem(key)


def get_product_specs_field(name):
    return F.coalesce(
        F.when(get_product_specs_item(0, "key") == name, get_product_specs_item(0, "value")),
        F.when(get_product_specs_item(1, "key") == name, get_product_specs_item(1, "value")),
        F.when(get_product_specs_item(2, "key") == name, get_product_specs_item(2, "value")),
        F.when(get_product_specs_item(3, "key") == name, get_product_specs_item(3, "value")),
        F.when(get_product_specs_item(4, "key") == name, get_product_specs_item(4, "value"))
    )

specs_window_spec = Window.orderBy("product_specifications")

modified_specs_df = modified_category_df.select(
    F.col("*"),
    F.dense_rank().over(specs_window_spec).alias("specification_id"),
    get_product_specs_field(name="Type").alias("type"),
    get_product_specs_field(name="Ideal For").alias("ideal_for"),
    get_product_specs_field(name="Occasion").alias("occasion"),
    get_product_specs_field(name="Color").alias("color"),
    get_product_specs_field(name="Number of Contents in Sales Package").alias("quantity")
)

modified_specs_df.count()

20000

#### dim_specification

In [15]:
dim_specification_df = modified_specs_df.select(
    F.col("specification_id"),
    F.col("type"),
    F.col("ideal_for"),
    F.col("occasion"),
    F.col("color"),
    F.col("quantity"),
).distinct().select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("*")
)
dim_specification_df.printSchema()
dim_specification_df.count()

root
 |-- id: integer (nullable = false)
 |-- specification_id: integer (nullable = false)
 |-- type: string (nullable = true)
 |-- ideal_for: string (nullable = true)
 |-- occasion: string (nullable = true)
 |-- color: string (nullable = true)
 |-- quantity: string (nullable = true)



18826

### Brand

In [16]:

brand_window_spec = Window.orderBy("brand")
modified_brand_df = modified_specs_df.withColumn("brand_id", F.dense_rank().over(brand_window_spec))

#### dim_brand

In [17]:
dim_brand_df = modified_brand_df.select(
    F.col("brand_id"),
    F.col("brand")
).distinct().select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("*")
)

dim_brand_df.printSchema()
dim_brand_df.count()

root
 |-- id: integer (nullable = false)
 |-- brand_id: integer (nullable = false)
 |-- brand: string (nullable = true)



3501

### Date

In [18]:
modified_date_df = modified_brand_df.select(
    F.col("*"),
    F.dense_rank().over(Window.orderBy("crawl_timestamp")).alias("crawl_date_id"),
    F.year("crawl_timestamp").alias("year"),
    F.month("crawl_timestamp").alias("month"),
    F.dayofmonth("crawl_timestamp").alias("day"),
    F.hour("crawl_timestamp").alias("hour"),
    F.minute("crawl_timestamp").alias("minute"),
    F.date_format("crawl_timestamp", "z").alias("timezone"),
    F.date_format("crawl_timestamp", "E").alias("day_name"),
    F.date_format("crawl_timestamp", "MMM").alias("month_name"),
    F.weekofyear("crawl_timestamp").alias("year_week"),
    F.quarter("crawl_timestamp").alias("quarter")
)

#### dim_date

In [19]:
dim_date_df = modified_date_df.select(
    F.col("crawl_date_id").alias("id"),
    F.col("year"),
    F.col("month"),
    F.col("day"),
    F.col("hour"),
    F.col("minute"),
    F.col("timezone"),
    F.col("day_name"),
    F.col("month_name"),
    F.col("year_week"),
    F.col("quarter"),
).distinct()

dim_date_df.printSchema()
dim_date_df.count()

root
 |-- id: integer (nullable = false)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- timezone: string (nullable = true)
 |-- day_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- year_week: integer (nullable = true)
 |-- quarter: integer (nullable = true)



371

### dim_product

In [90]:
dim_product_df = modified_date_df.select(
    F.col("product_id"),
    F.col("product_url"),
    F.col("product_name"),
    F.col("description"),
    F.from_json(F.col("image"), ArrayType(StringType())).alias("image"),
).distinct().select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("*")
)

dim_product_df.printSchema()
dim_product_df.count()

root
 |-- id: integer (nullable = false)
 |-- product_id: string (nullable = true)
 |-- product_url: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)



20000

In [104]:
import nltk
from nltk.corpus import stopwords
from pyspark.ml.feature import StopWordsRemover

nltk.download("stopwords")
stopwords = list(set(stopwords.words("english") + ["of", "by", "key", "features"] )) 
print(stopwords)

remover = StopWordsRemover(stopWords=stopwords, inputCol="x", outputCol="description_cleansed")

x = dim_product_df.select(
        F.col("*"),
        F.split(F.lower(F.regexp_replace(
            F.regexp_replace(F.col("description"), r"[^a-zA-Z0-9]", " "),
            r"\s+",
            " "
        )), " ").alias("x")
)
remover.transform(x).show()

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


['and', "won't", 'its', 'then', 'very', 'yourself', "didn't", 'are', 'mightn', "you'd", 'key', 'now', 'y', 'until', 'any', 'where', 'we', 'didn', 'isn', "mustn't", 'is', 'theirs', 'when', 'hers', 'whom', 've', 'off', 'out', 'hadn', "you've", 'with', 'ourselves', "weren't", 'same', 'was', 'themselves', 'such', "that'll", 'has', 'before', 'all', 'no', 'ours', 'features', 'or', 'do', 'hasn', 'shan', 'because', 'what', "should've", 'at', 'needn', 'does', 'him', 'did', 'am', 'yourselves', 'the', "needn't", 'mustn', 'doing', "don't", 'few', 'each', "wasn't", 'not', 'why', 'i', "couldn't", 'ma', 'don', 'aren', 'under', 's', 'your', 'them', 'more', 'who', 'so', "wouldn't", 'there', 'between', 'while', 'of', 'himself', 'only', 'other', 'myself', 'once', 'nor', 'both', 'during', 'weren', 'own', 'they', "aren't", "she's", 'below', "hasn't", 'be', 'herself', 're', 'm', 'as', 'my', 't', 'had', 'her', 'about', 'an', 'o', 'shouldn', 'just', "haven't", 'd', 'against', 'being', 'but', "isn't", 'above',

### Fact Product Posting

In [38]:
fact_product_posting_df = modified_date_df.select(
    F.monotonically_increasing_id().cast(IntegerType()).alias("id"),
    F.col("unique_id"),
    F.col("brand_id"),
    F.col("main_category_id"),
    F.col("category_id"),
    F.col("sub_category_id"),
    F.col("product_id"),
    F.col("specification_id"),
    F.col("crawl_date_id"),
    F.col("retail_price"),
    F.col("discounted_price"),
    F.col("is_fk_advantage_product").cast(BooleanType()),
    F.col("product_rating"),
    F.col("overall_rating")
)
fact_product_posting_df.printSchema()
fact_product_posting_df.count()

root
 |-- id: integer (nullable = false)
 |-- unique_id: string (nullable = true)
 |-- brand_id: integer (nullable = false)
 |-- main_category_id: integer (nullable = false)
 |-- category_id: integer (nullable = false)
 |-- sub_category_id: integer (nullable = false)
 |-- product_id: string (nullable = true)
 |-- specification_id: integer (nullable = false)
 |-- crawl_date_id: integer (nullable = false)
 |-- retail_price: float (nullable = true)
 |-- discounted_price: float (nullable = true)
 |-- is_fk_advantage_product: boolean (nullable = true)
 |-- product_rating: string (nullable = true)
 |-- overall_rating: string (nullable = true)



20000