In [1]:
import os

import requests
from pyspark.sql import SparkSession


In [2]:
import pyspark

In [3]:
# Reuse the active Spark session if there is one; otherwise start a new one
# registered under the application name "Movieanalysis".
spark = (
    SparkSession.builder
    .appName("Movieanalysis")
    .getOrCreate()
)

In [4]:
# The TMDB key is a credential, so it is read from the TMDB_API_KEY environment
# variable instead of being stored in the notebook. Export it before starting
# the kernel, e.g. `export TMDB_API_KEY=<your key>`.
api_key = os.environ.get("TMDB_API_KEY", "")
if not api_key:
    # No exception is raised here so the remaining cells can still be loaded;
    # requests sent without a key are expected to be rejected by TMDB.
    print("TMDB_API_KEY is not set; TMDB requests will fail until it is exported.")

In [5]:
# TMDB movie IDs to download. The leading 0 is not fetched: the download loop
# skips any entry equal to 0.
movie_ids = [
    0,
    299534, 19995, 140607, 299536, 597, 135397,
    420818, 24428, 168259, 99861, 284054, 12445,
    181808, 330457, 351286, 109445, 321612, 260513,
]


In [6]:
# Define the TMDB API endpoint
movie_endpoint = "https://api.themoviedb.org/3/movie"

# Seconds to wait on a single request. Without a timeout, `requests` can block
# the notebook indefinitely on a stalled connection.
request_timeout = 10

# Query parameters shared by every request. `append_to_response=credits` asks
# for the credits payload (read later as `credits`) in the same response.
request_params = {"api_key": api_key, "append_to_response": "credits"}

# Create a list to store movie data
movie_responses = []

# A single session lets the requests share one connection pool.
with requests.Session() as session:
    for movie_id in movie_ids:
        # Entries equal to 0 are skipped (presumably a placeholder, not a TMDB ID).
        if movie_id == 0:
            continue
        url = f"{movie_endpoint}/{movie_id}"
        try:
            response = session.get(url, params=request_params, timeout=request_timeout)
        except requests.RequestException as exc:
            # Only the exception class is printed: the full message can include
            # the request URL, whose query string carries the API key.
            print(f"Failed to fetch data for movie ID {movie_id} ({type(exc).__name__})")
            continue
        if response.status_code == 200:
            movie_responses.append(response.json())
        else:
            print(f"Failed to fetch data for movie ID {movie_id}")

# Fail with an actionable message instead of a schema-inference error on an
# empty list.
if not movie_responses:
    raise ValueError(
        "No movie data was fetched from TMDB; check the API key and network access."
    )

# Convert to Spark DataFrame
# NOTE(review): the schema is inferred from plain Python dicts, so nested objects
# become maps with a single value type (the recorded printSchema output shows
# belongs_to_collection as map<string, long>). String fields nested inside those
# maps may not survive the conversion — confirm, and consider an explicit schema.
spark_df = spark.createDataFrame(movie_responses)


In [7]:
# Show the schema Spark inferred for the freshly created DataFrame.
spark_df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- budget: long (nullable = true)
 |-- credits: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- genres: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 

In [8]:
# Fields that are not needed for the analysis and are discarded up front
irrelevant_cols = [
    'adult',
    'imdb_id',
    'original_title',
    'video',
    'homepage',
]
spark_df = spark_df.drop(*irrelevant_cols)


In [9]:
from pyspark.sql.functions import udf, col, from_json, explode, lit
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

def extract_names(data):
    """Join the ``name`` entries of a list of dict-like items into one string.

    Args:
        data: A list of mappings that may each carry a ``name`` key. Any value
            that is not a list (including ``None``) is treated as missing.

    Returns:
        The names joined with ``"|"`` in their original order (an empty string
        when no names are found), or ``None`` when ``data`` is not a list.
    """
    if not isinstance(data, list):
        return None
    names = []
    for item in data:
        # Null elements and entries without a name key contribute nothing.
        if item is None or 'name' not in item:
            continue
        name = item.get('name')
        # A name key holding null would make the join raise, so skip it.
        if name is not None:
            names.append(str(name))
    return "|".join(names)

# Spark wrapper around extract_names; the resulting column holds strings.
extract_udf = udf(extract_names, StringType())

# Nested columns that get flattened in this step.
json_cols = [
    "belongs_to_collection",
    "genres",
    "spoken_languages",
    "production_countries",
    "production_companies",
]

# The first entry is handled on its own: only its `name` entry is kept.
spark_df = spark_df.withColumn(
    "belongs_to_collection",
    col("belongs_to_collection.name"),
)

# Every remaining entry is passed through the UDF, which joins the names
# found in the column with "|".
for col_name in json_cols[1:]:
    flattened = extract_udf(col(col_name))
    spark_df = spark_df.withColumn(col_name, flattened)


In [10]:
from pyspark.sql.functions import to_date, when

# A value of 0 in these columns is treated as missing and replaced with null;
# every other value (including an existing null) is left as it is.
for zero_means_missing in ("budget", "revenue", "runtime"):
    current = col(zero_means_missing)
    spark_df = spark_df.withColumn(
        zero_means_missing,
        when(current == 0, None).otherwise(current),
    )

In [11]:
# Target Spark type for each numeric column
numeric_casts = {
    "budget": "double",
    "revenue": "double",
    "popularity": "double",
    "vote_count": "int",
    "vote_average": "double",
}

# Cast each column in place (same name, new type)
for field, target_type in numeric_casts.items():
    spark_df = spark_df.withColumn(field, col(field).cast(target_type))


In [13]:
# Parse the 'release_date' text (pattern yyyy-MM-dd) into a Spark date column
release_date_parsed = to_date(col("release_date"), "yyyy-MM-dd")
spark_df = spark_df.withColumn("release_date", release_date_parsed)


