In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Create (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("movies").getOrCreate()

# Load the movies CSV into a DataFrame.
# Without inferSchema every column is read as a string, so the later
# comparisons on year/score and the ORDER BY on score/gross would work on
# text (e.g. "999" sorts above "1000000"). inferSchema asks Spark to detect
# numeric columns from the data at the cost of one extra pass over the file.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("C://Users//trebo//Desktop//BDAData//movies.csv")
)

# Preview the first 10 rows.
df.show(10)

In [None]:
# Basic Operations:
# Data Filtering
# Keep only movies from the year 2000 onwards whose score is at least 8.
df_filtered = df.where(
    (col("year") >= 2000) & (col("score") >= 8.0)
)
df_filtered.show(10)



In [None]:
# Selecting Columns
# Project the filtered movies down to: title, genre, lead star, director
# and release year.
df_selected = df_filtered.select(
    ["name", "genre", "star", "director", "year"]
)
df_selected.show(10)

In [None]:
# Data Cleaning
# Step 1: discard every row whose budget is null.
filtered_cleaning_bud = df_filtered.na.drop(subset=["budget"])

# Preview the result of this cleaning step.
filtered_cleaning_bud.show(5)

In [None]:
# Step 2: normalise the rating column - a null rating or the literal
# "Unrated" both become "Not Rated"; every other value is kept as is.
filtered_cleaning_rat = filtered_cleaning_bud.withColumn(
    "rating",
    when(
        col("rating").isNull() | col("rating").isin("Unrated"),
        "Not Rated",
    ).otherwise(col("rating")),
)

# Preview the result of this cleaning step.
filtered_cleaning_rat.show(5)

In [None]:
# Step 3: discard every row whose company is null.
filtered_cleaning_com = filtered_cleaning_rat.na.drop(subset=["company"])

# Preview the result of this cleaning step.
filtered_cleaning_com.show(5)

In [None]:
# Step 4: discard every row whose gross is null. This is the final
# cleaned DataFrame used by the SQL queries and the file exports below.
filtered_cleaning = filtered_cleaning_com.na.drop(subset=["gross"])

# Preview the fully cleaned dataset.
filtered_cleaning.show(5)


In [None]:
# SQL Queries
# Query 1: the five genres that contain the most movies in the cleaned data.
query = """
SELECT genre, COUNT(*) as count 
FROM movies_query
GROUP BY genre 
ORDER BY count DESC 
LIMIT 5
"""

# Expose the cleaned DataFrame to Spark SQL as the temporary view
# "movies_query", which the query above reads from.
filtered_cleaning.createOrReplaceTempView("movies_query")

top_genres = spark.sql(query)
top_genres.show()


In [None]:
# 2. Shows the top 5 movies by score.
# The sort key is cast to DOUBLE so the ranking is numeric even when the
# score column was loaded from CSV as a string (string ordering would place
# "10.0" below "9.0"). The selected score column itself is left unchanged.
query = """
SELECT name, score
FROM movies_query
ORDER BY CAST(score AS DOUBLE) DESC
LIMIT 5
"""
top_movies = spark.sql(query)
top_movies.show()

In [None]:
# Query 3: every distinct genre present in the cleaned dataset,
# listed in alphabetical order.
query = """
SELECT DISTINCT genre
FROM movies_query
ORDER BY genre;
"""

unique_genres = spark.sql(query)

unique_genres.show()


In [None]:
# 4. Shows the top 3 movies of each genre by gross.
# Rows are numbered within each genre from highest to lowest gross and only
# the first three per genre are kept.
# gross is cast to DOUBLE wherever it is used as a sort key: if the column
# was loaded from CSV as a string, a plain ORDER BY would compare text
# (e.g. "999" ranks above "1000000") and pick the wrong movies.
query = """
WITH GrossRanked AS (
    SELECT
        genre,
        name,
        gross,
        ROW_NUMBER() OVER (
            PARTITION BY genre
            ORDER BY CAST(gross AS DOUBLE) DESC
        ) AS row_num
    FROM movies_query
)
SELECT genre, name, gross
FROM GrossRanked
WHERE row_num <= 3
ORDER BY genre, CAST(gross AS DOUBLE) DESC;
"""

top3_grossing_per_genre = spark.sql(query)
top3_grossing_per_genre.show(50)  # Show up to 50 rows


In [None]:
# PySpark SQL for Operations
# Data Filtering
# Filters movies released in 2000 or later that have a score of 8 or more.

# The query reads from "movies_table", which no earlier cell registers, so
# on a fresh kernel it fails with a table-not-found error. Register it here.
# NOTE(review): the raw DataFrame `df` is used because this filter mirrors
# the DataFrame-API filter applied to `df` earlier - confirm that the raw
# data (not the cleaned view "movies_query") is the intended source.
df.createOrReplaceTempView("movies_table")

# Explicit casts keep the comparisons numeric even if the CSV columns were
# loaded as strings.
filtered_movies = spark.sql("""
    SELECT *
    FROM movies_table
    WHERE CAST(year AS INT) >= 2000 AND CAST(score AS DOUBLE) >= 8.0
""")

filtered_movies.show(5)  # Show first 5 rows


In [None]:
# Data Grouping
# Count the movies in each genre of the cleaned view and list the genres
# from most to fewest movies.
genre_count = spark.sql(
    """
    SELECT genre, COUNT(*) AS movie_count
    FROM movies_query
    GROUP BY genre
    ORDER BY movie_count DESC
"""
)

genre_count.show()

In [None]:
# Dataframe results output:

# Export 1: CSV file
# Collect the cleaned Spark DataFrame as a pandas DataFrame, then write it
# to a single CSV file without the pandas index column.
cleaned_df = filtered_cleaning.toPandas()
cleaned_df.to_csv(
    path_or_buf="C:/Users/trebo/Desktop/BDAData/output/csv_output.csv",
    index=False,
)

print("File saved successfully as csv_output.csv")

In [None]:
# Data to JSON File
# Collect the cleaned Spark DataFrame as a pandas DataFrame.
cleaned_df = filtered_cleaning.toPandas()

# Save as a single JSON file: a list of row objects, indented by 4 spaces.
cleaned_df.to_json("C:/Users/trebo/Desktop/BDAData/output/json_output.json",
                   orient="records", indent=4)

# The message names the file actually written (previously it said ".csv").
print("File saved successfully as json_output.json")


In [None]:
# Data to txt file
# Collect the cleaned Spark DataFrame as a pandas DataFrame.
cleaned_df = filtered_cleaning.toPandas()

# Render the frame as a fixed-width text table (no index column) and write
# it to a UTF-8 text file.
with open("C:/Users/trebo/Desktop/BDAData/output/txt_output.txt", "w", encoding="utf-8") as f:
    f.write(cleaned_df.to_string(index=False))

# The message names the file actually written (previously it said ".csv").
print("File saved successfully as txt_output.txt")