In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=290a6bd6c565b54c6c9f72738e5c0a07f6feda3901893d9c082e5747111499e2
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, when
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType

# Create a Spark session with Delta Lake enabled.
# `pip install pyspark` does not ship the Delta data source, so without these
# settings every `format("delta")` read/write and the Delta SQL commands used
# later (RESTORE, DESCRIBE HISTORY, OPTIMIZE, VACUUM) would fail.
# The Delta jar is resolved from Maven when the session starts (network needed
# on first run); delta-spark 3.2.x is built for Spark 3.5 / Scala 2.12.
# getOrCreate() reuses an already-running session and cannot apply these static
# settings to it, so restart the kernel if a plain session was created earlier.
DELTA_SPARK_PACKAGE = "io.delta:delta-spark_2.12:3.2.0"
spark = (
    SparkSession.builder.appName("MovieRatingsIngestion")
    .config("spark.jars.packages", DELTA_SPARK_PACKAGE)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Define the schema for the movie ratings data.
# NOTE(review): TimestampType expects formatted date-time strings in the CSV;
# values Spark cannot parse (e.g. epoch seconds) become null in the default
# PERMISSIVE mode — confirm against the actual file.
schema = StructType([
    StructField("UserID", StringType(), True),
    StructField("MovieID", StringType(), True),
    StructField("Rating", FloatType(), True),
    StructField("Timestamp", TimestampType(), True)
])

# Define paths
raw_data_path = "/content/sample_data/movie_ratings.csv"
delta_table_path = "/content/sample_data/delta/movie_ratings"

# Fail fast: every later cell reads the Delta table written here, so carrying on
# after a missing input (or a failed write) only surfaces as a confusing error
# several cells further down.
if not os.path.exists(raw_data_path):
    raise FileNotFoundError(f"File not found: {raw_data_path}")

# Read the CSV file into a DataFrame, tagging each row with its source file.
ratings_df = (
    spark.read.csv(raw_data_path, schema=schema, header=True)
    .withColumn("file_name", input_file_name())
)

# Write the DataFrame to a Delta table; errors propagate instead of being printed.
ratings_df.write.format("delta").mode("overwrite").save(delta_table_path)
print("Data loaded and saved as Delta table.")


Task 2: Data Cleaning

In [None]:
# Load the raw ratings table produced by the ingestion step.
raw_ratings_df = spark.read.format("delta").load(delta_table_path)

# Cleaning rules, applied in order:
#   1. keep ratings on the 1-5 scale, bounds included (a null Rating never
#      satisfies the comparison, so those rows are dropped too);
#   2. keep a single row per (UserID, MovieID) pair.
# NOTE(review): dropDuplicates does not specify which row of a duplicate group
# survives; if a user can rate the same movie more than once, consider keeping
# the most recent row by Timestamp instead.
in_range_df = raw_ratings_df.where(raw_ratings_df.Rating.between(1, 5))
cleaned_ratings_df = in_range_df.dropDuplicates(["UserID", "MovieID"])

# Persist the cleaned result as its own Delta table.
cleaned_delta_path = "/content/sample_data/delta/cleaned_movie_ratings"
(
    cleaned_ratings_df.write
    .format("delta")
    .mode("overwrite")
    .save(cleaned_delta_path)
)
print("Cleaned data saved to Delta table.")


Task 3: Movie Rating Analysis

In [None]:
# Read the cleaned data
cleaned_ratings_df = spark.read.format("delta").load(cleaned_delta_path)

# Calculate the average rating for each movie
average_ratings = (
    cleaned_ratings_df.groupBy("MovieID")
    .agg({"Rating": "avg"})
    .withColumnRenamed("avg(Rating)", "AverageRating")
)
average_ratings.show()

# Identify the movies with the highest and lowest average ratings.
# orderBy(...).limit(1) keeps one arbitrary movie when several share the top
# (or bottom) average, which is common on a 1-5 scale. Instead, attach the
# global max/min via an unpartitioned window and keep every movie equal to it.
# Computing the bound and the comparison inside the same query keeps the float
# equality exact. The unpartitioned window moves all rows to one partition,
# which is acceptable here because there is only one row per movie.
ratings_with_bounds = average_ratings.selectExpr(
    "*",
    "max(AverageRating) OVER () AS MaxAverageRating",
    "min(AverageRating) OVER () AS MinAverageRating",
)
highest_rating = (
    ratings_with_bounds.filter("AverageRating = MaxAverageRating")
    .select("MovieID", "AverageRating")
    .orderBy("MovieID")
)
lowest_rating = (
    ratings_with_bounds.filter("AverageRating = MinAverageRating")
    .select("MovieID", "AverageRating")
    .orderBy("MovieID")
)

highest_rating.show()
lowest_rating.show()

# Save analysis results to a Delta table
average_ratings_path = "/content/sample_data/delta/average_movie_ratings"
average_ratings.write.format("delta").mode("overwrite").save(average_ratings_path)


Task 4: Time Travel and Delta Lake History

In [None]:
# Record the table's current version before modifying it, so the rollback below
# returns to exactly this state. A hard-coded `VERSION AS OF 0` is only the
# previous version on the very first run: each re-run of the ingestion cell
# appends a new version to the same table.
version_before_update = spark.sql(
    f"DESCRIBE HISTORY delta.`{delta_table_path}` LIMIT 1"
).first()["version"]

# Perform an update to the movie ratings data: every rating by user U001 becomes 5.
# The update is based on the table being versioned (not the cleaned copy), so the
# new version differs from the previous one only by this change.
# The explicit cast keeps Rating as FloatType, matching the table's schema.
current_ratings_df = spark.read.format("delta").load(delta_table_path)
updated_ratings_df = current_ratings_df.withColumn(
    "Rating",
    when(current_ratings_df.UserID == "U001", 5)
    .otherwise(current_ratings_df.Rating)
    .cast(FloatType()),
)
updated_ratings_df.write.format("delta").mode("overwrite").save(delta_table_path)

# Roll back to the version captured before the update.
spark.sql(f"RESTORE TABLE delta.`{delta_table_path}` TO VERSION AS OF {version_before_update}")

# Use DESCRIBE HISTORY to view the history of changes
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
history_df.show()


Task 5: Optimize Delta Table

In [None]:
# Maintenance pass over the raw movie-ratings Delta table; all three statements
# target the same path-based table reference.
ratings_table_ref = "delta.`/content/sample_data/delta/movie_ratings`"

# Z-order the data files by MovieID so queries filtering on MovieID can skip more files.
spark.sql(f"OPTIMIZE {ratings_table_ref} ZORDER BY (MovieID)")

# Bin-packing compaction of small files.
# NOTE(review): OPTIMIZE ... ZORDER BY already rewrites the files, so this second
# pass is probably close to a no-op right after it — confirm it is still wanted.
spark.sql(f"OPTIMIZE {ratings_table_ref}")

# Clean up data files from older table versions, keeping a 168-hour retention window.
spark.sql(f"VACUUM {ratings_table_ref} RETAIN 168 HOURS")