In [14]:
# Text columns and the placeholder string that stands for "no value" in each.
# The placeholder and the empty string are both replaced with null.
placeholder_by_column = {"tagline": "No Tagline", "overview": "No Data"}

for text_col, placeholder in placeholder_by_column.items():
    is_missing = col(text_col).isin(placeholder, "")
    spark_df = spark_df.withColumn(
        text_col,
        when(is_missing, None).otherwise(col(text_col)),
    )


In [15]:
from pyspark.sql.functions import col

# Row-level cleanup, applied in this order:
#   1. keep rows that have both an id and a title
#   2. keep only movies whose status is 'Released', then discard 'status'
#   3. keep a single row per id
has_id_and_title = col("id").isNotNull() & col("title").isNotNull()
is_released = col("status") == "Released"

spark_df = (
    spark_df
    .filter(has_id_and_title)
    .filter(is_released)
    .drop("status")
    .dropDuplicates(["id"])
)


In [16]:
# Show the schema again after the cleaning steps.
# NOTE(review): the recorded output lists belongs_to_collection as long rather
# than string — confirm that the collection name survives the extraction.
spark_df.printSchema()

root
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: long (nullable = true)
 |-- budget: double (nullable = true)
 |-- credits: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- genres: string (nullable = true)
 |-- id: long (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: long (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |

In [17]:
from pyspark.sql.functions import when

# Per-row count of populated columns: each column contributes 1 when it is
# not null and 0 otherwise.
non_null_count_expr = sum(
    col(name).isNotNull().cast("int") for name in spark_df.columns
)

# Rows with fewer than 10 populated columns are discarded
min_non_null = 10
spark_df = spark_df.filter(non_null_count_expr >= min_non_null)


In [None]:
from pyspark.sql.functions import size, arrays_zip, explode

# Create UDFs to extract cast and crew information

# Function to extract director names
def extract_director(credits_data):
    """Return the names of all crew members whose job is "director".

    Args:
        credits_data: Mapping with an optional ``crew`` entry holding a list of
            dict-like crew members (each with ``job`` and ``name`` keys).

    Returns:
        Director names joined with ``"|"`` in crew order, or ``None`` when
        ``credits_data`` is empty, has no ``crew`` entry, or lists no director.
    """
    if not credits_data or 'crew' not in credits_data:
        return None
    directors = []
    # The crew entry itself may be null; treat that as an empty crew.
    for member in credits_data['crew'] or []:
        if member is None:
            continue
        # `job` can be present with a null value, in which case .get() returns
        # None rather than the default; fall back to '' before lower-casing.
        job = member.get('job') or ''
        if str(job).lower() != 'director':
            continue
        name = member.get('name', '')
        # A null name cannot be joined, so it is left out.
        if name is not None:
            directors.append(str(name))
    return '|'.join(directors) if directors else None

# Function to get cast size
def get_cast_size(credits_data):
    """Return the number of entries under ``cast`` in ``credits_data``.

    Args:
        credits_data: Mapping with an optional ``cast`` entry holding a list.

    Returns:
        The length of the cast list, or 0 when ``credits_data`` is empty, has
        no ``cast`` entry, or the entry is null.
    """
    if not credits_data or 'cast' not in credits_data:
        return 0
    cast_members = credits_data['cast']
    # The key can be present with a null value; count that as an empty cast.
    return len(cast_members) if cast_members is not None else 0

# Function to get crew size
def get_crew_size(credits_data):
    """Return the number of entries under ``crew`` in ``credits_data``.

    Args:
        credits_data: Mapping with an optional ``crew`` entry holding a list.

    Returns:
        The length of the crew list, or 0 when ``credits_data`` is empty, has
        no ``crew`` entry, or the entry is null.
    """
    if not credits_data or 'crew' not in credits_data:
        return 0
    crew_members = credits_data['crew']
    # The key can be present with a null value; count that as an empty crew.
    return len(crew_members) if crew_members is not None else 0

# Create UDFs
# The size helpers return Python ints, so their UDFs are declared with an
# integer return type (given as the DDL string "int"). Declaring them as
# StringType would make cast_size / crew_size string columns, which cannot be
# aggregated or compared numerically without another cast.
director_udf = udf(extract_director, StringType())
cast_size_udf = udf(get_cast_size, "int")
crew_size_udf = udf(get_crew_size, "int")

# Apply transformations: all three columns are derived from the credits column
spark_df = (
    spark_df
    .withColumn("director", director_udf(col("credits")))
    .withColumn("cast_size", cast_size_udf(col("credits")))
    .withColumn("crew_size", crew_size_udf(col("credits")))
)

# Drop the credits column as it's no longer needed
spark_df = spark_df.drop("credits")

In [18]:
# Add budget and revenue expressed in millions of USD (source column -> new column)
musd_columns = {"budget": "budget_musd", "revenue": "revenue_musd"}
for source_col, musd_col in musd_columns.items():
    spark_df = spark_df.withColumn(musd_col, col(source_col) / 1_000_000)

# Output column order (based on the project spec)
final_columns = [
    'id',
    'title',
    'tagline',
    'release_date',
    'genres',
    'belongs_to_collection',
    'original_language',
    'budget_musd',
    'revenue_musd',
    'production_companies',
    'production_countries',
    'vote_count',
    'vote_average',
    'popularity',
    'runtime',
    'overview',
    'spoken_languages',
    'poster_path',
    'cast',
    'cast_size',
    'director',
    'crew_size',
]

# Keep the spec order, silently leaving out any column the DataFrame does not have
available = set(spark_df.columns)
spark_df = spark_df.select([name for name in final_columns if name in available])
