In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, FloatType

In [0]:
# Catalog that holds the `bronze` and `silver` schemas read from and written to below.
catalog_name = "movies"

### IMDB movies: bronze → silver cleaning

In [0]:
# Raw (bronze) movie records; print a handful of rows as a quick sanity check.
df_movies_bronze = spark.read.table(f'{catalog_name}.bronze.bronze_imdb_movies')
df_movies_bronze.show(n=10)

In [0]:
# Clean the bronze movie list into the silver shape:
#  - drop rows without a release year FIRST: Spark does not specify which row
#    `dropDuplicates` keeps, so deduplicating before filtering could keep the
#    null-year copy of a movie and then lose that movie entirely;
#  - trim titles and lower-case genres to normalise whitespace/casing;
#  - keep a single row per movie `id`.
df_movies_silver = (
    df_movies_bronze
    .filter(F.col("year").isNotNull())
    .withColumn("title", F.trim(F.col("title")))
    .withColumn("genre", F.lower(F.col("genre")))
    .dropDuplicates(["id"])
)


In [0]:
# Normalise `rating`: missing values become 0, everything else is made non-negative.
# NOTE(review): imputing 0 for a missing rating makes "unrated" indistinguishable
# from a genuine 0 rating in any downstream average — confirm this is intended.
rating = F.col("rating")
df_movies_silver = df_movies_silver.withColumn(
    "rating",
    F.when(rating.isNull(), F.lit(0)).otherwise(F.abs(rating)),
)

In [0]:
display(df_movies_silver.limit(5))

In [0]:
df_movies_silver.write.format("delta").mode("overwrite").saveAsTable(f'{catalog_name}.silver.silver_imdb_movies_clean')

### IMDB reviews: bronze → silver cleaning

In [0]:
# Raw (bronze) review records; print a handful of rows as a quick sanity check.
df_reviews_bronze = spark.read.table(f'{catalog_name}.bronze.bronze_imdb_reviews')
df_reviews_bronze.show(n=10)

In [0]:
# Build the silver reviews table from bronze in ONE chain so every step builds
# on the previous one and the cell is safe to re-run (it always starts from
# bronze instead of reassigning `df_reviews_silver` across cells).
# NOTE(review): `dropDuplicates(["imdb_id"])` keeps a single row per `imdb_id`.
# If `imdb_id` identifies the movie rather than the individual review, this
# keeps only one review per movie — confirm the intended key (e.g. add the
# review text/title to the subset, or use `.dropDuplicates()` for exact copies).
review_rating = F.col("review_rating")
df_reviews_silver = (
    df_reviews_bronze
    # Drop text-less reviews before deduplicating so such a row can never be
    # the duplicate that survives.
    .filter(F.col("review").isNotNull())
    .withColumn("review", F.trim(F.col("review")))
    .withColumn("review_title", F.trim(F.col("review_title")))
    .dropDuplicates(["imdb_id"])
    # Missing ratings become 0; present ratings are made non-negative.
    .withColumn(
        "review_rating",
        F.when(review_rating.isNotNull(), F.abs(review_rating)).otherwise(F.lit(0)),
    )
)

In [0]:
display(df_reviews_silver.limit(5))

In [0]:
df_reviews_silver.write.format("delta").mode("overwrite").saveAsTable(f'{catalog_name}.silver.silver_imdb_reviews_clean')

### IMDB reviews enriched with sentiment

In [0]:
silver_reviews = spark.table(f'{catalog_name}.silver.silver_imdb_reviews_clean')

In [0]:
silver_reviews.printSchema()

In [0]:
display(silver_reviews.limit(2))

In [0]:
silver_reviews.count()

In [0]:
pdf = silver_reviews.toPandas()  

In [0]:
%pip install transformers

In [0]:
%pip install torch

In [0]:
%restart_python

In [0]:
from transformers import pipeline

sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english"
)

In [0]:
# Smoke-test the classifier on an obviously positive sentence.
sample_text = "This movie was amazing!"
sentiment_pipeline(sample_text)

In [0]:
def get_sentiment(text) -> dict:
    """Classify the sentiment of a single review.

    Args:
        text: Review text. ``None``, non-string values (e.g. a pandas ``NaN``)
            and blank/whitespace-only strings are treated as "no review".

    Returns:
        A dict with ``"label"`` and ``"score"`` keys: the first result from
        ``sentiment_pipeline``, or ``{"label": "neutral", "score": 0.0}`` for
        missing/blank input (the model is not called in that case).
    """
    if not isinstance(text, str) or not text.strip():
        return {"label": "neutral", "score": 0.0}
    # Let the tokenizer truncate to the model's maximum sequence length.
    # Slicing characters (text[:512]) throws away most of a long review and
    # still does not guarantee the input fits within 512 *tokens*.
    return sentiment_pipeline(text, truncation=True)[0]

# Score every review once, then flatten each result dict into two columns:
# a lower-cased label and a float confidence score.
sentiment_results = pdf["review"].apply(get_sentiment)
pdf["sentiment_label"] = sentiment_results.map(lambda result: result["label"].lower())
pdf["sentiment_score"] = sentiment_results.map(lambda result: float(result["score"]))

In [0]:
display(pdf.head(5))

In [0]:
reviews_enriched = spark.createDataFrame(pdf)

In [0]:
reviews_enriched.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.silver.silver_imdb_reviews_enriched")