**Defining Paths**

In [0]:
silver_table_path = '/Volumes/workspace/default/my_volume/Sentiment_Project/silver_tweets_cleaned/'
gold_brand_sentiment_path = '/Volumes/workspace/default/my_volume/Sentiment_Project/gold_brand_sentiment'
gold_summary_path = '/Volumes/workspace/default/my_volume/Sentiment_Project/gold_brand_sentiment_summary'

In [0]:
silver_df = spark.read.format('delta').load(silver_table_path)
print(f"Silver table records: {silver_df.count()}")

**Defining _Brands Dictionary_**

In [0]:
from pyspark.sql.functions import col, explode, size, udf
from pyspark.sql.types import StringType, ArrayType
import re

brand_keywords = {
    "apple": ["iphone", "ipad", "macbook", "mac", "apple", "macos", "ios", "applestore", "tim cook", "apple watch"],
    "google": ["google", "android", "pixel", "chrome", "youtube", "gmail", "googledrive", "google cloud"],
    "microsoft": ["microsoft", "windows", "surface", "xbox", "azure", "office365", "teams", "outlook"],
    "samsung": ["samsung", "galaxy", "note", "fold", "zflip", "android", "oneui", "samsung tv"],
    "amazon": ["amazon", "alexa", "kindle", "primevideo", "aws", "echo dot", "amazon prime"]
}

**Defining a UDF for brands detection from the data**

In [0]:
def detect_brands(text):
  if text is None:
    return []
  text_lower = text.lower()
  detected_brands = []

  for brand, keywords in brand_keywords.items():
    for keyword in keywords: # Using raw string prefix(r) and word boundry anchors to match whole words only
      if re.search(r'\b' + re.escape(keyword) + r'\b', text_lower):
        detected_brands.append(brand)
        break

  return list(set(detected_brands)) # Converting to SET to remove duplicate records

In [0]:
# Registering above method as a UDF

detect_brands_udf = udf(detect_brands, ArrayType(StringType()))

In [0]:
df_with_brands = silver_df.withColumn('Detected_Brands', detect_brands_udf(col('Cleaned_Text')))

- **Filtering data without any brand mentions**
- **Exploding the detected brands array(list returned by UDF) to have one row per brand mention**

In [0]:
df_brand_mentions = df_with_brands.filter(size(col('Detected_Brands'))>0)

exploded_df = df_brand_mentions.withColumn('Brand', explode(col('Detected_Brands')))

**Create Gold Tables**

In [0]:
# Fetching relevant columns for a detailed gold table
gold_detail_df = exploded_df.select('Brand', 'Sentiment', 'Cleaned_Text', 'Detected_Brands')

# Writing gold detail table
gold_detail_df.write.format('delta').mode('overwrite').save(gold_brand_sentiment_path)
print('Detailed gold table written!')

# Creating aggregated summary for dashboard
from pyspark.sql.functions import count
summary_df = exploded_df.groupBy('Brand', 'Sentiment').agg(count('*').alias('Count'))

# Writing Summary table
summary_df.write.format('delta').mode('overwrite').save(gold_summary_path)
print('Summary Gold table written!')

**Verifying the results**

In [0]:
# Reading and displaying summary

summary_df = spark.read.format('delta').load(gold_summary_path)
display(summary_df.orderBy('Brand', 'Sentiment'))

print(f"Total brand mentions: {summary_df.count()}")
print(f"Unique brands mentioned: {summary_df.select('Brand').distinct().count()}")