In [0]:
# Load all external locations.
# NOTE(review): every location name below is hard-coded with the `dev-` prefix and
# this cell runs before the `env` widget is read - confirm that is intended.

def _external_location_url(location_name):
    """Return the `url` value of the first row of DESCRIBE EXTERNAL LOCATION for `location_name`."""
    described = spark.sql(f"describe external location `{location_name}`")
    return described.select("url").collect()[0][0]


checkpoint = _external_location_url("dev-checkpoints")
landing = _external_location_url("dev-landing")
bronze = _external_location_url("dev-bronze")
silver = _external_location_url("dev-silver")
gold = _external_location_url("dev-gold")

In [0]:
# Read the target environment from the `env` notebook widget

dbutils.widgets.text(name="env", defaultValue='', label='Enter the environment in lower case')
# Normalise the raw widget text: the label asks for lower case, and stray
# whitespace would otherwise end up inside the catalog name built from `env`.
env = dbutils.widgets.get("env").strip().lower()
if not env:
    # With the empty default the catalog name would become "_catalog" and the
    # notebook would only fail later, at the first table read.
    raise ValueError("The `env` widget is empty; enter the environment name (e.g. 'dev') and re-run.")



In [0]:
# Unity Catalog coordinates: the catalog name is derived from `env`,
# the bronze/silver schema names are fixed.
catalog_name = "{}_catalog".format(env)
bronze_schema, silver_schema = "bronze", "silver"

In [0]:


from pyspark.sql.functions import col

def normalize_column_names(df):
    """Return `df` with every space in its column names replaced by an underscore.

    Args:
        df: DataFrame whose column names may contain spaces.

    Returns:
        A DataFrame with the same columns, in the same order, under the
        normalized names. Names without spaces are left unchanged.
    """
    normalized_names = [name.replace(" ", "_") for name in df.columns]
    # Rename all columns positionally in a single call instead of issuing one
    # `withColumnRenamed` per column inside a loop.
    df = df.toDF(*normalized_names)
    print(" Column names normalized!")
    return df


In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format, col
from pyspark.sql.types import TimestampType

def format_created_utc(df):
    """Overwrite `created_utc` with its 'yyyy-MM-dd' rendering.

    The column is cast to TIMESTAMP, passed through `from_utc_timestamp` with
    the "UTC" zone, and then formatted by `date_format`.

    Args:
        df: DataFrame containing a `created_utc` column.

    Returns:
        The DataFrame with `created_utc` replaced by the formatted value.
    """
    as_timestamp = col("created_utc").cast(TimestampType())
    # NOTE(review): the target zone is "UTC", so this shift looks like a no-op - confirm.
    as_utc = from_utc_timestamp(as_timestamp, "UTC")
    df = df.withColumn("created_utc", date_format(as_utc, "yyyy-MM-dd"))
    print(" Formatted `created_utc` to 'yyyy-MM-dd'!")
    return df

print("Function to format `created_utc` column loaded!")


In [0]:


from pyspark.sql.functions import trim

def standardize_author(df):
    """Strip leading and trailing spaces from the `author` column.

    Args:
        df: DataFrame containing an `author` column.

    Returns:
        The DataFrame with `author` replaced by its trimmed value.
    """
    trimmed_author = trim(col("author"))
    cleaned = df.withColumn("author", trimmed_author)
    print(" Standardized `author` column!")
    return cleaned


In [0]:


def filter_invalid_records(df):
    """Keep only rows whose `id`, `subreddit` and `title` are all non-null and not "".

    Args:
        df: DataFrame containing `id`, `subreddit` and `title` columns.

    Returns:
        The filtered DataFrame.
    """
    required_columns = ("id", "subreddit", "title")

    # AND together one "is present" predicate per required column.
    keep_row = None
    for name in required_columns:
        is_present = (col(name).isNotNull()) & (col(name) != "")
        keep_row = is_present if keep_row is None else keep_row & is_present

    df = df.filter(keep_row)
    print("Removed invalid records!")
    return df


In [0]:


def validate_numerical_fields(df):
    """Keep rows with `score` >= 0, `num_comments` >= 0 and `upvote_ratio` between 0.0 and 1.0.

    Args:
        df: DataFrame containing `score`, `num_comments` and `upvote_ratio` columns.

    Returns:
        The filtered DataFrame.
    """
    # NOTE(review): rows where any of these columns is NULL presumably fail the
    # predicate and are dropped as well - confirm that is intended.
    score_ok = col("score") >= 0
    comments_ok = col("num_comments") >= 0
    ratio_ok = col("upvote_ratio").between(0.0, 1.0)

    valid_rows = df.filter(score_ok & comments_ok & ratio_ok)
    print("Validated numerical fields!")
    return valid_rows


In [0]:


from pyspark.sql.functions import current_date

def add_load_date(df):
    """Attach a `load_date` column populated with `current_date()`.

    Args:
        df: DataFrame to stamp.

    Returns:
        The DataFrame with the `load_date` column added (or overwritten).
    """
    stamped = df.withColumn("load_date", current_date())
    print(" Added `load_date` column!")
    return stamped


In [0]:

def remove_duplicates(df):
    """Drop rows that share the same `id`, keeping one row per `id`.

    Args:
        df: DataFrame containing an `id` column.

    Returns:
        The de-duplicated DataFrame.
    """
    # NOTE(review): no ordering is applied first, so which of the duplicate
    # rows survives is presumably arbitrary - confirm that is acceptable.
    key_columns = ["id"]
    deduplicated = df.dropDuplicates(key_columns)
    print(" Removed duplicate records!")
    return deduplicated



