In [0]:
from pyspark.sql import functions as F

# قراءة جدول Silver الموحّد
movie_ratings_silver = spark.table("workspace.movielens_silver.movie_ratings_silver")

# إنشاء سكيمـا لطبقة Gold (مرة واحدة فقط)
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.movielens_gold")


DataFrame[]

In [0]:
from pyspark.sql import functions as F

# Join ratings_bronze and items_bronze to create movie_ratings_silver
ratings_bronze = spark.table(
    "workspace.movielens_bronze.ratings_bronze"
)
items_bronze = spark.table(
    "workspace.movielens_bronze.items_bronze"
)

movie_ratings_silver = ratings_bronze.join(
    items_bronze,
    ratings_bronze.item_id == items_bronze.movie_id,
    "inner"
).select(
    ratings_bronze.item_id.alias("movie_id"),
    items_bronze.movie_title,
    ratings_bronze.rating,
    F.from_unixtime(ratings_bronze.rating_timestamp).alias("rating_date")
)

# Now aggregate and write as before
movie_summary_gold = (
    movie_ratings_silver
        .groupBy("movie_id", "movie_title")
        .agg(
            F.count("*").alias("num_ratings"),
            F.avg("rating").alias("avg_rating"),
            F.stddev("rating").alias("std_rating"),
            F.min("rating_date").alias("first_rating_date"),
            F.max("rating_date").alias("last_rating_date")
        )
        .withColumn("avg_rating_rounded", F.round("avg_rating", 2))
)

movie_summary_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.movielens_gold.movie_summary_gold")

In [0]:
%sql
SELECT *
FROM workspace.movielens_gold.movie_summary_gold
ORDER BY num_ratings DESC
LIMIT 10;


movie_id,movie_title,num_ratings,avg_rating,std_rating,first_rating_date,last_rating_date,avg_rating_rounded
50,Star Wars (1977),583,4.358490566037736,0.8813410116763766,1997-09-20 04:29:10,1998-04-22 16:53:14,4.36
258,Contact (1997),509,3.803536345776032,0.9944270246912174,1997-09-20 20:12:17,1998-04-22 22:09:38,3.8
100,Fargo (1996),508,4.155511811023622,0.975755982575976,1997-09-20 16:56:15,1998-04-22 16:53:14,4.16
181,Return of the Jedi (1983),507,4.007889546351085,0.923954854284622,1997-09-20 04:31:41,1998-04-22 16:53:43,4.01
294,Liar Liar (1997),485,3.156701030927835,1.09854404175367,1997-09-20 20:12:46,1998-04-22 23:05:46,3.16
286,"English Patient, The (1996)",481,3.656964656964657,1.169400815588166,1997-09-20 03:05:27,1998-04-22 22:09:37,3.66
288,Scream (1996),478,3.44142259414226,1.113910362118221,1997-09-20 03:08:25,1998-04-22 23:04:21,3.44
1,Toy Story (1995),452,3.878318584070796,0.9278967014291224,1997-09-20 19:43:35,1998-04-22 16:56:14,3.88
300,Air Force One (1997),431,3.6310904872389793,0.9980718474755464,1997-09-20 16:51:26,1998-04-22 23:10:38,3.63
121,Independence Day (ID4) (1996),429,3.438228438228438,1.1165838396907493,1997-09-20 04:09:25,1998-04-22 17:06:56,3.44


In [0]:
popular_movies_gold = (
    movie_summary_gold
        .filter(F.col("num_ratings") >= 100)
        .orderBy(F.col("avg_rating").desc(), F.col("num_ratings").desc())
)

popular_movies_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.movielens_gold.popular_movies_gold")


In [0]:
%sql
SELECT movie_title,
       avg_rating_rounded AS avg_rating,
       num_ratings
FROM workspace.movielens_gold.popular_movies_gold
ORDER BY avg_rating DESC, num_ratings DESC
LIMIT 20;


