# Data Extraction from API using PySpark

This PySpark script fetches movie data from The Movie Database (TMDB) API, processes it, and creates a structured DataFrame for analysis. The pipeline extracts movie details, including credits information, for a predefined set of popular movies.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import requests
import json

# Build the Spark session used by every DataFrame operation in this notebook
# (getOrCreate returns an already-running session if one exists)
spark = (
    SparkSession.builder
    .appName("TMDB Movie Analysis")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# TMDB endpoint and credentials ("Hidden" appears to be a redacted placeholder)
api_key = "Hidden"
base_url = "https://api.themoviedb.org/3/movie/"

# Headers sent with every request: JSON response plus bearer-token auth
headers = dict(
    accept="application/json",
    Authorization="Bearer Hidden",
)

# TMDB ids of the movies whose details are downloaded
movie_ids = [
    299534, 19995, 140607,
    299536, 597, 135397,
    420818, 24428, 168259,
    99861, 284054, 12445,
    181808, 330457, 351286,
    109445, 321612, 260513,
]


# Explicit schema for one movie record (details plus appended credits).
# Every field is declared nullable.
def _nullable(name, data_type):
    """Return a StructField called *name* of *data_type* that accepts nulls."""
    return StructField(name, data_type, True)


def _struct_array(*fields):
    """Return an ArrayType whose elements are structs made of *fields*."""
    return ArrayType(StructType(list(fields)))


def _person_fields():
    """Return fresh fields for the attributes that lead both cast and crew entries."""
    return [
        _nullable("adult", BooleanType()),
        _nullable("gender", IntegerType()),
        _nullable("id", IntegerType()),
        _nullable("known_for_department", StringType()),
        _nullable("name", StringType()),
        _nullable("original_name", StringType()),
        _nullable("popularity", DoubleType()),
        _nullable("profile_path", StringType()),
    ]


movie_schema = StructType([
    _nullable("adult", BooleanType()),
    _nullable("backdrop_path", StringType()),
    _nullable("belongs_to_collection", StructType([
        _nullable("id", IntegerType()),
        _nullable("name", StringType()),
        _nullable("poster_path", StringType()),
        _nullable("backdrop_path", StringType()),
    ])),
    _nullable("budget", LongType()),
    _nullable("genres", _struct_array(
        _nullable("id", IntegerType()),
        _nullable("name", StringType()),
    )),
    _nullable("homepage", StringType()),
    _nullable("id", IntegerType()),
    _nullable("imdb_id", StringType()),
    _nullable("original_language", StringType()),
    _nullable("original_title", StringType()),
    _nullable("overview", StringType()),
    _nullable("popularity", DoubleType()),
    _nullable("poster_path", StringType()),
    _nullable("production_companies", _struct_array(
        _nullable("id", IntegerType()),
        _nullable("logo_path", StringType()),
        _nullable("name", StringType()),
        _nullable("origin_country", StringType()),
    )),
    _nullable("production_countries", _struct_array(
        _nullable("iso_3166_1", StringType()),
        _nullable("name", StringType()),
    )),
    _nullable("release_date", StringType()),
    _nullable("revenue", LongType()),
    _nullable("runtime", IntegerType()),
    _nullable("spoken_languages", _struct_array(
        _nullable("english_name", StringType()),
        _nullable("iso_639_1", StringType()),
        _nullable("name", StringType()),
    )),
    _nullable("status", StringType()),
    _nullable("tagline", StringType()),
    _nullable("title", StringType()),
    _nullable("video", BooleanType()),
    _nullable("vote_average", DoubleType()),
    _nullable("vote_count", IntegerType()),
    _nullable("credits", StructType([
        # Cast members: shared person fields followed by role-specific ones
        _nullable("cast", _struct_array(
            *_person_fields(),
            _nullable("cast_id", IntegerType()),
            _nullable("character", StringType()),
            _nullable("credit_id", StringType()),
            _nullable("order", IntegerType()),
        )),
        # Crew members: shared person fields followed by job-specific ones
        _nullable("crew", _struct_array(
            *_person_fields(),
            _nullable("credit_id", StringType()),
            _nullable("department", StringType()),
            _nullable("job", StringType()),
        )),
    ])),
])

# Defining a function to fetch movie data
def fetch_movie_data(movie_id, timeout=10):
    """Fetch one movie's details, with credits appended, from the TMDB API.

    Args:
        movie_id: TMDB id of the movie; appended to ``base_url``.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the whole fetch loop.

    Returns:
        The decoded JSON response as a dict when the request succeeds with
        status 200, otherwise ``None`` (after printing the reason).
    """
    url = f"{base_url}{movie_id}?append_to_response=credits"
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts are reported and skipped like any
        # other failed fetch instead of aborting the remaining downloads.
        print(f"Failed to fetch data for movie ID {movie_id}: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch data for movie ID {movie_id}, status code: {response.status_code}")
        return None
    return response.json()

# Fetching data for all movies; failed fetches return None and are skipped
movie_data = []
for movie_id in movie_ids:
    # A dedicated loop variable keeps `data` reserved for the DataFrame below
    movie = fetch_movie_data(movie_id)
    if movie:
        movie_data.append(movie)

# Checking if movie_data is empty and provide a helpful message if it is
if not movie_data:
    print("No data was fetched from the API. Please check your movie IDs and API key.")
else:
    # Converting to a PySpark DataFrame
    data = spark.createDataFrame(movie_data, schema=movie_schema)
    # Only display when a DataFrame was actually built, so the message above
    # is not followed by an AttributeError from calling show() on None
    data.show()

+-----+--------------------+---------------------+---------+--------------------+--------------------+------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+--------------------+
|adult|       backdrop_path|belongs_to_collection|   budget|              genres|            homepage|    id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|             credits|
+-----+--------------------+---------------------+---------+--------------------+--------------------+------+---------+-----------------+--------------------+--------------------+---

# Extracting and Cleaning JSON-like Columns

Dropping irrelevant columns

In [181]:
# Columns that are not used anywhere in the later processing steps
columns_to_drop = [
    'adult',
    'imdb_id',
    'original_title',
    'video',
    'homepage',
]
data = data.drop(*columns_to_drop)
data.show()

+--------------------+---------------------+---------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+--------------------+
|       backdrop_path|belongs_to_collection|   budget|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|             credits|
+--------------------+---------------------+---------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----------

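Creating a separately named reference to the dataset (`copy`) so that all later transformations are applied to it while `data` keeps the original. Spark DataFrames are immutable, so no data is actually duplicated.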

In [182]:
# Work on a second handle named `copy` from here on, leaving `data` untouched.
# NOTE(review): alias() only attaches the name "copied" to the DataFrame; it
# does not duplicate any rows. `data` stays unchanged because every later step
# rebinds `copy` to a new DataFrame rather than modifying one in place.
copy = data.alias("copied")
copy.show()

+--------------------+---------------------+---------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+--------------------+
|       backdrop_path|belongs_to_collection|   budget|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|             credits|
+--------------------+---------------------+---------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----------

Evaluating JSON-like columns

Extracting 'director' column from credits by filtering crew members whose job is 'Director' and extracting their names.

In [183]:
# Keep the crew entries whose job is exactly 'Director' and take the name of
# the first match; any additional directors of the same movie are ignored.
# NOTE(review): presumably yields null when no crew entry matches — confirm
# against the session's ANSI setting, which governs out-of-range array indexing.
copy = copy.withColumn("director", expr("filter(credits.crew, x -> x.job = 'Director')[0].name"))

In [184]:
from pyspark.sql.functions import col, explode, concat_ws, transform, expr, size

# Flatten the array-of-struct columns: each one becomes a '|'-separated string
# of a single field per element, and the credits struct is summarised as cast
# names plus cast/crew counts. Dict order is the order the columns are applied.
derived_columns = {
    "genres": expr("concat_ws('|', transform(genres, x -> x.name))"),
    "spoken_languages": expr("concat_ws('|', transform(spoken_languages, x-> x.english_name))"),
    "production_countries": expr("concat_ws('|', transform(production_countries, x-> x.name))"),
    "production_companies": expr("concat_ws('|', transform(production_companies, x-> x.name))"),
    "cast_size": size("credits.cast"),
    "cast": expr("concat_ws('|', transform(credits.cast, x-> x.name))"),
    "crew_size": size("credits.crew"),
}
for column_name, column_expr in derived_columns.items():
    copy = copy.withColumn(column_name, column_expr)

# Everything needed from credits has been extracted above
copy = copy.drop("credits")

# Replace the collection struct with just its name
copy = copy.withColumn("belongs_to_collection", col("belongs_to_collection.name"))

# Displaying the updated DataFrame
extracted_columns = [
    "genres", "director", "spoken_languages", "production_countries",
    "belongs_to_collection", "production_companies", "cast_size", "crew_size", "cast",
]
copy.select(*extracted_columns).show(truncate=False)

+-----------------------------------------+---------------+---------------------------------------------+---------------------------------------+-----------------------------------+----------------------------------------------------------------------------+---------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Inspecting some of the extracted JSON-like columns 

In [185]:
# Show how often each distinct value occurs, most frequent first
inspected_columns = [
    "genres",
    "spoken_languages",
    "production_countries",
    "production_companies",
    "belongs_to_collection",
]
for column in inspected_columns:
    value_counts = copy.select(column).groupBy(column).count()
    value_counts.orderBy(desc("count")).show(truncate=False)

+-----------------------------------------+-----+
|genres                                   |count|
+-----------------------------------------+-----+
|Adventure|Action|Science Fiction         |3    |
|Action|Adventure|Science Fiction|Thriller|2    |
|Action|Adventure|Science Fiction         |2    |
|Action|Thriller|Crime                    |1    |
|Adventure|Science Fiction|Action         |1    |
|Adventure|Drama|Family|Animation         |1    |
|Action|Adventure|Fantasy|Science Fiction |1    |
|Drama|Romance                            |1    |
|Science Fiction|Action|Adventure         |1    |
|Family|Fantasy|Romance                   |1    |
|Fantasy|Adventure                        |1    |
|Family|Animation|Adventure|Comedy|Fantasy|1    |
|Animation|Family|Adventure|Fantasy       |1    |
|Action|Adventure|Animation|Family        |1    |
+-----------------------------------------+-----+

+---------------------------------------------+-----+
|spoken_languages                            



---



# Handling Missing and Incorrect Data

Converting 'budget' and 'revenue' values to million USD

In [186]:
# Express budget and revenue in millions of USD, then discard the raw columns
copy = (
    copy
    .withColumn("budget_musd", col("budget") / 1_000_000)
    .withColumn("revenue_musd", col("revenue") / 1_000_000)
    .drop("budget", "revenue")
)
copy.select("revenue_musd", "budget_musd").show(truncate=False)

+------------+-----------+
|revenue_musd|budget_musd|
+------------+-----------+
|2799.4391   |356.0      |
|2923.706026 |237.0      |
|2068.223624 |245.0      |
|2052.415039 |300.0      |
|2264.162353 |200.0      |
|1671.537444 |150.0      |
|1662.020819 |260.0      |
|1518.815515 |220.0      |
|1515.4      |190.0      |
|1405.403694 |365.0      |
|1349.926083 |200.0      |
|1341.511219 |125.0      |
|1332.69883  |200.0      |
|1453.683476 |150.0      |
|1310.466296 |170.0      |
|1274.219009 |150.0      |
|1266.115964 |160.0      |
|1242.805359 |200.0      |
+------------+-----------+



In [187]:
# Extracting the first version of the preprocessed dataset.
# coalesce(1) collapses the DataFrame to one partition before writing;
# header=True writes the column names as the first row so the CSV can be read
# back without knowing the column order.
copy.coalesce(1).write.mode("overwrite").csv("preprocessed_data_1", header=True)

Replacing unrealistic values (0) in the budget, revenue, and runtime columns with nulls

In [188]:
# A value of 0 in these columns is replaced with null; other values are kept
for numeric_column in ("budget_musd", "revenue_musd", "runtime"):
    copy = copy.withColumn(
        numeric_column,
        when(col(numeric_column) == 0, None).otherwise(col(numeric_column)),
    )
copy.show()

+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------------+---------+--------------------+---------+-----------+------------+
|       backdrop_path|belongs_to_collection|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|       director|cast_size|                cast|crew_size|budget_musd|revenue_musd|
+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+---

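Handling movies with 0 vote_count by replacing their vote_average with the mean vote_average of the movies that have at least one vote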

In [189]:
# Mean rating over the movies that received at least one vote
voted_movies = copy.filter(col("vote_count") > 0)
average_vote = voted_movies.agg(avg("vote_average")).first()[0]

# Movies without any votes get that mean instead of their own vote_average
has_no_votes = col("vote_count") == 0
copy = copy.withColumn(
    "vote_average",
    when(has_no_votes, average_vote).otherwise(col("vote_average")),
)
copy.show()

+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------------+---------+--------------------+---------+-----------+------------+
|       backdrop_path|belongs_to_collection|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|       director|cast_size|                cast|crew_size|budget_musd|revenue_musd|
+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+---

Handling overview and tagline by replacing known placeholders ("No Data" or an empty string) with null values

In [190]:
# Treat the placeholder "No Data" and the empty string as missing text
copy = copy.withColumn("overview", when(col("overview").isin(["No Data", ""]), None).otherwise(col("overview")))
# The closing parenthesis of withColumn(...) was missing on this statement
copy = copy.withColumn("tagline", when(col("tagline").isin(["No Data", ""]), None).otherwise(col("tagline")))
copy.show()

+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------------+---------+--------------------+---------+-----------+------------+
|       backdrop_path|belongs_to_collection|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|       director|cast_size|                cast|crew_size|budget_musd|revenue_musd|
+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+---

Removing duplicates and dropping rows with unknown 'id' or 'title' columns

In [191]:
# Remove fully identical rows, then rows that lack an id or a title
copy = (
    copy
    .dropDuplicates()
    .dropna(subset=["id", "title"])
)
copy.show()

+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------------+---------+--------------------+---------+-----------+------------+
|       backdrop_path|belongs_to_collection|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|       director|cast_size|                cast|crew_size|budget_musd|revenue_musd|
+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+---

Keeping only rows where at least 10 columns have non-null values

In [192]:
# Keep only rows in which at least 10 columns hold a non-null value.
# A chain of `&`-combined isNotNull() checks would instead require every
# listed column to be populated at once, which also discards movies that
# merely lack, for example, a collection or a tagline. dropna(thresh=...)
# applies the row-wise minimum count directly across all columns.
copy = copy.dropna(thresh=10)
copy.show()

+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------------+---------+--------------------+---------+-----------+------------+
|       backdrop_path|belongs_to_collection|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|       director|cast_size|                cast|crew_size|budget_musd|revenue_musd|
+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+---

Filtering the dataframe to include only 'Released' movies, then dropping 'status' 

In [193]:
# Keep released movies only; status is then the same for every row, so drop it
released_movies = copy.filter(col("status") == "Released")
copy = released_movies.drop("status")
copy.show()

+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------------------+--------------------+------------+----------+---------------+---------+--------------------+---------+-----------+------------+
|       backdrop_path|belongs_to_collection|              genres|    id|original_language|            overview|popularity|         poster_path|production_companies|production_countries|release_date|runtime|    spoken_languages|             tagline|               title|vote_average|vote_count|       director|cast_size|                cast|crew_size|budget_musd|revenue_musd|
+--------------------+---------------------+--------------------+------+-----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+--------------------+--------------------+

Reordering the columns to finalize the DataFrame (Spark DataFrames have no row index, so there is nothing to reset).

In [194]:
# Final column order of the cleaned dataset
reordered_columns = [
    'id', 'title', 'tagline', 'release_date', 'genres', 'belongs_to_collection',
    'original_language', 'budget_musd', 'revenue_musd', 'production_companies',
    'production_countries', 'vote_count', 'vote_average', 'popularity', 'runtime',
    'overview', 'spoken_languages', 'poster_path', 'cast', 'cast_size', 'director',
    'crew_size',
]
copy = copy.select(*reordered_columns)
copy.show()

+------+--------------------+--------------------+------------+--------------------+---------------------+-----------------+-----------+------------+--------------------+--------------------+----------+------------+----------+-------+--------------------+--------------------+--------------------+--------------------+---------+---------------+---------+
|    id|               title|             tagline|release_date|              genres|belongs_to_collection|original_language|budget_musd|revenue_musd|production_companies|production_countries|vote_count|vote_average|popularity|runtime|            overview|    spoken_languages|         poster_path|                cast|cast_size|       director|crew_size|
+------+--------------------+--------------------+------------+--------------------+---------------------+-----------------+-----------+------------+--------------------+--------------------+----------+------------+----------+-------+--------------------+--------------------+--------------

Exporting the final preprocessed dataframe to a downloadable CSV format.

In [195]:
# coalesce(1) collapses the DataFrame to one partition before writing;
# header=True writes the column names as the first row, so the reordered
# columns stay identifiable in the exported CSV.
copy.coalesce(1).write.mode("overwrite").csv("final_data", header=True)