In [37]:
from pathlib import Path
import sys 
import pandas as pd
import json

sys.path.append(str(Path.cwd().parent))

from pyspark.sql import SparkSession
import os

# Path to log4j.properties
notebook_dir = Path().resolve()
log4j_path = log4j_path = notebook_dir.parent / "conf" / "log4j.properties"

spark = (
    SparkSession.builder
    .appName("MovieProject")
    .config("spark.driver.extraJavaOptions", f"-Dlog4j.configuration=file:{log4j_path}")
    .config("spark.executor.extraJavaOptions", f"-Dlog4j.configuration=file:{log4j_path}")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN") 

%reload_ext autoreload
%autoreload 2

import scripts.extraction.extractor_tmdb as tmdb
import scripts.transform.clean_data as cln
import scripts.transform.tmdb_data_preprocessor as dp
import scripts.transform.tmdb_schema as ts
# import analysis.kpi_analysis as kpi 
# import visualize.visualizations as viz

## Fetch all movies and store in a Database

In [31]:
movie_ids = [0, 299534, 19995, 140607, 299536, 597, 135397, 420818, 24428, 168259, 99861, 284054, 12445, 181808, 330457, 351286, 109445, 321612, 260513]

In [32]:
movies = tmdb.get_all_movies_by_ids(movie_ids)

2026-01-15 15:52:01,387 - INFO - extractor_tmdb - Fetching movie ID: 0
2026-01-15 15:52:02,041 - ERROR - extractor_tmdb - HTTP error on attempt 0: 404 Client Error: Not Found for url: https://api.themoviedb.org/3/movie/0?append_to_response=credits&api_key=d855a01d02babf081991d455eff39348&language=en-US&movie_id=0
2026-01-15 15:52:04,693 - ERROR - extractor_tmdb - HTTP error on attempt 1: 404 Client Error: Not Found for url: https://api.themoviedb.org/3/movie/0?append_to_response=credits&api_key=d855a01d02babf081991d455eff39348&language=en-US&movie_id=0
2026-01-15 15:52:07,354 - ERROR - extractor_tmdb - HTTP error on attempt 2: 404 Client Error: Not Found for url: https://api.themoviedb.org/3/movie/0?append_to_response=credits&api_key=d855a01d02babf081991d455eff39348&language=en-US&movie_id=0
2026-01-15 15:52:09,928 - ERROR - extractor_tmdb - HTTP error on attempt 3: 404 Client Error: Not Found for url: https://api.themoviedb.org/3/movie/0?append_to_response=credits&api_key=d855a01d02ba

In [38]:
output_path = Path("tmdb_raw_movies.jsonl")

with output_path.open("w", encoding="utf-8") as f:
    for movie in movies:
        if movie:
            json.dump(movie, f, ensure_ascii=False)
            f.write("\n")

print(f"Saved {len(movies)} movies to {output_path}")

Saved 18 movies to tmdb_raw_movies.jsonl


In [39]:
df = spark.read \
    .option("multiLine", "true") \
    .schema(ts.data_schema) \
    .json(str(output_path))

In [40]:
df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: array (nullable = tr

In [44]:
categorical_columns = ['belongs_to_collection', 'genres', 'production_countries', 'production_companies', 'spoken_languages', 'origin_country',"credits.cast", "credits.crew"]

In [45]:
df.select(*categorical_columns).show(3, truncate=100, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------
 belongs_to_collection | {86311, The Avengers Collection, /yFSIUVTCvgYrpalUktulvk3Gi5Y.jpg, /zuW6fOiusv4X9nnW3paHGfXcSll.jpg} 
 genres                | [{12, Adventure}, {878, Science Fiction}, {28, Action}]                                              
 production_countries  | [{US, United States of America}]                                                                     
 production_companies  | [{420, /hUzeosd33nzE5MCNsZxCGEKTXaQ.png, Marvel Studios, US}]                                        
 spoken_languages      | [{English, en, English}, {Japanese, ja, 日本語}, {Xhosa, xh, }]                                      
 origin_country        | [US]                                                                                                 
 cast                  | [{false, 2, 3223, Acting, Robert Downey Jr., Robert Downey Jr., 9.8254, /5qHNjhtjMD4YWH3U

In [41]:
df.limit(10).toPandas()

Unnamed: 0,adult,backdrop_path,belongs_to_collection,budget,genres,homepage,id,imdb_id,origin_country,original_language,...,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count,credits
0,False,/7RyHsO4yDXtBv1zUU3mTpHeQ0d5.jpg,"(86311, The Avengers Collection, /yFSIUVTCvgYr...",356000000,"[(12, Adventure), (878, Science Fiction), (28,...",https://www.marvel.com/movies/avengers-endgame,299534,tt4154796,[US],en,...,2799439100,181,"[(English, en, English), (Japanese, ja, 日本語), ...",Released,Avenge the fallen.,Avengers: Endgame,False,8.238,27125,"([(False, 2, 3223, Acting, Robert Downey Jr., ..."


In [7]:
df.columns

['adult',
 'backdrop_path',
 'belongs_to_collection',
 'budget',
 'credits',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'origin_country',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

## Drop irrelevant columns

In [8]:
cols_to_drop = ['adult', 'imdb_id', 'original_title', 'video', 'homepage', 	'success', 'status_code', 'status_message']
df = cln.drop_irrelevant_columns(df, cols_to_drop)

2026-01-15 14:09:51,607 - INFO - clean_data - Dropping columns: ['adult', 'imdb_id', 'original_title', 'video', 'homepage', 'success', 'status_code', 'status_message']


In [9]:
df.toPandas()

Unnamed: 0,backdrop_path,belongs_to_collection,budget,credits,genres,id,origin_country,original_language,overview,popularity,...,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count
0,/7RyHsO4yDXtBv1zUU3mTpHeQ0d5.jpg,"{'backdrop_path': None, 'name': None, 'id': 86...",356000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 12}, {'name': None, 'id'...",299534,[US],en,After the devastating events of Avengers: Infi...,20.3653,...,"[{'name': 'United States of America', 'iso_316...",2019-04-24,2799439100,181,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,Avenge the fallen.,Avengers: Endgame,8.238,27125
1,/vL5LR6WdxWPjLPFRLe133jXWsh5.jpg,"{'backdrop_path': None, 'name': None, 'id': 87...",237000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 28}, {'name': None, 'id'...",19995,[US],en,"In the 22nd century, a paraplegic Marine is di...",64.127,...,"[{'name': 'United States of America', 'iso_316...",2009-12-16,2923706026,162,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,Enter the world of Pandora.,Avatar,7.6,33240
2,/8BTsTfln4jlQrLXUBquXJ0ASQy9.jpg,"{'backdrop_path': None, 'name': None, 'id': 10...",245000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 12}, {'name': None, 'id'...",140607,[US],en,Thirty years after defeating the Galactic Empi...,10.594,...,"[{'name': 'United States of America', 'iso_316...",2015-12-15,2068223624,136,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,Every generation has a story.,Star Wars: The Force Awakens,7.254,20198
3,/mDfJG3LC3Dqb67AZ52x3Z0jU0uB.jpg,"{'backdrop_path': None, 'name': None, 'id': 86...",300000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 12}, {'name': None, 'id'...",299536,[US],en,As the Avengers and their allies have continue...,29.5184,...,"[{'name': 'United States of America', 'iso_316...",2018-04-25,2052415039,149,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,Destiny arrives all the same.,Avengers: Infinity War,8.235,31335
4,/xnHVX37XZEp33hhCbYlQFq7ux1J.jpg,,200000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 18}, {'name': None, 'id'...",597,[US],en,101-year-old Rose DeWitt Bukater tells the sto...,30.7388,...,"[{'name': 'United States of America', 'iso_316...",1997-12-18,2264162353,194,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,Nothing on earth could come between them.,Titanic,7.903,26659
5,/s5QfDFqRO6sjgPtKkjxD0WqXQef.jpg,"{'backdrop_path': None, 'name': None, 'id': 32...",150000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 28}, {'name': None, 'id'...",135397,[US],en,Twenty-two years after the events of Jurassic ...,10.1956,...,"[{'name': 'United States of America', 'iso_316...",2015-06-06,1671537444,124,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,The park is open.,Jurassic World,6.7,21216
6,/1TUg5pO1VZ4B0Q1amk3OlXvlpXV.jpg,"{'backdrop_path': None, 'name': None, 'id': 76...",260000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 12}, {'name': None, 'id'...",420818,[US],en,"Simba idolizes his father, King Mufasa, and ta...",9.8459,...,"[{'name': 'United States of America', 'iso_316...",2019-07-12,1662020819,118,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,The king has returned.,The Lion King,7.099,10602
7,/9BBTo63ANSmhC4e6r62OJFuK2GL.jpg,"{'backdrop_path': None, 'name': None, 'id': 86...",220000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 878}, {'name': None, 'id...",24428,[US],en,When an unexpected enemy emerges and threatens...,69.3044,...,"[{'name': 'United States of America', 'iso_316...",2012-04-25,1518815515,143,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,Some assembly required.,The Avengers,7.904,35071
8,/ehzI1mVcnHqB58NqPyQwpMqcVoz.jpg,"{'backdrop_path': None, 'name': None, 'id': 94...",190000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 28}, {'name': None, 'id'...",168259,[US],en,Deckard Shaw seeks revenge against Dominic Tor...,10.4935,...,"[{'name': 'United States of America', 'iso_316...",2015-04-01,1515400000,139,"[{'name': 'العربية', 'iso_639_1': 'ar', 'engli...",Released,Vengeance hits home.,Furious 7,7.219,11089
9,/kIBK5SKwgqIIuRKhhWrJn3XkbPq.jpg,"{'backdrop_path': None, 'name': None, 'id': 86...",235000000,"{'cast': [{'cast_id': None, 'character': None,...","[{'name': None, 'id': 28}, {'name': None, 'id'...",99861,[US],en,When Tony Stark tries to jumpstart a dormant p...,16.1802,...,"[{'name': 'United States of America', 'iso_316...",2015-04-22,1405403694,141,"[{'name': 'English', 'iso_639_1': 'en', 'engli...",Released,A new age has come.,Avengers: Age of Ultron,7.271,24002


In [10]:
df.columns

['backdrop_path',
 'belongs_to_collection',
 'budget',
 'credits',
 'genres',
 'id',
 'origin_country',
 'original_language',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'vote_average',
 'vote_count']

In [11]:

from pyspark.sql import functions as F

# Extract example for 'belongs_to_collection' (expecting a struct/map)
df.select("belongs_to_collection").filter(F.col("belongs_to_collection").isNotNull()).limit(1).show(truncate=False)

# Extract example for 'genres' (array of structs)
df.select("genres").filter(F.size("genres") > 0).limit(1).show(truncate=False, vertical=True)

# Extract example for 'production_countries' (array of structs)
df.select("production_countries").filter(F.size("production_countries") > 0).limit(1).show(truncate=False, vertical=True)

# Extract example for 'production_companies' (array of structs)
df.select("production_companies").filter(F.size("production_companies") > 0).limit(1).show(truncate=False, vertical=True)

# Extract example for 'spoken_languages' (array of structs)
df.select("spoken_languages").filter(F.size("spoken_languages") > 0).limit(1).show(truncate=False, vertical=True)

# Extract example for 'origin_country' (array of strings)
df.select("origin_country").filter(F.size("origin_country") > 0).limit(1).show(truncate=False)

# Extract example for 'credits' (struct with cast and crew arrays)
# df.select("credits").filter(F.col("credits").isNotNull()).limit(1).show(truncate=False, vertical=True)

+-----------------------------------------------------------------------+
|belongs_to_collection                                                  |
+-----------------------------------------------------------------------+
|{backdrop_path -> NULL, name -> NULL, id -> 86311, poster_path -> NULL}|
+-----------------------------------------------------------------------+

-RECORD 0---------------------------------------------------------------------------------
 genres | [{name -> NULL, id -> 12}, {name -> NULL, id -> 878}, {name -> NULL, id -> 28}] 

-RECORD 0----------------------------------------------------------------------
 production_countries | [{name -> United States of America, iso_3166_1 -> US}] 

-RECORD 0--------------------------------------------------------------------------------------
 production_companies | [{name -> NULL, id -> 420, logo_path -> NULL, origin_country -> NULL}] 

-RECORD 0-----------------------------------------------------------------------------------

## Evaluate JSON-like columns

In [16]:
categorical_columns = ['belongs_to_collection', 'genres', 'production_countries', 'production_companies', 'spoken_languages', 'origin_country','credits']

In [17]:
df.select(*categorical_columns).limit(10).toPandas()

Unnamed: 0,belongs_to_collection,genres,production_countries,production_companies,spoken_languages,origin_country,credits
0,"{'backdrop_path': None, 'name': None, 'id': 86...","[{'name': None, 'id': 12}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 420, 'logo_path': None, ...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
1,"{'backdrop_path': None, 'name': None, 'id': 87...","[{'name': None, 'id': 28}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 444, 'logo_path': None, ...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
2,"{'backdrop_path': None, 'name': None, 'id': 10...","[{'name': None, 'id': 12}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 1, 'logo_path': None, 'o...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
3,"{'backdrop_path': None, 'name': None, 'id': 86...","[{'name': None, 'id': 12}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 420, 'logo_path': None, ...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
4,,"[{'name': None, 'id': 18}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 4, 'logo_path': None, 'o...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
5,"{'backdrop_path': None, 'name': None, 'id': 32...","[{'name': None, 'id': 28}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 56, 'logo_path': None, '...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
6,"{'backdrop_path': None, 'name': None, 'id': 76...","[{'name': None, 'id': 12}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 2, 'logo_path': None, 'o...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
7,"{'backdrop_path': None, 'name': None, 'id': 86...","[{'name': None, 'id': 878}, {'name': None, 'id...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 420, 'logo_path': None, ...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
8,"{'backdrop_path': None, 'name': None, 'id': 94...","[{'name': None, 'id': 28}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 333, 'logo_path': None, ...","[{'name': 'العربية', 'iso_639_1': 'ar', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."
9,"{'backdrop_path': None, 'name': None, 'id': 86...","[{'name': None, 'id': 28}, {'name': None, 'id'...","[{'name': 'United States of America', 'iso_316...","[{'name': None, 'id': 420, 'logo_path': None, ...","[{'name': 'English', 'iso_639_1': 'en', 'engli...",[US],"{'cast': [{'cast_id': None, 'character': None,..."


## Extracting and cleaning json columns

In [14]:
df = cln.clean_movie_data(df)

AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "transform(belongs_to_collection, lambdafunction(x[name], x))" due to data type mismatch: Parameter 1 requires the "ARRAY" type, however "belongs_to_collection" has the type "MAP<STRING, BIGINT>".; line 1 pos 0;
'Project [backdrop_path#1, CASE WHEN isnull(belongs_to_collection#2) THEN null WHEN typeof(belongs_to_collection#2) LIKE array% THEN concat_ws(|, transform(belongs_to_collection#2, lambdafunction(lambda 'x[name], lambda 'x, false))) ELSE belongs_to_collection#2[name] END AS belongs_to_collection#235, budget#3L, credits#4, genres#5, id#7L, origin_country#9, original_language#10, overview#12, popularity#13, poster_path#14, production_companies#15, production_countries#16, release_date#17, revenue#18L, runtime#19L, spoken_languages#20, status#21, tagline#22, title#23, vote_average#25, vote_count#26L]
+- Project [backdrop_path#1, belongs_to_collection#2, budget#3L, credits#4, genres#5, id#7L, origin_country#9, original_language#10, overview#12, popularity#13, poster_path#14, production_companies#15, production_countries#16, release_date#17, revenue#18L, runtime#19L, spoken_languages#20, status#21, tagline#22, title#23, vote_average#25, vote_count#26L]
   +- LogicalRDD [adult#0, backdrop_path#1, belongs_to_collection#2, budget#3L, credits#4, genres#5, homepage#6, id#7L, imdb_id#8, origin_country#9, original_language#10, original_title#11, overview#12, popularity#13, poster_path#14, production_companies#15, production_countries#16, release_date#17, revenue#18L, runtime#19L, spoken_languages#20, status#21, tagline#22, title#23, ... 3 more fields], false


In [16]:
df.limit(10).toPandas()

Unnamed: 0,backdrop_path,belongs_to_collection,budget,genres,id,origin_country,original_language,overview,popularity,poster_path,...,spoken_languages,status,tagline,title,vote_average,vote_count,cast,cast_size,director,crew_size
0,/7RyHsO4yDXtBv1zUU3mTpHeQ0d5.jpg,,356000000.0,,299534.0,US,en,After the devastating events of Avengers: Infi...,20.3653,/bR8ISy1O9XQxqiy0fQFw2BX72RQ.jpg,...,,Released,Avenge the fallen.,Avengers: Endgame,8.238,27125.0,||||||||||||||||||||||||||||||||||||||||||||||...,107,,608
1,/vL5LR6WdxWPjLPFRLe133jXWsh5.jpg,,237000000.0,,19995.0,US,en,"In the 22nd century, a paraplegic Marine is di...",64.127,/gKY6q7SjCkAU6FqvqWybDYgUKIF.jpg,...,,Released,Enter the world of Pandora.,Avatar,7.6,33240.0,||||||||||||||||||||||||||||||||||||||||||||||...,67,,991
2,/8BTsTfln4jlQrLXUBquXJ0ASQy9.jpg,,245000000.0,,140607.0,US,en,Thirty years after defeating the Galactic Empi...,10.594,/wqnLdwVXoBjKibFRR5U3y0aDUhs.jpg,...,,Released,Every generation has a story.,Star Wars: The Force Awakens,7.254,20198.0,||||||||||||||||||||||||||||||||||||||||||||||...,183,,264
3,/mDfJG3LC3Dqb67AZ52x3Z0jU0uB.jpg,,300000000.0,,299536.0,US,en,As the Avengers and their allies have continue...,29.5184,/7WsyChQLEftFiDOVTGkv3hFpyyt.jpg,...,,Released,Destiny arrives all the same.,Avengers: Infinity War,8.235,31335.0,||||||||||||||||||||||||||||||||||||||||||||||...,69,,734
4,/xnHVX37XZEp33hhCbYlQFq7ux1J.jpg,,200000000.0,,597.0,US,en,101-year-old Rose DeWitt Bukater tells the sto...,30.7388,/9xjZS2rlVxm8SFx8kPC3aIGCOYQ.jpg,...,,Released,Nothing on earth could come between them.,Titanic,7.903,26659.0,||||||||||||||||||||||||||||||||||||||||||||||...,116,,262
5,/s5QfDFqRO6sjgPtKkjxD0WqXQef.jpg,,150000000.0,,135397.0,US,en,Twenty-two years after the events of Jurassic ...,10.1956,/rhr4y79GpxQF9IsfJItRXVaoGs4.jpg,...,,Released,The park is open.,Jurassic World,6.7,21216.0,||||||||||||||||||||||||||||||||||||||||||||||...,53,,428
6,/1TUg5pO1VZ4B0Q1amk3OlXvlpXV.jpg,,260000000.0,,420818.0,US,en,"Simba idolizes his father, King Mufasa, and ta...",9.8459,/dzBtMocZuJbjLOXvrl4zGYigDzh.jpg,...,,Released,The king has returned.,The Lion King,7.099,10602.0,|||||||||||||||||||,20,,50
7,/9BBTo63ANSmhC4e6r62OJFuK2GL.jpg,,220000000.0,,24428.0,US,en,When an unexpected enemy emerges and threatens...,69.3044,/RYMX2wcKCBAr24UyPD7xwmjaTn.jpg,...,,Released,Some assembly required.,The Avengers,7.904,35071.0,||||||||||||||||||||||||||||||||||||||||||||||...,113,,642
8,/ehzI1mVcnHqB58NqPyQwpMqcVoz.jpg,,190000000.0,,168259.0,US,en,Deckard Shaw seeks revenge against Dominic Tor...,10.4935,/ktofZ9Htrjiy0P6LEowsDaxd3Ri.jpg,...,,Released,Vengeance hits home.,Furious 7,7.219,11089.0,||||||||||||||||||||||||||||||||||||||||||||||||,49,,228
9,/kIBK5SKwgqIIuRKhhWrJn3XkbPq.jpg,,235000000.0,,99861.0,US,en,When Tony Stark tries to jumpstart a dormant p...,16.1802,/4ssDuvEDkSArWEdyBl2X5EHvYKU.jpg,...,,Released,A new age has come.,Avengers: Age of Ultron,7.271,24002.0,||||||||||||||||||||||||||||||||||||||||||||||...,74,,653


In [17]:
df.dtypes

[('backdrop_path', 'string'),
 ('belongs_to_collection', 'void'),
 ('budget', 'double'),
 ('genres', 'void'),
 ('id', 'double'),
 ('origin_country', 'string'),
 ('original_language', 'string'),
 ('overview', 'string'),
 ('popularity', 'double'),
 ('poster_path', 'string'),
 ('production_companies', 'void'),
 ('production_countries', 'array<map<string,string>>'),
 ('release_date', 'date'),
 ('revenue', 'double'),
 ('runtime', 'double'),
 ('spoken_languages', 'void'),
 ('status', 'string'),
 ('tagline', 'string'),
 ('title', 'string'),
 ('vote_average', 'double'),
 ('vote_count', 'double'),
 ('cast', 'string'),
 ('cast_size', 'int'),
 ('director', 'string'),
 ('crew_size', 'int')]

In [15]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import DataFrame

# ---------------------------------------------------------------------------
# 1) Quick schema overview
# ---------------------------------------------------------------------------
def show_schema_overview(df: DataFrame):
    """
    Prints column name, spark type (simpleString) and whether it's nested (Array/Map/Struct).
    """
    print("Column | SparkType | Nested?")
    for f in df.schema.fields:
        t = f.dataType
        nested = any(isinstance(t, ty) for ty in (T.ArrayType, T.MapType, T.StructType))
        print(f"{f.name} | {t.simpleString()} | {nested}")


# ---------------------------------------------------------------------------
# 2) Suggest recommended extractor (based on schema)
# ---------------------------------------------------------------------------
def suggest_extractors(df: DataFrame):
    """
    For each column, suggests whether to treat it as:
     - map -> use map extractor (col.getItem(key))
     - array<map> -> transform + concat_ws on x['key']
     - array<string> -> concat_ws
     - struct -> dot-access
     - scalar -> leave as-is
    Uses df.schema so analysis-time errors are avoided.
    """
    suggestions = {}
    for f in df.schema.fields:
        t = f.dataType
        if isinstance(t, T.MapType):
            suggestions[f.name] = "MAP -> use col.getItem(key) or explode(map_entries(col))"
        elif isinstance(t, T.ArrayType):
            if isinstance(t.elementType, T.MapType):
                suggestions[f.name] = "ARRAY<MAP> -> explode/transform then extract map keys (x['name'])"
            elif isinstance(t.elementType, T.StructType):
                suggestions[f.name] = "ARRAY<STRUCT> -> explode then access struct fields (e.g. e.name)"
            elif isinstance(t.elementType, T.StringType):
                suggestions[f.name] = "ARRAY<STRING> -> concat_ws(join_with, col)"
            else:
                suggestions[f.name] = f"ARRAY<{t.elementType.simpleString()}> -> inspect elements"
        elif isinstance(t, T.StructType):
            suggestions[f.name] = "STRUCT -> use dot access (col.field)"
        else:
            suggestions[f.name] = "SCALAR -> leave or cast"
    # print pretty
    for c, s in suggestions.items():
        print(f"{c:30} : {s}")
    return suggestions


# ---------------------------------------------------------------------------
# 3) Explore a MAP column: keys, key counts, sample values (safe, distributed)
# ---------------------------------------------------------------------------
def explore_map_column(df: DataFrame, col: str, top_n: int = 20):
    """
    - counts distinct keys in the map across dataset (explodes map_keys)
    - shows top-n keys by frequency
    - shows sample values for a requested key (by exploding map_entries)
    """
    print(f"Exploring MAP column '{col}'")
    # count rows where map is non-null
    non_null = df.filter(F.col(col).isNotNull()).count()
    print(f"Non-null rows: {non_null}")

    # top keys (explode map_keys)
    keys = (
        df
        .select(F.explode(F.map_keys(F.col(col))).alias("key"))
        .groupBy("key")
        .count()
        .orderBy(F.desc("count"))
    )
    print("Top keys (key, count):")
    keys.show(top_n, truncate=False)

    # sample map entries: key + typeof(value)
    entries = (
        df
        .select(F.explode(F.map_entries(F.col(col))).alias("entry"))
        .select(F.col("entry.key").alias("key"), F.col("entry.value").alias("value"))
        .withColumn("value_type", F.typeof(F.col("value")))
        .groupBy("key", "value_type")
        .agg(F.count("*").alias("count"))
        .orderBy(F.desc("count"))
    )
    print("Key + observed value types + counts:")
    entries.show(top_n, truncate=False)

    # show some example map rows (limit)
    print("Sample raw map (first 5 non-null rows):")
    df.select(col).where(F.col(col).isNotNull()).limit(5).show(truncate=False)


# ---------------------------------------------------------------------------
# 4) Explore an ARRAY<MAP> or ARRAY<STRUCT> column
# ---------------------------------------------------------------------------
def explore_array_column(df: DataFrame, col: str, top_n: int = 20):
    """
    Explode the array and inspect element shape:
    - element typeof (map/struct/string/...)
    - if map -> map_keys and map_entries summary
    - if struct -> struct fields and a sample row
    - safe for large datasets (uses limited aggregates)
    """
    print(f"Exploring ARRAY column '{col}'")
    non_null = df.filter(F.col(col).isNotNull()).count()
    print(f"Non-null rows: {non_null}")

    # explode and get typeof for elements
    exploded = df.select(F.explode(F.col(col)).alias("elem"))
    elem_type = exploded.select(F.typeof(F.col("elem")).alias("elem_type")).groupBy("elem_type").count().orderBy(F.desc("count"))
    print("Observed element types (elem_type, count):")
    elem_type.show(truncate=False)

    # if elements are maps -> show keys & types
    maps = exploded.filter(F.typeof(F.col("elem")) == F.lit("map")).select(F.map_keys(F.col("elem")).alias("keys"))
    if maps.rdd.isEmpty():
        print("No map elements observed (or unable to detect).")
    else:
        print("Top map keys across elements (exploded keys):")
        (maps.select(F.explode("keys").alias("k"))
             .groupBy("k")
             .count()
             .orderBy(F.desc("count"))
             .show(top_n, truncate=False))

        # map entries types per key
        map_entries = exploded.filter(F.typeof(F.col("elem")) == F.lit("map")).select(F.explode(F.map_entries(F.col("elem"))).alias("entry"))
        map_entries_sel = map_entries.select(
            F.col("entry.key").alias("key"),
            F.col("entry.value").alias("value"),
            F.typeof(F.col("entry.value")).alias("value_type")
        ).groupBy("key", "value_type").agg(F.count("*").alias("count")).orderBy(F.desc("count"))
        print("Element map entries: key, value_type, count")
        map_entries_sel.show(top_n, truncate=False)

    # if elements are struct -> show struct schema (take first non-null element)
    structs = exploded.filter(F.typeof(F.col("elem")) == F.lit("struct"))
    if not structs.rdd.isEmpty():
        print("Sample struct element (first non-null):")
        sample_struct = structs.limit(1).collect()
        # print full row as python dict for readability
        print(sample_struct)
    else:
        print("No struct elements observed (or unable to detect).")

    # show a few exploded elems
    print("Sample exploded elements (limit 10):")
    exploded.limit(10).show(truncate=False)


# ---------------------------------------------------------------------------
# 5) Utility: safe_apply_extractor_by_schema
#    - demonstrates how to apply transform/concat_ws only after schema check
# ---------------------------------------------------------------------------
def safe_apply_array_map_extractor(df: DataFrame, col: str, elem_key: str, out_col: str = None, join_with="|"):
    """
    Safely create out_col by extracting elem_key from array<map> only when schema says column is ArrayType(MapType).
    Uses df.schema (driver-side) to decide. Returns modified DataFrame.
    """
    out_col = out_col or col
    field = next((f for f in df.schema.fields if f.name == col), None)
    if field is None:
        raise ValueError(f"Column '{col}' not in dataframe")
    t = field.dataType
    if isinstance(t, T.ArrayType) and isinstance(t.elementType, T.MapType):
        return df.withColumn(out_col, F.concat_ws(join_with, F.expr(f"transform({col}, x -> x['{elem_key}'])")))
    else:
        raise TypeError(f"Column '{col}' is not ArrayType(MapType). Detected type: {t.simpleString()}")


# ---------------------------------------------------------------------------
# 6) Convenience: explore all nested columns in the df (small prints and safe sampling)
# ---------------------------------------------------------------------------
def explore_all_nested(df: DataFrame, sample_limit: int = 5):
    """
    Runs a series of safe inspections for all columns that are Map/Array/Struct.
    Use small sample_limit to avoid huge output.
    """
    print("=== SCHEMA OVERVIEW ===")
    show_schema_overview(df)
    print("\n=== SUGGESTED EXTRACTORS ===")
    suggest_extractors(df)

    # iterate and explore deeper only for nested columns
    for f in df.schema.fields:
        t = f.dataType
        try:
            if isinstance(t, T.MapType):
                print("\n" + "-"*40)
                explore_map_column(df.select(f.name).limit(sample_limit*20), f.name, top_n=10)
            elif isinstance(t, T.ArrayType):
                print("\n" + "-"*40)
                explore_array_column(df.select(f.name).limit(sample_limit*20), f.name, top_n=10)
            elif isinstance(t, T.StructType):
                print("\n" + "-"*40)
                print(f"STRUCT column '{f.name}' fields: {[c.name for c in t.fields]}")
                print("Sample struct rows:")
                df.select(f.name).where(F.col(f.name).isNotNull()).limit(sample_limit).show(truncate=False)
        except Exception as e:
            print(f"Error exploring {f.name}: {e}")


In [16]:
# 1) quick overview
show_schema_overview(df)

# 2) get extractor suggestions
suggest_extractors(df)

# 3) explore a specific map col (e.g. belongs_to_collection)
explore_map_column(df, "belongs_to_collection")

# 4) explore an array of maps (e.g. genres or production_companies)
explore_array_column(df, "genres")

# 5) safely apply an extractor only when schema matches
df2 = safe_apply_array_map_extractor(df, "genres", "name", out_col="genres_names")

# 6) run a full exploration of nested columns (be careful on very large datasets)
explore_all_nested(df, sample_limit=5)


Column | SparkType | Nested?
backdrop_path | string | False
belongs_to_collection | map<string,bigint> | True
budget | bigint | False
credits | map<string,array<map<string,boolean>>> | True
genres | array<map<string,bigint>> | True
id | bigint | False
origin_country | array<string> | True
original_language | string | False
overview | string | False
popularity | double | False
poster_path | string | False
production_companies | array<map<string,bigint>> | True
production_countries | array<map<string,string>> | True
release_date | string | False
revenue | bigint | False
runtime | bigint | False
spoken_languages | array<map<string,string>> | True
status | string | False
tagline | string | False
title | string | False
vote_average | double | False
vote_count | bigint | False
backdrop_path                  : SCALAR -> leave or cast
belongs_to_collection          : MAP -> use col.getItem(key) or explode(map_entries(col))
budget                         : SCALAR -> leave or cast
credits       

In [17]:
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import *

# -------------------------
# 1️⃣ Create Spark session
# -------------------------
spark = SparkSession.builder \
    .appName("TMDB Movie Cleaner") \
    .getOrCreate()

# -------------------------
# 2️⃣ Define explicit schemas
# -------------------------

# id + name struct (for genres, companies, languages)
id_name_schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True)
])

# Collection schema
collection_schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("poster_path", StringType(), True),
    StructField("backdrop_path", StringType(), True)
])

# Cast / Crew person schema
credit_person_schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("job", StringType(), True),
    StructField("character", StringType(), True),
    StructField("adult", BooleanType(), True)
])

# Credits schema
credits_schema = StructType([
    StructField("cast", ArrayType(credit_person_schema), True),
    StructField("crew", ArrayType(credit_person_schema), True)
])

# -------------------------
# 3️⃣ Helper extraction functions
# -------------------------

def extract_array_map_field(df: DataFrame, col: str, key: str, join_with: str = "|") -> DataFrame:
    """Extract key from array<map> and join as string"""
    return df.withColumn(col, F.concat_ws(join_with, F.expr(f"transform({col}, x -> x.{key})")))

def extract_collection_field(df: DataFrame, col: str = "belongs_to_collection", key: str = "name") -> DataFrame:
    """Extract a field from belongs_to_collection"""
    return df.withColumn(col + "_" + key, F.col(col + "." + key))

def extract_credits(df: DataFrame) -> DataFrame:
    """Extract cast, cast_size, director, crew_size"""
    df = df.withColumn(
        "cast",
        F.concat_ws("|", F.expr("transform(credits.cast, x -> x.name)"))
    ).withColumn(
        "cast_size",
        F.size(F.col("credits.cast"))
    ).withColumn(
        "director",
        F.concat_ws(
            "|",
            F.expr("transform(filter(credits.crew, x -> x.job = 'Director'), x -> x.name)")
        )
    ).withColumn(
        "crew_size",
        F.size(F.col("credits.crew"))
    ).drop("credits")
    return df

def extract_origin_country(df: DataFrame, col: str = "origin_country", join_with: str = "|") -> DataFrame:
    """Join array<string> into single string"""
    return df.withColumn(col, F.concat_ws(join_with, F.col(col)))

# -------------------------
# 4️⃣ Main cleaning function
# -------------------------

def clean_movie_data(df: DataFrame) -> DataFrame:

    # --- Re-parse nested JSONs with explicit schemas ---
    df = df.withColumn("genres", F.from_json(F.to_json("genres"), ArrayType(id_name_schema)))
    df = df.withColumn("production_companies", F.from_json(F.to_json("production_companies"), ArrayType(id_name_schema)))
    df = df.withColumn("production_countries", F.from_json(F.to_json("production_countries"),
                                                           ArrayType(StructType([
                                                               StructField("iso_3166_1", StringType(), True),
                                                               StructField("name", StringType(), True)
                                                           ]))))
    df = df.withColumn("spoken_languages", F.from_json(F.to_json("spoken_languages"), ArrayType(id_name_schema)))
    df = df.withColumn("belongs_to_collection", F.from_json(F.to_json("belongs_to_collection"), collection_schema))
    df = df.withColumn("credits", F.from_json(F.to_json("credits"), credits_schema))

    # --- Extract collection name ---
    df = extract_collection_field(df, "belongs_to_collection", "name")

    # --- Extract genres, companies, countries, languages ---
    df = extract_array_map_field(df, "genres", "name")
    df = extract_array_map_field(df, "production_companies", "name")
    df = extract_array_map_field(df, "production_countries", "name")
    df = extract_array_map_field(df, "spoken_languages", "name")

    # --- Extract origin_country (array<string>) ---
    df = extract_origin_country(df, "origin_country")

    # --- Extract cast & crew ---
    df = extract_credits(df)

    # --- Convert numeric columns ---
    numeric_columns = ["budget", "popularity", "id", "revenue", "runtime", "vote_average", "vote_count"]
    for c in numeric_columns:
        df = df.withColumn(c, F.col(c).cast("double"))

    # --- Convert release_date to date ---
    df = df.withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd"))

    return df

# -------------------------
# 5️⃣ Example usage
# -------------------------

# Suppose `movies` is a Python list of dicts from the API
df = spark.createDataFrame(movies)

# Cleaned DF
df_clean = clean_movie_data(df)

# Optional: show cleaned data
df_clean.toPandas()


                                                                                

Unnamed: 0,adult,backdrop_path,belongs_to_collection,budget,genres,homepage,id,imdb_id,origin_country,original_language,...,tagline,title,video,vote_average,vote_count,belongs_to_collection_name,cast,cast_size,director,crew_size
0,False,/7RyHsO4yDXtBv1zUU3mTpHeQ0d5.jpg,"(86311, None, None, None)",356000000.0,,https://www.marvel.com/movies/avengers-endgame,299534.0,tt4154796,US,en,...,Avenge the fallen.,Avengers: Endgame,False,8.238,27125.0,,,107,,608
1,False,/vL5LR6WdxWPjLPFRLe133jXWsh5.jpg,"(87096, None, None, None)",237000000.0,,https://www.avatar.com/movies/avatar,19995.0,tt0499549,US,en,...,Enter the world of Pandora.,Avatar,False,7.6,33240.0,,,67,,991
2,False,/8BTsTfln4jlQrLXUBquXJ0ASQy9.jpg,"(10, None, None, None)",245000000.0,,http://www.starwars.com/films/star-wars-episod...,140607.0,tt2488496,US,en,...,Every generation has a story.,Star Wars: The Force Awakens,False,7.254,20198.0,,,183,,264
3,False,/mDfJG3LC3Dqb67AZ52x3Z0jU0uB.jpg,"(86311, None, None, None)",300000000.0,,https://www.marvel.com/movies/avengers-infinit...,299536.0,tt4154756,US,en,...,Destiny arrives all the same.,Avengers: Infinity War,False,8.235,31335.0,,,69,,734
4,False,/xnHVX37XZEp33hhCbYlQFq7ux1J.jpg,,200000000.0,,https://www.paramountmovies.com/movies/titanic,597.0,tt0120338,US,en,...,Nothing on earth could come between them.,Titanic,False,7.903,26659.0,,,116,,262
5,False,/s5QfDFqRO6sjgPtKkjxD0WqXQef.jpg,"(328, None, None, None)",150000000.0,,https://www.jurassicworld.com/,135397.0,tt0369610,US,en,...,The park is open.,Jurassic World,False,6.7,21216.0,,,53,,428
6,False,/1TUg5pO1VZ4B0Q1amk3OlXvlpXV.jpg,"(762512, None, None, None)",260000000.0,,https://movies.disney.com/the-lion-king-2019,420818.0,tt6105098,US,en,...,The king has returned.,The Lion King,False,7.099,10602.0,,,20,,50
7,False,/9BBTo63ANSmhC4e6r62OJFuK2GL.jpg,"(86311, None, None, None)",220000000.0,,https://www.marvel.com/movies/the-avengers,24428.0,tt0848228,US,en,...,Some assembly required.,The Avengers,False,7.904,35071.0,,,113,,642
8,False,/ehzI1mVcnHqB58NqPyQwpMqcVoz.jpg,"(9485, None, None, None)",190000000.0,,https://www.uphe.com/movies/furious-7,168259.0,tt2820852,US,en,...,Vengeance hits home.,Furious 7,False,7.219,11089.0,,,49,,228
9,False,/kIBK5SKwgqIIuRKhhWrJn3XkbPq.jpg,"(86311, None, None, None)",235000000.0,,https://www.marvel.com/movies/avengers-age-of-...,99861.0,tt2395427,US,en,...,A new age has come.,Avengers: Age of Ultron,False,7.271,24002.0,,,74,,653