movie_title,avg_rating,num_ratings
"Close Shave, A (1995)",4.49,112
Schindler's List (1993),4.47,298
"Wrong Trousers, The (1993)",4.47,118
Casablanca (1942),4.46,243
"Shawshank Redemption, The (1994)",4.45,283
"Usual Suspects, The (1995)",4.39,267
Rear Window (1954),4.39,209
Star Wars (1977),4.36,583
12 Angry Men (1957),4.34,125
"Silence of the Lambs, The (1991)",4.29,390


In [0]:
ratings_by_year_gold = (
    movie_ratings_silver
        .groupBy(F.year("rating_date").alias("rating_year"))
        .agg(
            F.count("*").alias("num_ratings"),
            F.avg("rating").alias("avg_rating")
        )
        .orderBy("rating_year")
        .withColumn("avg_rating_rounded", F.round("avg_rating", 2))
)

ratings_by_year_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.movielens_gold.ratings_by_year_gold")


In [0]:
%sql
SELECT rating_year,
       num_ratings,
       avg_rating_rounded AS avg_rating
FROM workspace.movielens_gold.ratings_by_year_gold
ORDER BY rating_year;


rating_year,num_ratings,avg_rating
1997,52899,3.57
1998,47101,3.49


In [0]:
from pyspark.sql import functions as F

spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.movielens_silver")

# Read bronze tables
ratings_bronze = spark.table("workspace.movielens_bronze.ratings_bronze")
items_bronze = spark.table("workspace.movielens_bronze.items_bronze")

# Create ratings_silver
ratings_silver = ratings_bronze.select(
    F.col("user_id"),
    F.col("item_id"),
    F.col("rating"),
    F.col("rating_timestamp")
)
ratings_silver.write.format("delta").mode("overwrite").saveAsTable("workspace.movielens_silver.ratings_silver")

# Create items_silver
genre_cols = [
    "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film_Noir", "Horror", "Musical",
    "Mystery", "Romance", "Sci_Fi", "Thriller", "War", "Western"
]
items_silver = items_bronze.select(
    F.col("movie_id"),
    F.col("movie_title"),
    *[F.col(g) for g in genre_cols]
)
items_silver.write.format("delta").mode("overwrite").saveAsTable("workspace.movielens_silver.items_silver")

# Continue with your original logic
ratings_silver = spark.table("workspace.movielens_silver.ratings_silver")
items_silver = spark.table("workspace.movielens_silver.items_silver")

df_joined = ratings_silver.alias("r").join(
    items_silver.alias("i"),
    F.col("r.item_id") == F.col("i.movie_id"),
    "inner"
).select(
    F.col("i.movie_id").alias("movie_id"),
    F.col("i.movie_title").alias("movie_title"),
    F.col("r.rating").alias("rating"),
    *[F.col(c) for c in genre_cols]
)

genre_expr = ", ".join([f"'{g}', {g}" for g in genre_cols])

df_genres = df_joined.selectExpr(
    "movie_id", "movie_title", "rating",
    f"stack({len(genre_cols)}, {genre_expr}) AS (genre, is_genre)"
).filter("is_genre = 1").drop("is_genre")

ratings_by_genre_gold = df_genres.groupBy("genre").agg(
    F.count("*").alias("num_ratings"),
    F.avg("rating").alias("avg_rating")
).withColumn(
    "avg_rating_rounded", F.round("avg_rating", 2)
).orderBy(F.col("num_ratings").desc())

spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.movielens_gold")

ratings_by_genre_gold.write.format("delta").mode("overwrite").saveAsTable(
    "workspace.movielens_gold.ratings_by_genre_gold"
)

In [0]:
%sql
SELECT *
FROM workspace.movielens_gold.ratings_by_genre_gold
ORDER BY num_ratings DESC;


genre,num_ratings,avg_rating,avg_rating_rounded
Drama,39895,3.687379370848477,3.69
Comedy,29832,3.3940734781442745,3.39
Action,25589,3.480245417953027,3.48
Thriller,21872,3.5090069495245064,3.51
Romance,19461,3.621704948358255,3.62
Adventure,13753,3.503526503308369,3.5
Sci_Fi,12730,3.5607227022780834,3.56
War,9398,3.815811874866993,3.82
Crime,8055,3.6322780881440098,3.63
Children,7182,3.3532442216652742,3.35
