In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession

try:
    spark = SparkSession.builder.appName("TestApp").getOrCreate()
    print("Spark Session Created Successfully!")
    spark.stop()
except Exception as e:
    print("Error starting Spark:", e)


Spark Session Created Successfully!


In [2]:
from pyspark.sql import SparkSession

# Initialise Spark Session
spark = SparkSession.builder \
        .appName("MovieLensAnalysis") \
        .getOrCreate()

# Load the ratings and movies CSV files into Dataframes
ratings_df = spark.read.csv(r'C:\Users\doyeo\Desktop\SPARK\Movie\data\rating.csv',header= True, inferSchema= True)
movies_df = spark.read.csv(r'C:\Users\doyeo\Desktop\SPARK\Movie\data\movie.csv',header= True, inferSchema= True)

ratings_df.printSchema()
movies_df.printSchema()

ratings_df.show(5)
movies_df.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...| 

In [3]:
from pyspark.sql.functions import avg, count

# Group ratings by movieId to compute average rating and count of the ratings
movie_ratings = ratings_df.groupBy("movieId") \
                .agg(avg("rating").alias("Avg_rating"),count("rating").alias("Num_ratings"))

#Join the aggregated ratings with movie details
movie_details = movies_df.join(movie_ratings, on = "movieId", how="inner")

# View top 10 movies sorted by average rating
movie_details.orderBy(movie_details.Avg_rating.desc()).show(10)