# Data Quality Checks

In [0]:
from pyspark.sql.functions import sum as F_sum

def data_quality_checks(df):
    """Print data-quality diagnostics for `df`.

    Reports, in order:
      * NULL counts for `id`, `subreddit`, `title` and `created_utc`
      * the number of `id` values that occur more than once
      * the number of rows with `score` < 0, `num_comments` < 0, and
        `upvote_ratio` outside the 0.0-1.0 range

    Args:
        df: DataFrame containing the columns listed above.

    Returns:
        None. Results are only printed.
    """
    print("Running Data Quality Checks...")

    # Count NULLs in important columns
    null_counts = df.select([
        F_sum(col(c).isNull().cast("int")).alias(c)
        for c in ["id", "subreddit", "title", "created_utc"]
    ])

    print(" NULL Counts:")
    null_counts.show()

    # Check for duplicate `id`s
    duplicate_count = df.groupBy("id").count().filter(col("count") > 1).count()
    print(f" Duplicate IDs: {duplicate_count}")

    # Check valid ranges for numerical columns with ONE aggregation over the
    # frame instead of three separate filter().count() actions.
    range_violations = df.select(
        F_sum((col("score") < 0).cast("int")).alias("invalid_scores"),
        F_sum((col("num_comments") < 0).cast("int")).alias("invalid_comments"),
        F_sum((~col("upvote_ratio").between(0.0, 1.0)).cast("int")).alias("invalid_upvote"),
    ).first()

    # SUM yields NULL when there is nothing to add (e.g. an empty frame);
    # report that as 0 so the output matches what count() would have printed.
    invalid_scores = range_violations["invalid_scores"] or 0
    invalid_comments = range_violations["invalid_comments"] or 0
    invalid_upvote = range_violations["invalid_upvote"] or 0

    print(f" Invalid Scores: {invalid_scores}")
    print(f" Invalid Num_Comments: {invalid_comments}")
    print(f"Invalid Upvote Ratios: {invalid_upvote}")

    print("Data Quality Checks Completed!")



# Applying Transformations And Quality Checks

In [0]:

# Load Bronze Table
df_bronze = spark.read.table(f"`{catalog_name}`.`{bronze_schema}`.raw_redditPosts")

# Silver transformations, applied to the bronze data in exactly this order
silver_steps = (
    normalize_column_names,
    format_created_utc,
    standardize_author,
    filter_invalid_records,
    validate_numerical_fields,
    add_load_date,
    remove_duplicates,
)

df_silver = df_bronze
for apply_step in silver_steps:
    df_silver = apply_step(df_silver)

# Run Data Quality Checks
data_quality_checks(df_silver)




In [0]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql.types import FloatType, StructType, StructField


# Initialize the VADER analyzer once as a notebook-level global;
# `analyze_sentiment_vader` reads this object on every call.
analyzer = SentimentIntensityAnalyzer()

# Define Sentiment Analysis Function
def analyze_sentiment_vader(text):
    """Score `text` with the notebook-level VADER `analyzer`.

    Args:
        text: The string to score, or None.

    Returns:
        A 4-tuple (positive, neutral, negative, compound). None or
        whitespace-only input yields all zeros without calling the analyzer.
    """
    has_content = text is not None and text.strip() != ""
    if not has_content:
        return (0.0, 0.0, 0.0, 0.0)

    polarity = analyzer.polarity_scores(text)
    return tuple(polarity[key] for key in ("pos", "neu", "neg", "compound"))

# Define UDF for PySpark
# `udf` is imported explicitly here: no visible cell of this notebook imports it,
# so without this line the cell depends on the runtime pre-defining the name.
from pyspark.sql.functions import udf

# Struct returned by the UDF; field order matches the tuple returned by
# `analyze_sentiment_vader` (positive, neutral, negative, compound).
sentiment_schema = StructType([
    StructField(field_name, FloatType(), True)
    for field_name in ("positive", "neutral", "negative", "compound")
])

sentiment_udf = udf(analyze_sentiment_vader, sentiment_schema)





In [0]:
# Apply VADER Sentiment Analysis
# Score each title into a temporary `sentiment` struct, copy every struct field
# into its own `<field>_score` column, then drop the struct.
df_silver = df_silver.withColumn("sentiment", sentiment_udf(col("title")))
for field_name in ("positive", "neutral", "negative", "compound"):
    df_silver = df_silver.withColumn(
        f"{field_name}_score", col("sentiment").getField(field_name)
    )
df_silver = df_silver.drop("sentiment")

print("Applied VADER Sentiment Analysis!")

In [0]:
# Write to Silver Table.
# Catalog and schema are back-quoted the same way as in the bronze read, so an
# identifier that needs quoting behaves the same on read and on write.
silver_table = f"`{catalog_name}`.`{silver_schema}`.cleaned_redditPosts"
(
    df_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")  # allow the overwrite to replace the table schema
    .saveAsTable(silver_table)
)


print("Silver processing completed successfully!")

In [0]:
# Validations: row count of the silver table.
# The table name is built from `catalog_name` / `silver_schema` (instead of being
# re-derived from `env`) so it always matches the table written by this notebook.
df = spark.sql(
    f"SELECT COUNT(*) FROM `{catalog_name}`.`{silver_schema}`.cleaned_redditPosts"
)
df.show()

In [0]:
df = spark.sql(f"SELECT * FROM {env}_catalog.silver.cleaned_redditPosts")
df.display()