In [0]:
%python
storage_key = dbutils.secrets.get(scope="movie-scope", key="storage-access-key")
spark.conf.set(
    "fs.azure.account.key.errisbinxhija01.dfs.core.windows.net",
    storage_key
)

In [0]:
from pyspark.sql import functions as F

# 1. Ngarkojmë të dhënat nga Bronze
movies_br = spark.read.table("movies_db.movies_bronze")
recs_br = spark.read.table("movies_db.recommendations_bronze")
ratings_br = spark.read.table("movies_db.ratings_history_bronze")
belief_br = spark.read.table("movies_db.belief_data_bronze")

# 2. PASTRIMI: Filtrojmë filmat pa zhanër
movies_silver = movies_br.filter(
    (F.col("genres").isNotNull()) & 
    (F.col("genres") != "(no genres listed)")
)

# 3. PASTRIMI I INTEGRITETIT: Heqim rreshtat jetimë në tabelat e tjera
recs_silver = recs_br.join(movies_silver, on="movieId", how="left_semi")
ratings_silver = ratings_br.join(movies_silver, on="movieId", how="left_semi")
belief_silver = belief_br.join(movies_silver, on="movieId", how="left_semi")

# 5. RAPORTI: Kalkulojmë sa rreshta u fshinë
report_data = [
    ("Movies", movies_br.count(), movies_silver.count()),
    ("Recommendations", recs_br.count(), recs_silver.count()),
    ("Ratings History", ratings_br.count(), ratings_silver.count()),
    ("Belief Data", belief_br.count(), belief_silver.count())
]

print(f"{'Table Name':<20} | {'Bronze Count':<15} | {'Silver Count':<15} | {'Deleted Rows':<15} | {'Loss %'}")
print("-" * 80)

for name, bronze, silver in report_data:
    deleted = bronze - silver
    loss_pct = (deleted / bronze) * 100 if bronze > 0 else 0
    print(f"{name:<20} | {bronze:<15} | {silver:<15} | {deleted:<15} | {loss_pct:.2f}%")

In [0]:
from pyspark.sql.functions import to_timestamp, current_timestamp, col

# 1. Marrim ratings_silver që sapo u krijua nga join-i me movies_silver
# (Ky është DataFrame-i që mbijetoi nga filtrimi i zhanreve)

ratings_silver_transformed = ratings_silver.withColumn(
    "rating_timestamp", 
    # Sigurohemi që tstamp të interpretohet si timestamp korrekt
    to_timestamp(col("tstamp"))
).withColumn(
    "processed_at", current_timestamp()
).drop("tstamp")

# 2. Ruajtja Finale në Delta
ratings_silver_transformed.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("movies_db.ratings_silver")

# 3. Shfaqim rezultatin për të kontrolluar kolonat e reja
print("✅ Ratings Silver u transformua me kolonat e kohës dhe u ruajt!")
display(ratings_silver_transformed.limit(5))

In [0]:
from pyspark.sql import functions as F

# 2. Transformimi: Viti, Titulli i pastër dhe kthejmë zhanret në Listë
movies_transformed = movies_silver.withColumn(
    "year", F.regexp_extract(F.col("title"), r"\((\d{4})\)", 1).cast("int")
).withColumn(
    "title", F.trim(F.regexp_replace(F.col("title"), r"\(\d{4}\)", ""))
).withColumn(
    "genres_list", F.split(F.col("genres"), r"\|")
)

# 3. One-Hot Encoding Dinamik për Zhanret
# Gjejmë të gjitha zhanret unike që ekzistojnë në dataset
unique_genres = [
    row[0] for row in movies_transformed.select(F.explode("genres_list")).distinct().collect()
]

# Krijojmë kolonat 0/1 për çdo zhanër
genre_cols = [
    F.array_contains(F.col("genres_list"), g).cast("int").alias(''.join(c if c.isalnum() else '_' for c in g))
    for g in unique_genres
]

# Bashkojmë kolonat e reja dhe fshijmë ato që nuk na duhen më
movies_final = movies_transformed.select("*", *genre_cols).drop("genres", "genres_list")

# 4. Ruajtja në Metastore si tabela finale Silver
movies_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("movies_db.movies_silver")

print(f"✅ Transformimi u krye! U krijuan {len(unique_genres)} kolona zhanresh.")
display(movies_final.limit(5))

In [0]:
from pyspark.sql.functions import current_timestamp

# 1. Përdorim variablën 'belief_silver' që doli nga join-i me movies_silver
# Kjo siguron që po ruajmë vetëm rekordet për filmat që kanë zhanre.
belief_silver_final = belief_silver.withColumn(
    "processed_at", current_timestamp()
)

# 2. Ruajmë si Delta Table në shtresën Silver
belief_silver_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("movies_db.belief_silver")

# 3. Shfaqim rezultatin
print(f"✅ Tabela 'belief_silver' u krijua me {belief_silver_final.count()} rreshta të pastruar.")
display(belief_silver_final.limit(5))

In [0]:
from pyspark.sql.functions import to_timestamp, current_timestamp, col

# 1. Përdorim 'recs_silver' që doli nga join-i i pastrimit në qelizën e mëparshme
# I shtojmë formatimin e kohës dhe auditimin
recs_silver_final = recs_silver.withColumn(
    "rec_timestamp", 
    to_timestamp(col("tstamp")) # Përdorim emrin korrekt të kolonës 'tstamp'
).withColumn(
    "processed_at", current_timestamp()
)

# 2. Ruajmë në formatin Delta (Shtresa Silver)
recs_silver_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("movies_db.recommendations_silver")

# 3. Shfaqim rezultatin
print(f"✅ Tabela 'recommendations_silver' u finalizua me {recs_silver_final.count()} rreshta.")
display(recs_silver_final.limit(5))