+-------+--------------------+--------------------+----------+-----------+
|movieId|               title|              genres|Avg_rating|Num_ratings|
+-------+--------------------+--------------------+----------+-----------+
| 120134|Doggiewoggiez! Po...|              Comedy|       5.0|          1|
| 128093|Bo Burnham: Words...|              Comedy|       5.0|          1|
| 112790|Going Down in LA-...|Comedy|Drama|Romance|       5.0|          1|
| 121029|No Distance Left ...|         Documentary|       5.0|          1|
| 129036|People of the Win...|         Documentary|       5.0|          1|
| 114214|Mishen (Target) (...|        Drama|Sci-Fi|       5.0|          1|
| 129034| Serving Life (2011)|         Documentary|       5.0|          1|
|  40404| Al otro lado (2004)|               Drama|       5.0|          1|
|  89133|Boys (Drenge) (1977)|               Drama|       5.0|          1|
|  79866|Schmatta: Rags to...|         Documentary|       5.0|          1|
+-------+----------------

In [4]:
from pyspark.sql.functions import split, explode, avg, count

# Assume movies_df has colums : movieId,title, genres,
# Split the genres string by '|' and explode into separate row

movies_exploded = movies_df.withColumn("genre",explode(split("genres","\\|")))
movies_exploded.select("movieId","title","genre").show(5)

# Aggregate ratings by movie
# Aggregate ratings by movie
movie_ratings = ratings_df.groupBy("movieId").agg(
    avg("rating").alias("avg_rating"),
    count("rating").alias("num_ratings")
)

# Join movie details (with exploded genres) with the aggregated ratings
movies_with_ratings = movies_exploded.join(movie_ratings, on="movieId", how="inner")

# Group by genre to compute overall average rating and the count of movies in each genre
genre_analysis = movies_with_ratings.groupBy("genre").agg(
    avg("avg_rating").alias("genre_avg_rating"),
    count("movieId").alias("num_movies")
)

genre_analysis.orderBy("genre_avg_rating", ascending=False).show()

+-------+----------------+---------+
|movieId|           title|    genre|
+-------+----------------+---------+
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
+-------+----------------+---------+
only showing top 5 rows

+------------------+------------------+----------+
|             genre|  genre_avg_rating|num_movies|
+------------------+------------------+----------+
|         Film-Noir| 3.444150839287358|       322|
|       Documentary| 3.436664522206703|      2391|
|               War|3.3211370431752743|      1173|
|              IMAX| 3.294670404912528|       195|
|             Drama| 3.262038065956428|     13062|
|           Romance|3.2057429873673713|      4029|
|           Musical| 3.182132878495173|      1016|
|         Animation| 3.177095212510683|      1015|
|             Crime| 3.167141212246113|      2889|
|           Mystery|3.13507386

In [6]:
# Aggregate ratings by user
user_analysis = ratings_df.groupBy("userId").agg(
    count("rating").alias("num_ratings"),
    avg("rating").alias("avg_rating")
).orderBy("num_ratings", ascending=False)

user_analysis.show(10)


+------+-----------+------------------+
|userId|num_ratings|        avg_rating|
+------+-----------+------------------+
|118205|       9254|3.2790685109141995|
|  8405|       7515|3.2083166999334662|
| 82418|       5646| 3.516914629826426|
|121535|       5520|2.7931159420289857|
|125794|       5491| 3.762975778546713|
| 74142|       5447|1.5774738388103544|
| 34576|       5356| 3.011669156086632|
|131904|       5330| 3.248874296435272|
| 83090|       5169|2.4049139098471657|
| 59477|       4988|2.4550922213311948|
+------+-----------+------------------+
only showing top 10 rows



In [7]:
# Group by genre to compute overall statistics
genre_analysis = movies_with_ratings.groupBy("genre").agg(
    avg("avg_rating").alias("genre_avg_rating"),
    count("movieId").alias("num_movies")
).orderBy("genre_avg_rating", ascending=False)

# Show the results of the genre analysis
genre_analysis.show(truncate=False)


+------------------+------------------+----------+
|genre             |genre_avg_rating  |num_movies|
+------------------+------------------+----------+
|Film-Noir         |3.444150839287358 |322       |
|Documentary       |3.436664522206703 |2391      |
|War               |3.3211370431752743|1173      |
|IMAX              |3.294670404912528 |195       |
|Drama             |3.262038065956428 |13062     |
|Romance           |3.2057429873673713|4029      |
|Musical           |3.182132878495173 |1016      |
|Animation         |3.177095212510683 |1015      |
|Crime             |3.167141212246113 |2889      |
|Mystery           |3.1350738677584093|1489      |
|Fantasy           |3.093324230790383 |1398      |
|Western           |3.075228316860353 |656       |
|Comedy            |3.0748694649959822|8232      |
|Adventure         |3.072407494188218 |2287      |
|Thriller          |3.0160951673247123|4129      |
|Action            |2.9768762894084144|3466      |
|Children          |2.955607569

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when

# Step 1: Extraction
# -------------------
# Initialize Spark session
spark = SparkSession.builder \
    .appName("ETL Pipeline Project") \
    .getOrCreate()

# Read raw data from a CSV file (adjust the path as necessary)
raw_data = spark.read.option("header", True) \
    .option("inferSchema", True) \
    .csv(r"C:\Users\doyeo\Desktop\SPARK\Movie\data\genome_scores.csv")

# Preview the raw data
print("Raw Data Sample:")
raw_data.show(5)


Raw Data Sample:
+-------+-----+---------+
|movieId|tagId|relevance|
+-------+-----+---------+
|      1|    1|    0.025|
|      1|    2|    0.025|
|      1|    3|  0.05775|
|      1|    4|  0.09675|
|      1|    5|  0.14675|
+-------+-----+---------+
only showing top 5 rows



In [None]:

# Step 2: Transformation
# ----------------------
# Data Cleaning:
# - Remove records with null values in critical columns (e.g., 'id' or 'date')
# - Filter out records with invalid or negative values in a numeric column (e.g., 'amount')
cleaned_data = raw_data.filter(col("relevance").isNotNull()) \
    .filter(col("relevance") >= 0.03)

# Data Conversion:
# - Convert a string date column into a proper date type (adjust the date format as needed)
cleaned_data = cleaned_data.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# - Cast numeric columns to proper data types if necessary
cleaned_data = cleaned_data.withColumn("amount", col("amount").cast("double"))

# Additional Transformation:
# - Rename columns for consistency and clarity
transformed_data = cleaned_data.withColumnRenamed("old_name", "new_name")
# - Create new derived columns (example: flag large amounts)
transformed_data = transformed_data.withColumn("large_amount_flag", 
                                               when(col("amount") > 1000, 1).otherwise(0))

# Preview the transformed data
print("Transformed Data Sample:")
transformed_data.show(5)

# Step 3: Loading
# ---------------
# Write the transformed data to a target location in Parquet format
output_path = "output/cleaned_data.parquet"
transformed_data.write.mode("overwrite").parquet(output_path)

print(f"Data successfully written to {output_path}")

# Stop the Spark session once done
spark.stop()
