# Directories

In [0]:
# Root of the bronze-layer volume that holds the raw scraper output.
BRONZE_VOL = "/Volumes/webscraper/bronze/"

# Per-source landing folders, all nested under BRONZE_VOL.
GSMARENA_LOC = f"{BRONZE_VOL}cellphone_raw/GSMArena/"     # GSMArena JSON dumps + brand/model CSVs
KIJIJI_CELL_LOC = f"{BRONZE_VOL}cellphone_raw/Kijiji/"    # raw Kijiji cellphone JSON
KIJIJI_MOTO_LOC = f"{BRONZE_VOL}motorcycle_raw/Kijiji/"   # raw Kijiji motorcycle JSON

In [0]:
# Sanity check: show what has landed in each raw source folder.
for source_dir in (GSMARENA_LOC, KIJIJI_CELL_LOC, KIJIJI_MOTO_LOC):
    display(dbutils.fs.ls(source_dir))

# Raw Files

In [0]:
# GSMArena: multi-line JSON documents plus the brand and model lookup tables.
# NOTE: the CSVs are read with a header row but WITHOUT schema inference, so
# every column (including numeric-looking ones) is loaded as a string.
gsm_df = spark.read.json(GSMARENA_LOC + "*.json", multiLine=True)
brand_df = spark.read.csv(GSMARENA_LOC + "cell_brand.csv", header=True)
model_df = spark.read.csv(GSMARENA_LOC + "cell_models.csv", header=True)

for preview_df in (gsm_df, brand_df, model_df):
    display(preview_df.limit(5))

In [0]:
# Kijiji cellphone ads; multiLine mode parses each file as one whole JSON document.
cell_df = spark.read.json(KIJIJI_CELL_LOC + "*.json", multiLine=True)
display(cell_df.limit(5))

In [0]:
# Kijiji motorcycle ads; multiLine mode parses each file as one whole JSON document.
moto_df = spark.read.json(KIJIJI_MOTO_LOC + "*.json", multiLine=True)
display(moto_df.limit(5))

# Cellphone Dataset

In [0]:
from pyspark.sql import functions as F

def normalize_text(col, tag_pattern=r"tags?\b"):
    """Normalize free text into a lowercase alphanumeric string for regex matching.

    Steps, in order:
      1. lowercase the text;
      2. drop everything from the first match of ``tag_pattern`` onward
         (meant to cut off a trailing tag section);
      3. spell ``+`` as `` plus`` (e.g. "s21+" -> "s21 plus");
      4. replace every character outside ``[a-z0-9]`` with a space;
      5. collapse whitespace runs to one space and trim.

    Args:
        col: column name or Column holding the raw text.
        tag_pattern: Java regex marking where the text is truncated. The
            previous behaviour split on the bare substring "tag", which also
            cut inside ordinary words such as "vintage", "advantage",
            "percentage", "voltage" or "instagram" (e.g. a "Vintage Nokia"
            title became "vin"). Requiring the match to end on a word boundary
            keeps those words while still catching "tag", "tags", "tags:" and
            a fused "...phonetags". Pass ``"tag"`` to get the old behaviour.
            NOTE(review): confirm against the actual scraped tag-section format.

    Returns:
        Column containing the normalized string.
    """
    lowered = F.lower(col)
    # limit=2 and item 0: keep only the text before the first tag match
    before_tags = F.split(lowered, tag_pattern, 2).getItem(0)
    plus_spelled = F.regexp_replace(before_tags, r"\+", " plus")
    alnum_only = F.regexp_replace(plus_spelled, r"[^a-z0-9]", " ")
    return F.trim(F.regexp_replace(alnum_only, r"\s+", " "))

### Preprocess

In [0]:
# Fill missing categories, build the normalized search text from title and
# description (concat_ws skips nulls, so one missing field does not null the
# text), then keep a single row per listing ID.
cell_cpy = (
    cell_df
    .fillna({"Category": "Product"})
    .withColumn(
        "text_norm",
        normalize_text(F.concat_ws(" ", "title", "description")),
    )
    .dropDuplicates(["ID"])
)

display(cell_cpy.describe())

### Extract storage

In [0]:
# Ordered storage-extraction rules as (unit, regex). A listing's storage comes
# from the FIRST rule in this list that matches its normalized text; the rule's
# 1-based position is its priority (1 = most trusted). Capture group 1 of each
# regex is the numeric amount.
STORAGE_RULES = [
    # most common phone storage (128-512)
    ("GB", r"\b(?!0{3})(\d{3})\s*(gb|gigs|gib)\b"),
    # another common storage (32, 64)
    ("GB", r"\b(?!0{2})(32|64)\s*(gb|gigs|gib)\b"),
    # common, but also used for expandable storage
    ("TB", r"\b([1-8])\s*(tb)\b"),
    ("GB", r"\b(?!0{4})(\d{4})\s*(gb|gigs|gib)\b"),
    # French abbreviation
    ("GB", r"\b(?!0{3,4})(\d{3,4})\s*(go)\b"),
    # larger numbers and weak abbreviation
    ("GB", r"\b(?!0{3,4})(128|256|512|1000|1024|2000|2048)\s*(g)\b"),
    # usually storage but can be RAM
    ("GB", r"\b(?!0{2})(\d{2})\s*(gb|gigs|gib)\b"),
    ("GB", r"\b(?!0{2})(\d{2})\s*(go)\b"),
    # generally RAM
    ("GB", r"\b([1-9])\s*(gb|gigs|gib)\b"),
    ("GB", r"\b([1-9])\s*(go)\b"),
    # weakest match; the single-digit rule skips 3/4/5
    # (presumably to avoid "3g"/"4g"/"5g" network mentions)
    ("GB", r"\b(?!0{2,4})(\d{2,4})\s*(g)\b"),
    ("GB", r"\b([126789])\s*(g)\b"),
]

