# **TikTok TechJam 2025 Hackathon (TRACK 1)**

**Team Name**: A/B Testing 

**Member Name(s)**: Asyraf Dzulfiqar, Beata Yeo

# Part 1: Collect data, join them and remove null values

In [2]:
from pyspark.sql import SparkSession
import numpy as np
import os
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat_ws, rand, lit
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

In [3]:
spark = SparkSession.builder \
        .appName("Hackathon") \
        .master("local[*]") \
        .config("spark.driver.memory", "16G") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "2000M") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.5.0") \
        .getOrCreate()

In [4]:
pathing_review = "datasets/review_data/"
arr = np.array(os.listdir(pathing_review))
reviewData_files = pathing_review + arr

pathing_metadata = "datasets/review_metadata/"
arr = np.array(os.listdir(pathing_metadata))
reviewMetadata_files = pathing_metadata + arr

df_review = spark.read.json(list(reviewData_files)).dropna(subset="text").drop_duplicates()
df_metadata = spark.read.json(list(reviewMetadata_files)).dropna(subset="category").drop_duplicates().withColumnRenamed("name", "business_name").select(["gmap_id", "category", "business_name"])

df_joined = df_review.join(df_metadata, on="gmap_id", how="inner").withColumn("category_str", concat_ws(", ", col("category"))).withColumn("random_order", rand()).orderBy("random_order").drop("random_order")

In [5]:
df_joined.count()

2528648

In [None]:
df_metadata.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- business_name: string (nullable = true)