storage_groups = spark.createDataFrame([
    {"storage_unit": unit, "pattern": regex, "priority": rank}
    for rank, (unit, regex) in enumerate(STORAGE_RULES, start=1)
])

# Try every rule on every listing; regexp_extract yields "" on no match, so
# those rows are dropped and listings with no hit get no row at all.
storage_hits = (
    cell_cpy
    .select("ID", "text_norm")
    .crossJoin(storage_groups)
    .withColumn("storage_amt", F.regexp_extract("text_norm", F.col("pattern"), 1))
    .filter(F.col("storage_amt") != "")
)

# Best (lowest) matching priority per listing.
storage_priority = storage_hits.groupBy("ID").agg(F.min("priority").alias("min"))

# Keep only the winning rule and express the amount in GB (TB -> x1024).
cell_storage_df = (
    storage_hits
    .join(storage_priority, "ID", "inner")
    .filter(F.col("priority") == F.col("min"))
    .withColumn(
        "storage_gb",
        F.col("storage_amt").cast("int")
        * F.when(F.col("storage_unit") == F.lit("TB"), F.lit(1024)).otherwise(F.lit(1)),
    )
    .drop("text_norm", "priority", "min", "pattern", "storage_unit", "storage_amt")
)

display(cell_storage_df.limit(5))

### Extract brands

In [0]:
# Match every brand pattern against every listing and record the position of
# the first hit (regexp_instr gives 0 when the pattern does not occur).
cell_brand_df = (
    cell_cpy
    .select("ID", "text_norm")
    .crossJoin(brand_df)
    .withColumn("brand_match", F.regexp_instr("text_norm", "pattern"))
    .where(F.col("brand_match") != 0)
)

# Per listing: earliest brand position and number of matching brand rows.
# Listings hitting more than two brand rows are discarded (usually repair or
# professional ads).
first_inst_df = (
    cell_brand_df
    .groupBy("ID")
    .agg(F.min("brand_match").alias("min"), F.count("brand").alias("count"))
    .filter(F.col("count") <= 2)
)

# Keep only the brand that appears earliest in the text. An inner join gives
# the same rows as a right outer join here, because every ID in first_inst_df
# comes from cell_brand_df.
# NOTE(review): brands tied at the same position are all kept (duplicate IDs).
cell_brand_df = (
    cell_brand_df
    .join(first_inst_df, on="ID", how="inner")
    .filter(F.col("brand_match") == F.col("min"))
    .drop("min", "count", "brand_match", "pattern")
)

display(cell_brand_df.limit(5))

### Extract models

In [0]:
# Pick one GSMArena model per listing, searching only the models of the brand
# already detected for that listing.

# 1. Pair each listing with its brand's models and locate each model pattern.
#    model_df comes from a header-only CSV read, so all of its columns are
#    strings. "length" is cast to int so that step 3 compares numerically: as
#    strings max("9", "12") == "9", which picked the wrong "longest" model.
#    (presumably "length" is the model-name/pattern length — confirm)
cell_model_df = (
    cell_brand_df
    .join(
        model_df
        .withColumnRenamed("url", "gsm_url")
        .withColumn("length", F.col("length").cast("int")),
        "brand",
        "inner",
    )
    .withColumn("model_match", F.regexp_instr("text_norm", "pattern"))
    # regexp_instr gives 0 when the pattern does not occur
    .filter(F.col("model_match") != 0)
)

# 2. Drop listings matching 10 or more models (likely repair ads).
repair_df = (
    cell_model_df
    .groupBy("ID")
    .count()
    .filter(F.col("count") < 10)
)
cell_model_df = cell_model_df.join(repair_df, "ID", "inner")

# 3. Keep the longest matching model(s) per listing (numeric comparison).
longest_df = cell_model_df.groupBy("ID").agg(F.max("length").alias("max"))
cell_model_df = (
    cell_model_df
    .join(longest_df, "ID", "inner")
    .filter(F.col("max") == F.col("length"))
)

# 4. Among those, keep the match that appears earliest in the text.
first_df = cell_model_df.groupBy("ID").agg(F.min("model_match").alias("min"))
cell_model_df = (
    cell_model_df
    .join(first_df, "ID", "inner")
    .filter(F.col("min") == F.col("model_match"))
    .drop(
        "brand", "text_norm", "pattern", "length",
        "model_match", "count", "max", "min",
    )
)
# NOTE(review): models tied on both length and position are all kept, giving
# several rows for one ID (and duplicated listings after the final join).

display(cell_model_df.limit(5))

In [0]:
# Left-join every extracted attribute onto the preprocessed listings, so that
# listings without a storage/brand/model match are kept with nulls.
# text_norm is dropped from the brand table because cell_cpy already has it.
# NOTE(review): if brand_df has columns beyond brand/pattern, they reach both
# cell_brand_df and cell_model_df and would appear twice here — verify.
complete_df = (
    cell_cpy
    .join(cell_storage_df, on="ID", how="left")
    .join(cell_brand_df.drop("text_norm"), on="ID", how="left")
    .join(cell_model_df, on="ID", how="left")
)

display(complete_df.limit(5))