In [None]:
df_review.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pics: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- rating: long (nullable = true)
 |-- resp: struct (nullable = true)
 |    |-- text: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- text: string (nullable = true)
 |-- time: long (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
df_joined.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pics: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- rating: long (nullable = true)
 |-- resp: struct (nullable = true)
 |    |-- text: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- text: string (nullable = true)
 |-- time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- business_name: string (nullable = true)
 |-- category_str: string (nullable = false)



In [5]:
from pyspark.sql.functions import col, when, count, size, trim
from pyspark.sql.types import StringType, FloatType, DoubleType, ArrayType, MapType

# 1) Drop unwanted columns
df_filtered = df_joined.drop("pics", "resp", "time", "category", "user_id")

# 2) Build a "is missing" condition per column based on its data type
missing_conds = []
for f in df_filtered.schema.fields:
    c = col(f.name)
    dt = f.dataType

    if isinstance(dt, (FloatType, DoubleType)):
        # floats/doubles: NULL or NaN
        cond = c.isNull() | c.isnan()
    elif isinstance(dt, StringType):
        # strings: NULL or empty after trim
        cond = c.isNull() | (trim(c) == "")
    elif isinstance(dt, (ArrayType, MapType)):
        # arrays/maps: NULL or empty
        cond = c.isNull() | (size(c) == 0)
    else:
        # ints/longs/booleans/date/timestamp/structs: only NULL
        cond = c.isNull()

    missing_conds.append(count(when(cond, True)).alias(f.name))


In [None]:
missing_counts = df_filtered.select(missing_conds)
missing_counts.show(truncate=False)



+-------+----+------+----+-------------+------------+
|gmap_id|name|rating|text|business_name|category_str|
+-------+----+------+----+-------------+------------+
|0      |0   |0     |0   |0            |0           |
+-------+----+------+----+-------------+------------+



                                                                                

# Remove unnecessary text elements

### Some reviews have translation, we only need English ones

In [None]:
df_joined.filter(df_joined.text.contains("(Translated by Google)")).select("text").show(5, truncate=100)



+-----------------------------------------------------------------------------------------------------------+
|                                                                                                       text|
+-----------------------------------------------------------------------------------------------------------+
|                                               (Translated by Google) Nice place\n\n(Original)\nLindo lugar|
|       (Translated by Google) Visit with respect and respect ..., very nice experience!\n\n(Original)\nM...|
|       (Translated by Google) It is a nice place the only detail is that it does not have bathrooms and ...|
|(Translated by Google) It's delicious and very good.\nPromise to go back\n\n(Original)\n맛있고 아주 좋아...|
|       (Translated by Google) Very friendly staff and all their fresh products\n\n(Original)\nEl persona...|
+-----------------------------------------------------------------------------------------------------------+
only showing top 

                                                                                

### Some reviews have many spacings, remove spacings for one-line reviews

In [None]:
df_joined.filter(F.col("text").rlike("\n")).select("text").show(5, truncate=100)



+----------------------------------------------------------------------------------------------------+
|                                                                                                text|
+----------------------------------------------------------------------------------------------------+
|          Been doing there a long time. Always get a great haircut -\nMichelle's the Best!! 👍 👍 👍|
|Very impressed! Dawn and Jazz are AMAZING!\nMake your appointments now for all of your holiday ga...|
|Awesome customer service. We visited this store for backsplash tiles and one of their design cons...|
|BEST BBQ IN PHOENIX!!!\n\nStopped Today and it didn’t disappoint!!! It was soo good!!! I got the ...|
+----------------------------------------------------------------------------------------------------+
only showing top 5 rows



                                                                                

In [6]:
# Removing (Translated by Google) prefix and (Original) languages to get the English reviews
# Removing all newlines for one-lined reviews
# Removing quotation marks

df_joined = df_joined.withColumn(
    "text",
    F.when(
        F.col("text").contains("(Translated by Google)"),
        # extract the English text, remove newlines, remove quotes
        F.regexp_replace(
            F.regexp_replace(
                F.regexp_extract(F.col("text"), r"\(Translated by Google\)\s*([^\n]+)", 1),
                r"\n+", " "
            ),
            r"\"", ""
        )
    ).otherwise(
        # for rows without Google Translate tag, remove newlines and quotes
        F.regexp_replace(
            F.regexp_replace(F.col("text"), r"\n+", " "),
            r"\"", ""
        )
    )
)


In [None]:
# spark_nlp = sparknlp.start(apple_silicon=True)



25/08/30 22:58:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
# -------------------------------
# 1. Document Assembler
# -------------------------------
customer_review = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("customer_review")

# -------------------------------
# 2. Tokenizer
# -------------------------------
customer_review_token = Tokenizer() \
    .setInputCols(["customer_review"]) \
    .setOutputCol("customer_review_token")

# -------------------------------
# 3. Spell Checker
# -------------------------------
customer_review_spell_checker = NorvigSweetingModel.pretrained() \
    .setInputCols(["customer_review_token"]) \
    .setOutputCol("customer_review_corrected")

# -------------------------------
# 4. Normalizer (lowercasing, clean text)
# -------------------------------
customer_review_normalizer = Normalizer() \
    .setInputCols(["customer_review_corrected"]) \
    .setOutputCol("customer_review_normalized") \
    .setLowercase(True)

# -------------------------------
# 5. StopWords Cleaner
# -------------------------------
customer_review_stopwordsCleaner = StopWordsCleaner() \
    .setInputCols(["customer_review_normalized"]) \
    .setOutputCol("customer_review_cleaned")

# -------------------------------
# 6. Lemmatizer
# -------------------------------
customer_review_lemma = LemmatizerModel.pretrained() \
    .setInputCols(["customer_review_token"]) \
    .setOutputCol("customer_review_lemma")

# -------------------------------
# 7. Word Embeddings (GloVe)
# -------------------------------
glove_embeddings = WordEmbeddingsModel.pretrained("glove_100d") \
    .setInputCols(["customer_review_token", "customer_review"]) \
    .setOutputCol("word_embeddings")

# -------------------------------
# 8. Sentence Embeddings (average pooling)
# -------------------------------
sentence_embeddings = SentenceEmbeddings() \
    .setInputCols(["customer_review", "word_embeddings"]) \
    .setOutputCol("customer_review_embeddings") \
    .setPoolingStrategy("AVERAGE")

# -------------------------------
# 9. Embeddings Finisher (convert to Spark vector/array)
# -------------------------------
customer_review_finisher = EmbeddingsFinisher() \
    .setInputCols(["customer_review_embeddings"]) \
    .setOutputCols(["customer_review_vector"]) \
    .setOutputAsVector(True) \
    .setCleanAnnotations(False)

spellcheck_norvig download started this may take some time.
Approximate size to download 4.2 MB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


In [8]:
# -------------------------------
# 1. Document Assembler
# -------------------------------
business_category = DocumentAssembler() \
    .setInputCol("category_str") \
    .setOutputCol("business_category")

# -------------------------------
# 2. Tokenizer
# -------------------------------
business_category_token = Tokenizer() \
    .setInputCols(["business_category"]) \
    .setOutputCol("business_category_token")

# -------------------------------
# 3. Spell Checker
# -------------------------------
business_category_spell_checker = NorvigSweetingModel.pretrained() \
    .setInputCols(["business_category_token"]) \
    .setOutputCol("business_category_corrected")

# -------------------------------
# 4. Normalizer (lowercasing, clean text)
# -------------------------------
business_category_normalizer = Normalizer() \
    .setInputCols(["business_category_corrected"]) \
    .setOutputCol("business_category_normalized") \
    .setLowercase(True)

# -------------------------------
# 5. StopWords Cleaner
# -------------------------------
business_category_stopwordsCleaner = StopWordsCleaner() \
    .setInputCols(["business_category_normalized"]) \
    .setOutputCol("business_category_cleaned")

# -------------------------------
# 6. Lemmatizer
# -------------------------------
business_category_lemma = LemmatizerModel.pretrained() \
    .setInputCols(["business_category_token"]) \
    .setOutputCol("business_category_lemma")

# -------------------------------
# 7. Word Embeddings (GloVe)
# -------------------------------
business_category_glove_embeddings = WordEmbeddingsModel.pretrained("glove_100d") \
    .setInputCols(["business_category_token", "business_category"]) \
    .setOutputCol("word_embeddings")

# -------------------------------
# 8. Sentence Embeddings (average pooling)
# -------------------------------
business_category_sentence_embeddings = SentenceEmbeddings() \
    .setInputCols(["business_category", "word_embeddings"]) \
    .setOutputCol("business_category_embeddings") \
    .setPoolingStrategy("AVERAGE")

# -------------------------------
# 9. Embeddings Finisher (convert to Spark vector/array)
# -------------------------------
business_category_finisher = EmbeddingsFinisher() \
    .setInputCols(["business_category_embeddings"]) \
    .setOutputCols(["business_category_vector"]) \
    .setOutputAsVector(True) \
    .setCleanAnnotations(False)

spellcheck_norvig download started this may take some time.
Approximate size to download 4.2 MB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


In [9]:
# Combine both into one pipeline
pipeline = Pipeline(stages=[
    # --- Review branch ---
    customer_review,
    customer_review_token,
    customer_review_spell_checker,
    customer_review_normalizer,
    customer_review_stopwordsCleaner,
    customer_review_lemma,
    glove_embeddings,
    sentence_embeddings,
    customer_review_finisher,

    # --- Business category branch ---
    business_category,
    business_category_token,
    business_category_spell_checker,
    business_category_normalizer,
    business_category_stopwordsCleaner,
    business_category_lemma,
    business_category_glove_embeddings,
    business_category_sentence_embeddings,
    business_category_finisher
])


In [10]:
result = pipeline.fit(df_joined).transform(df_joined)

In [None]:
result.select("text", "customer_review_vector").show(5, truncate=50, vertical=True)

# Manual labelling of reviews using statistical means

### Cosine similarity between review and business category for relevancy

In [11]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
import numpy as np

def cosine_similarity(v1, v2):
    if v1 is None or v2 is None:
        return None
    a = np.asarray(v1, dtype=float)
    b = np.asarray(v2, dtype=float)

    # squeeze 1xN / Nx1 or nested singletons to 1-D
    if a.ndim > 1:
        a = a.reshape(-1)
    if b.ndim > 1:
        b = b.reshape(-1)

    na = np.linalg.norm(a)
    nb = np.linalg.norm(b)
    if na == 0 or nb == 0:
        return 0.0
    return float(np.dot(a, b) / (na * nb))

cosine_sim_udf = udf(cosine_similarity, DoubleType())

result = result.withColumn(
    "cosine_similarity",
    cosine_sim_udf("customer_review_vector", "business_category_vector")
)


In [44]:
result.select("category_str", "text", "cosine_similarity").show(5, truncate=50)

+--------------------------------------------------+--------------------------------------------------+------------------+
|                                      category_str|                                              text| cosine_similarity|
+--------------------------------------------------+--------------------------------------------------+------------------+
|                            Window tinting service|I didn't personally go there but I did call and...|0.5424361742267093|
|       Bar & grill, American restaurant, Gastropub|Went for their catered/open bar new years event...|0.7932691830051947|
|Department store, Clothing store, Craft store, ...|Unlike the Williston Wal-Mart, Minot's Wal-Mart...|0.8120576958394772|
|Breakfast restaurant, American restaurant, Brun...|The food, service and ambience we're great. Par...|0.7619881697147413|
|                                  Sushi restaurant|There aren't many sushi places in the valley, b...|0.3624278644701128|
+---------------

In [12]:
from pyspark.storagelevel import StorageLevel

base = (result
    .select("business_name","category_str","text","rating","cosine_similarity")
    .filter(F.col("cosine_similarity").isNotNull())
    .persist(StorageLevel.MEMORY_AND_DISK))

In [13]:
def cache_once(df, level=StorageLevel.MEMORY_AND_DISK):
    # only cache if this instance isn’t already cached
    if not df.is_cached:
        df = df.persist(level)
    return df

In [None]:
test = result.select("cosine_similarity").filter(F.col("cosine_similarity").isNotNull())

# Global quantiles
q = test.approxQuantile("cosine_similarity", [0.25, 0.5, 0.75], 0.01)
LO, MED, HI = q[0], q[1], q[2]
print("LO, MED, HI =", LO, MED, HI)

In [14]:
benchmark = 0.70
low = 0.60

relevant_df  = cache_once(base.filter(F.col("cosine_similarity") >= benchmark))
irrelevant_df = cache_once(base.filter(F.col("cosine_similarity") <= low))


### Operations to find promotional links or advertisments in both datasets

In [15]:
# Helper funtions

def add_ad_key(df):
    cols = set(df.columns)

    def safe(name, cast_str=False):
        if name in cols:
            c = F.col(name)
            if cast_str:
                c = c.cast("string")
            return F.coalesce(c, F.lit(""))
        else:
            return F.lit("")

    # Build a stable key from available fields (order matters).
    # Include text + business_name + category_str + rating + cosine_similarity; add ids/time if present.
    return df.withColumn(
        "ad_key",
        F.sha2(F.concat_ws("||",
            safe("gmap_id"),
            safe("user_id"),
            safe("business_name"),
            safe("category_str"),
            safe("time", cast_str=True),
            safe("text"),
            safe("rating", cast_str=True),
            safe("cosine_similarity", cast_str=True)
        ), 256)
    )



def with_ads_flags(df):
    txt = F.coalesce(F.col("text"), F.lit(""))

    return (df
      # URLs / domains / shorteners / obfuscations
      .withColumn("has_url",          txt.rlike(r"(?i)\bhttps?://\S+|\bwww\.\S+"))
      .withColumn("has_domain",       txt.rlike(r"(?i)\b[a-z0-9][a-z0-9\-]*\.(?:com|net|org|co|io|info|biz|app|shop|store|sg|uk|au|ca|de|fr|my|ph|id|in)(?:/\S*)?\b"))
      .withColumn("has_shortener",    txt.rlike(r"(?i)\b(bit\.ly|t\.co|goo\.gl|tinyurl\.com|ow\.ly|wa\.me|linktr\.ee)/\S+"))
      .withColumn("has_obfus_domain", txt.rlike(r"(?i)\b[a-z0-9][a-z0-9\-]*\s*(?:\.|dot|\[\.]|\(dot\))\s*(?:com|net|org|co|io|sg|au|uk)\b"))

      # Contact info / WhatsApp
      .withColumn("has_email",        txt.rlike(r"(?i)[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}"))
      .withColumn("has_phone",        txt.rlike(r"(?i)(?:\+?\d[\s\-().]{0,3}){7,}\d"))
      .withColumn("has_whatsapp",     txt.rlike(r"(?i)\bwhatsapp\b|\bwa\.me/\S+"))

      # Promo / CTA phrases
      .withColumn("has_promo_words",  txt.rlike(r"(?i)\b(promo(?:\s*code)?|discount(?:\s*code)?|coupon|use\s+code|deal|sale|flash\s*sale|limited\s*time|special\s*offer|[0-9]{1,3}%\s*off|buy\s*now|order\s*now|book\s*now|free\s*shipping|visit\s+(?:our\s+)?website|click\s+(?:here|link))\b"))

      # Final flag + triggers for auditability
      .withColumn("policy_ads",
          F.col("has_url") | F.col("has_domain") | F.col("has_shortener") |
          F.col("has_obfus_domain") | F.col("has_email") | F.col("has_phone") |
          F.col("has_whatsapp") | F.col("has_promo_words")
      )
      .withColumn("ads_triggers", F.array_remove(F.array(
          F.when(F.col("has_url"),          F.lit("url")),
          F.when(F.col("has_domain"),       F.lit("domain")),
          F.when(F.col("has_shortener"),    F.lit("shortener")),
          F.when(F.col("has_obfus_domain"), F.lit("obfus_domain")),
          F.when(F.col("has_email"),        F.lit("email")),
          F.when(F.col("has_phone"),        F.lit("phone")),
          F.when(F.col("has_whatsapp"),     F.lit("whatsapp")),
          F.when(F.col("has_promo_words"),  F.lit("promo_words"))
      ), None))
    )

In [16]:
relevant_flagged = add_ad_key(with_ads_flags(relevant_df))
irrelevant_flagged = add_ad_key(with_ads_flags(irrelevant_df))

In [17]:
ads_union = (relevant_flagged.withColumn("source_split", F.lit("relevant"))
             .unionByName(irrelevant_flagged.withColumn("source_split", F.lit("irrelevant"))))

ads_only = (ads_union
            .filter(F.col("policy_ads"))
            .dropDuplicates(["ad_key"]))

In [18]:
ads_keys = ads_only.select("ad_key")

relevant_clean = (relevant_flagged.join(ads_keys, on="ad_key", how="left_anti")
                  .drop("has_url","has_domain","has_shortener","has_obfus_domain",
                        "has_email","has_phone","has_whatsapp","has_promo_words",
                        "policy_ads","ads_triggers"))

irrelevant_clean = (irrelevant_flagged.join(ads_keys, on="ad_key", how="left_anti")
                    .drop("has_url","has_domain","has_shortener","has_obfus_domain",
                          "has_email","has_phone","has_whatsapp","has_promo_words",
                          "policy_ads","ads_triggers"))


### Find reviews that are rants without visit in irrelevant dataset

In [19]:
# Choose the base irrelevant dataframe safely
try:
    irr_base = irrelevant_clean
except NameError:
    try:
        irr_base = irrelevant_df
    except NameError:
        irr_base = irrelevant_df

irr_base = irr_base.cache()


In [20]:
def ensure_sentiment(df):
    if "sentiment_num" in df.columns:
        return df

    # Prefer corrected tokens if available
    use_corr = "customer_review_corrected" in df.columns
    text_col = "text"

    if use_corr:
        from sparknlp.base import Finisher
        df = (Finisher()
              .setInputCols(["customer_review_corrected"])
              .setOutputCols(["corr_tokens"])
              .setOutputAsArray(True)
              .setCleanAnnotations(True)
             ).transform(df)
        df = df.withColumn("text_corrected", F.array_join("corr_tokens", " "))
        text_col = "text_corrected"



    document = DocumentAssembler().setInputCol(text_col).setOutputCol("doc")
    token    = Tokenizer().setInputCols(["doc"]).setOutputCol("tok")
    viv      = ViveknSentimentModel.pretrained().setInputCols(["doc","tok"]).setOutputCol("sent")
    pipe     = Pipeline(stages=[document, token, viv]).fit(df)

    out = pipe.transform(df)
    out = (out
        .withColumn("sentiment_str", F.expr("sent[0].result"))
        .withColumn("prob_pos", F.expr("cast(sent[0].metadata['positive'] as double)"))
        .withColumn("prob_neg", F.expr("cast(sent[0].metadata['negative'] as double)"))
        .drop("sent")
    )
    # margin to avoid shaky labels
    out = out.withColumn(
        "sentiment_num",
        F.when(F.col("prob_pos") - F.col("prob_neg") >= 0.2, 1.0)
         .when(F.col("prob_neg") - F.col("prob_pos") >= 0.2, -1.0)
         .otherwise(0.0)
    )
    return out

irr_sent = ensure_sentiment(irr_base).cache()


sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[OK!]


In [21]:
# Regexes
nonvisit_rx = r"""(?i)\b(
    never\s+been|haven'?t\s+been|didn'?t\s+visit|did\s+not\s+visit|
    phone\s+call|called\s+(them|store)|left\s+voicemail|email(ed)?|
    website|online\s+(order|booking|application|support|chat)|
    delivery\s+app|uber\s*eats|doordash|grab\s*food|foodpanda
)\b"""

rumor_rx = r"(?i)\b(i\s*(just\s*)?heard|people\s+say|someone\s+told\s+me|my\s+friend\s+said)\b"

irr_rules = (irr_sent
    .withColumn("text_nn", F.coalesce(F.col("text"), F.lit("")))
    .withColumn("char_len", F.length("text_nn"))
    .withColumn("excl_count", F.size(F.split(F.regexp_replace("text_nn", r"[^!]", ""), "")))
    .withColumn("nonvisit_clues", F.col("text_nn").rlike(nonvisit_rx))
    .withColumn("rumor_clues",    F.col("text_nn").rlike(rumor_rx))
    # core policy flag: negative + (explicit non-visit OR strong proxy)
    .withColumn("policy_nonvisitor_rant",
        (F.col("sentiment_num") < 0) &
        ( F.col("nonvisit_clues") |
          F.col("rumor_clues") |
          (F.col("char_len") < 40) |           # very short angry blurt
          (F.col("excl_count") >= 3)           # lots of exclamation marks
        )
    )
)


In [22]:
def add_key(df):
    cols = set(df.columns)
    def safe(name, cast=False):
        if name in cols:
            c = F.col(name)
            if cast: c = c.cast("string")
            return F.coalesce(c, F.lit(""))
        return F.lit("")

    return df.withColumn("rant_key", F.sha2(F.concat_ws("||",
        safe("gmap_id"),
        safe("user_id"),
        safe("business_name"),
        safe("category_str"),
        safe("time", cast=True),
        safe("text"),
        safe("rating", cast=True)
    ), 256))

irr_flagged = add_key(irr_rules)

rant_only = (irr_flagged
    .filter(F.col("policy_nonvisitor_rant"))
    .dropDuplicates(["rant_key"])     # just in case
    .persist(StorageLevel.MEMORY_AND_DISK)
)


In [23]:
rant_keys = rant_only.select("rant_key")

irrelevant_no_rant = (irr_flagged.join(rant_keys, on="rant_key", how="left_anti")
    .drop("text_nn","char_len","excl_count","nonvisit_clues","rumor_clues",
          "policy_nonvisitor_rant","rant_key")  # keep your frame clean
    .persist(StorageLevel.MEMORY_AND_DISK)
)


# Model training pipeline setup and evaluation

## Preparing datasets for BERT model

In [24]:
relevant_dataset = relevant_clean.withColumn("label", lit("RELEVANT")).select("text", "label").limit(1500)
relevant_dataset.printSchema()  # these reviews are to be labelled as "relevant"

root
 |-- text: string (nullable = true)
 |-- label: string (nullable = false)



In [25]:
irrelevant_dataset = irrelevant_no_rant.withColumn("label", lit("IRRELEVANT")).select("text", "label").limit(1500)
irrelevant_dataset.printSchema()        # these reviews are to be labelled as "irrelevant"

root
 |-- text: string (nullable = true)
 |-- label: string (nullable = false)



In [26]:
ads_dataset = ads_only.withColumn("label", lit("ADVERTISMENT")).select("text", "label").limit(1500)
ads_dataset.printSchema()      # these reviews are to be labelled as "advertisment"

root
 |-- text: string (nullable = true)
 |-- label: string (nullable = false)



In [27]:
rant_dataset = rant_only.withColumn("label", lit("RANT WITHOUT VISIT")).select("text", "label").limit(1500)
rant_dataset.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: string (nullable = false)



In [40]:
relevant_dataset.count(), irrelevant_dataset.count(), ads_dataset.count(), rant_dataset.count()

(1500, 1500, 1500, 0)

## Setting up model pipeline

In [28]:
df_all = relevant_dataset.unionByName(irrelevant_dataset).unionByName(ads_dataset).unionByName(rant_dataset)

train_df, test_df = df_all.randomSplit([0.8, 0.2], seed=42)
train_df.cache()
test_df.cache()

DataFrame[text: string, label: string]

In [None]:
from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import BertSentenceEmbeddings, ClassifierDLApproach

# 4a) Turn raw text into 'document'
document_assembler = (DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# 4b) Sentence-level BERT embeddings
sentence_bert = (BertSentenceEmbeddings
    .pretrained("sent_small_bert_L2_128", "en")   
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# 4c) Classification head
classifier = (ClassifierDLApproach()
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")          
    .setLabelColumn("label")       
    .setBatchSize(32)
    .setMaxEpochs(10)
    .setLr(1e-3)
    .setValidationSplit(0.1)
    .setEnableOutputLogs(True)      
)

pipeline = Pipeline(stages=[document_assembler, sentence_bert, classifier])

sent_small_bert_L2_128 download started this may take some time.
Approximate size to download 16.1 MB
[OK!]


In [30]:
model = pipeline.fit(train_df)

In [31]:
from pyspark.sql import functions as F

pred = model.transform(test_df)

# Extract the top prediction string from the annotation column 'class'
pred = pred.withColumn("pred_label", F.col("class.result").getItem(0))
pred.select("text", "label", "pred_label").show(5, truncate=80)

+--------------------------------------------------------------------------------+--------+------------+
|                                                                            text|   label|  pred_label|
+--------------------------------------------------------------------------------+--------+------------+
|**Update** See below Nice experience. Several pools, nice selection of temper...|RELEVANT|ADVERTISMENT|
|4 years leaving here and I have to say that is very nice and secure place... ...|RELEVANT|ADVERTISMENT|
|                           8.99 per pound for the best slice roast beef in town!|RELEVANT|ADVERTISMENT|
|A bit out dated and very strong odor of air fresheners which was as little ov...|RELEVANT|  IRRELEVANT|
|A hidden gem in Billings, feels like a small town diner, the people are super...|RELEVANT|    RELEVANT|
+--------------------------------------------------------------------------------+--------+------------+
only showing top 5 rows



In [33]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

# Fit on the union of both columns to guarantee identical mapping
label_union = (pred
    .select(F.col("label").alias("lab"))
    .union(pred.select(F.col("pred_label").alias("lab")))
    .distinct()
)

indexer = StringIndexer(inputCol="lab", outputCol="lab_idx", handleInvalid="keep")
idx_model_base = indexer.fit(label_union)

# Reuse the same fitted model for each column via copy with new params
idx_true = idx_model_base.copy({idx_model_base.inputCol: "label",
                                idx_model_base.outputCol: "label_idx"})
idx_pred = idx_model_base.copy({idx_model_base.inputCol: "pred_label",
                                idx_model_base.outputCol: "prediction"})

# Apply both models
eval_df = idx_true.transform(pred)
eval_df = idx_pred.transform(eval_df).select("label_idx", "prediction")

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc = MulticlassClassificationEvaluator(
    labelCol="label_idx", predictionCol="prediction", metricName="accuracy"
).evaluate(eval_df)

f1 = MulticlassClassificationEvaluator(
    labelCol="label_idx", predictionCol="prediction", metricName="f1"
).evaluate(eval_df)

pr = MulticlassClassificationEvaluator(
    labelCol="label_idx", predictionCol="prediction", metricName="weightedPrecision"
).evaluate(eval_df)

rc = MulticlassClassificationEvaluator(
    labelCol="label_idx", predictionCol="prediction", metricName="weightedRecall"
).evaluate(eval_df)

print(f"Accuracy:  {acc:.4f}")
print(f"F1:        {f1:.4f}")
print(f"Precision: {pr:.4f}")
print(f"Recall:    {rc:.4f}")



Accuracy:  0.7064
F1:        0.7050
Precision: 0.7231
Recall:    0.7064


In [35]:
# Save
model_path = "bert_cls_model"
model.write().overwrite().save(model_path)

# Load later
# from pyspark.ml import PipelineModel
# loaded_model = PipelineModel.load(model_path)

In [37]:
# Fit ONE indexer on union so mapping matches for both cols
lab_union = (pred.select(F.col("label").alias("lab"))
                  .union(pred.select(F.col("pred_label").alias("lab")))
                  .distinct())
base_idx = StringIndexer(inputCol="lab", outputCol="lab_idx", handleInvalid="keep").fit(lab_union)
idx_true = base_idx.copy({base_idx.inputCol: "label",      base_idx.outputCol: "y"})
idx_pred = base_idx.copy({base_idx.inputCol: "pred_label", base_idx.outputCol: "yhat"})

e = idx_pred.transform(idx_true.transform(pred)).select("y","yhat","label","pred_label")

# Confusion matrix (pivoted)
cm = (e.groupBy("y").pivot("yhat").agg(F.count("*")).na.fill(0).orderBy("y"))
cm.show(50, truncate=False)

# Per-class metrics
tot_by_y    = e.groupBy("y").agg(F.count("*").alias("n_y"))
tot_by_yhat = e.groupBy("yhat").agg(F.count("*").alias("n_yhat"))
tp = e.filter(F.col("y")==F.col("yhat")).groupBy("y").agg(F.count("*").alias("tp"))

perclass = (tp.join(tot_by_y, "y").join(tot_by_yhat, tp["y"]==tot_by_yhat["yhat"], "left")
              .drop("yhat")
              .withColumn("precision", F.col("tp")/F.col("n_yhat"))
              .withColumn("recall",    F.col("tp")/F.col("n_y"))
              .withColumn("f1",        2*F.col("precision")*F.col("recall")/(F.col("precision")+F.col("recall"))))
perclass.show(truncate=False)


+---+---+---+---+
|y  |0.0|1.0|2.0|
+---+---+---+---+
|0.0|273|4  |32 |
|1.0|19 |165|114|
|2.0|45 |40 |173|
+---+---+---+---+

+---+---+---+------+------------------+------------------+------------------+
|y  |tp |n_y|n_yhat|precision         |recall            |f1                |
+---+---+---+------+------------------+------------------+------------------+
|2.0|173|258|319   |0.542319749216301 |0.6705426356589147|0.5996533795493935|
|1.0|165|298|209   |0.7894736842105263|0.5536912751677853|0.650887573964497 |
|0.0|273|309|337   |0.8100890207715133|0.883495145631068 |0.8452012383900929|
+---+---+---+------+------------------+------------------+------------------+



In [38]:
# which string label corresponds to index 0,1,2 ?
print("index -> label:", list(enumerate(base_idx.labels)))


index -> label: [(0, 'ADVERTISMENT'), (1, 'IRRELEVANT'), (2, 'RELEVANT')]


### Analysis on datasets to understand why the models would label reviews as such

In [41]:
from pyspark.sql import functions as F

def _get(name): return globals().get(name, None)

dfs = {
    "RELEVANT": _get("relevant_clean"),
    "IRRELEVANT": _get("irrelevant_no_rant"),
    "ADVERTISEMENT": _get("ads_only"),
    "RANT WITHOUT VISIT": _get("rant_only"),
}
missing = [k for k,v in dfs.items() if v is None]
if missing:
    raise RuntimeError(f"Missing DataFrames: {missing}. Create those first.")

union_cols = []
for label, df in dfs.items():
    # keep only text and add label
    union_cols.append(df.select(F.col("text").alias("text"), F.lit(label).alias("label")))

labeled = union_cols[0]
for part in union_cols[1:]:
    labeled = labeled.unionByName(part)

# basic hygiene
labeled = (labeled
    .withColumn("text", F.col("text").cast("string"))
    .filter(F.col("text").isNotNull() & (F.length("text") > 0))
)

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml import Pipeline

tok = RegexTokenizer(inputCol="text", outputCol="tokens",
                     pattern=r"[^A-Za-z0-9]+", toLowercase=True, minTokenLength=2)
rm  = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")

pipe = Pipeline(stages=[tok, rm]).fit(labeled)
cleaned = pipe.transform(labeled).select("label","tokens_clean").filter(F.size("tokens_clean") > 0)


In [42]:
# Document frequency: use per-doc unique tokens
docs_unique = cleaned.select(F.array_distinct("tokens_clean").alias("utoks"))
N_docs = docs_unique.count()

idf = (docs_unique
       .select(F.explode("utoks").alias("term"))
       .groupBy("term").agg(F.count("*").alias("df"))
       .withColumn("idf", F.log( (F.lit(N_docs)+1.0) / (F.col("df")+1.0) ) + F.lit(1.0))
)

# Per-label term frequency
tf_label = (cleaned
    .select("label", F.explode("tokens_clean").alias("term"))
    .groupBy("label","term").agg(F.count("*").alias("tf"))
)

# TF-IDF per label
tfidf_label = (tf_label.join(idf, on="term", how="inner")
               .withColumn("score", F.col("tf") * F.col("idf")))


In [43]:
from pyspark.sql.window import Window

w = Window.partitionBy("label").orderBy(F.desc("score"))
top_uni = (tfidf_label
           .withColumn("rank", F.row_number().over(w))
           .filter(F.col("rank") <= 30)
           .select("label","term","tf","idf","score","rank"))

top_uni.orderBy("label","rank").show(120, truncate=False)


+-------------+----------+------+------------------+------------------+----+
|label        |term      |tf    |idf               |score             |rank|
+-------------+----------+------+------------------+------------------+----+
|ADVERTISEMENT|deal      |6893  |6.233113881852588 |42964.85398760989 |1   |
|ADVERTISEMENT|sale      |4678  |6.649039096785642 |31104.204894763232|2   |
|ADVERTISEMENT|discount  |2956  |7.1034613182966675|20997.831656884948|3   |
|ADVERTISEMENT|get       |3768  |3.839536536518086 |14467.373669600147|4   |
|ADVERTISEMENT|great     |5968  |2.339466177097054 |13961.93414491522 |5   |
|ADVERTISEMENT|good      |5090  |2.5770313395532987|13117.089518326291|6   |
|ADVERTISEMENT|store     |2968  |4.010623399241094 |11903.530248947567|7   |
|ADVERTISEMENT|coupon    |1403  |7.917115589588923 |11107.71317219326 |8   |
|ADVERTISEMENT|one       |2711  |4.093896967715667 |11098.554679477173|9   |
|ADVERTISEMENT|time      |2671  |3.8951598854889107|10403.97205414088 |10  |

It is thus concluded that through statistical and pattern searching methods to label data, the model is not able to distinguish between relevant and irrelevant data. There are highly ranked words appearing in irrelevant and relevant datasets. Thus, with the overlap, the model is not able to fully ascertain if the reviews are relevant or not relevant. 

-

-

-

# Demo

This demo shall use three reviews for display:

In [None]:
!unzip -o bert_cls_model.zip -d .

In [None]:
relevant_test = relevant_clean.withColumn("random_sort_key", rand()).orderBy("random_sort_key").drop("random_sort_key").limit(3).withColumn("label", lit("RELEVANT")).select("text", "label")
irrelevant_test = irrelevant_clean.withColumn("random_sort_key", rand()).orderBy("random_sort_key").drop("random_sort_key").limit(3).withColumn("label", lit("IRRELEVANT")).select("text", "label")
ads_test = ads_only.withColumn("random_sort_key", rand()).orderBy("random_sort_key").drop("random_sort_key").limit(3).withColumn("label", lit("ADVERTISEMENT")).select("text", "label")

df_test = relevant_test.unionByName(irrelevant_test).unionByName(ads_test)
df_in = df_test.select(F.col("text").cast("string"))

In [None]:
from pyspark.ml import PipelineModel

MODEL_DIR = "./bert_cls_model"   
pipe = PipelineModel.load(MODEL_DIR)

# Classifier output column (often "class")
OUT_COL = pipe.stages[-1].getOutputCol()
print("Classifier output column:", OUT_COL)


In [None]:
from pyspark.sql import functions as F

# Keep both text and label when passing into pipeline
df_in = df_test.select(F.col("text").cast("string"), "label")

scored = pipe.transform(df_in)

pred = (
    scored
    .select(
        "text",    # original review text
        "label",   # actual label
        F.expr(f"{OUT_COL}[0].result").alias("pred_label")  # predicted label
    )
)

pred.show(truncate=50)
