In [1]:
# Creating spark session
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import explode
import os
# The PostgreSQL JDBC driver jar is referenced twice below. `spark.jars` registers it
# with the session, and `spark.driver.extraClassPath` prepends it to the driver's
# classpath. Both settings must name the same file. Previously they named different
# versions (42.7.4 vs 42.6.2), so at least one of them pointed at a jar other than
# the one the other setting loaded.
# NOTE(review): confirm postgresql-42.7.4.jar is the jar actually present next to this notebook.
POSTGRES_JDBC_JAR = 'postgresql-42.7.4.jar'

spark = (
    SparkSession.builder
    .master('local[3]')  # local mode with 3 worker threads
    .appName('transformApp')
    .config('spark.jars', POSTGRES_JDBC_JAR)
    .config('spark.driver.extraClassPath', POSTGRES_JDBC_JAR)
    .getOrCreate()
)



# Loading the parsed JSON snapshots into Spark DataFrames.
# Every snapshot file carries the same tag in its name. Keeping the tag in one
# constant means a new extraction run only has to change it here.
# NOTE(review): '2024116' is not zero-padded, so as a date it is ambiguous — confirm the intended format.
SNAPSHOT_TAG = '2024116'


def _read_parsed(entity, tag=SNAPSHOT_TAG):
    """Read ``<entity>_parsed_<tag>.json`` into a DataFrame.

    ``multiLine=True`` makes Spark parse each file as whole JSON document(s)
    that may span several lines, rather than as one JSON record per line.
    """
    return spark.read.json(f'{entity}_parsed_{tag}.json', multiLine=True)


df_boxoffice = _read_parsed('collection')
df_movie_raw = _read_parsed('movies')
df_filmmaker_raw = _read_parsed('filmmaker')
df_genre_raw = _read_parsed('genre')
df_distributor_raw = _read_parsed('distributor')



# Shaping the raw DataFrames into one row per entity.
# Each raw frame has a `content` column that is exploded into one row per element.
# The element's fields are then flattened via `<alias>.*`, so the elements are
# expected to be structs. The run metadata columns are carried alongside.
_RUN_METADATA = ('extracted_at', 'run_date')


def _flatten_content(df_raw, alias):
    """Explode ``df_raw.content`` and return the element's fields plus the run metadata columns."""
    return (
        df_raw.select(explode(df_raw.content).alias(alias), *_RUN_METADATA)
              .select(f'{alias}.*', *_RUN_METADATA)
    )


df_movie = _flatten_content(df_movie_raw, 'movies')
df_genre = _flatten_content(df_genre_raw, 'genre')
df_filmmaker = _flatten_content(df_filmmaker_raw, 'filmmaker')
df_distributor = _flatten_content(df_distributor_raw, 'distributor')




# Bridge tables linking movies to the genre and filmmaker dimensions.

# movie <-> genre: every flattened genre row already names the movie it belongs to.
bridge_movie_genre = df_genre.select('genre_id', 'movie_id')

# movie <-> filmmaker: one row per element of each movie's `filmmakers` array,
# keeping the filmmaker's id and their role on that movie.
bridge_filmmaker_movie = (
    df_movie
    .select(explode(df_movie['filmmakers']).alias('filmmaker'), 'movie_id')
    .select('filmmaker.filmmaker_id', 'filmmaker.role', 'movie_id')
)




# dim_genre / dim_filmmaker: because genres and filmmakers relate to movies
# many-to-many, the flattened frames repeat an entity once per movie. Keep a single
# row per id, then drop `movie_id`. That link now lives in the bridge tables and is
# not part of these dimensions' schema.
# NOTE(review): dropDuplicates keeps an arbitrary row per id — this is only safe if
# the other attributes are identical across that id's rows.
df_genre = df_genre.dropDuplicates(['genre_id']).drop('movie_id')
df_filmmaker = df_filmmaker.dropDuplicates(['filmmaker_id']).drop('movie_id')


# Attach movie_id to every box-office row by looking it up in df_movie via release_id.
# This is a left join, so box-office rows with no matching movie are kept with a null movie_id.
# NOTE(review): if release_id is not unique in df_movie, matching rows will be duplicated.
df_boxoffice = df_boxoffice.join(
    df_movie.select('release_id', 'movie_id'),
    on='release_id',
    how='left',
)


# Strip formatting characters so these string fields can be cast to numbers later:
# "$" and "," from collection_domestic, and "," from the count-like fields.
from pyspark.sql.functions import regexp_replace

df_boxoffice = df_boxoffice.withColumns({
    column: regexp_replace(column, pattern, '')
    for column, pattern in (
        ('collection_domestic', r'[\$\,]'),
        ('days_post_release', r'[\,]'),
        ('num_theatres', r'[\,]'),
    )
})

df_distributor = df_distributor.withColumn(
    'total_movies', regexp_replace('total_movies', r'\,', '')
)


# Derived field "post_release_days": the number of days from the release date
# (release_date_id, parsed as yyyyMMdd) to today.
# NOTE(review): current_date() ties this value to the day the job runs, not to run_date.
from pyspark.sql.functions import to_date, date_diff, current_date

df_movie = df_movie.withColumn(
    'post_release_days',
    date_diff(
        end=current_date(),
        start=to_date('release_date_id', format="yyyyMMdd"),
    ),
)

# Casting the fields of every DataFrame to the types of the data model.
# Every frame carries the run metadata columns, which are cast the same way.
# NOTE(review): IntegerType tops out at 2_147_483_647 — confirm extracted_at values fit
# (an epoch-seconds value does; a yyyyMMddHHmmss value would not).
_METADATA_CASTS = {
    "extracted_at": IntegerType(),
    "run_date": IntegerType(),
}


def _cast_columns(df, casts):
    """Return ``df`` with each column named in ``casts`` cast to its mapped Spark type.

    With non-ANSI casting (spark.sql.ansi.enabled=false), values that don't parse
    as the target type become null. With ANSI mode on, such values raise an error.
    """
    return df.withColumns({name: df[name].cast(dtype) for name, dtype in casts.items()})


df_movie = _cast_columns(df_movie, {
    "release_date_id": IntegerType(),
    "duration": IntegerType(),
    "widest_release": IntegerType(),
    "imdb_rating": FloatType(),
    "num_of_rating": IntegerType(),
    **_METADATA_CASTS,
})

df_genre = _cast_columns(df_genre, {
    "total_movies": IntegerType(),
    **_METADATA_CASTS,
})

df_filmmaker = _cast_columns(df_filmmaker, {
    # NOTE(review): a string->date cast expects yyyy-MM-dd; other formats become null.
    "dob": DateType(),
    "total_movies": IntegerType(),
    **_METADATA_CASTS,
})

df_distributor = _cast_columns(df_distributor, {
    "total_movies": IntegerType(),
    **_METADATA_CASTS,
})

df_boxoffice = _cast_columns(df_boxoffice, {
    # DoubleType rather than FloatType: a 4-byte float represents whole numbers
    # exactly only up to 2**24 (~16.7M), so larger dollar grosses were being rounded.
    # NOTE(review): confirm fact_domestic_collection's column is not `real` either.
    "collection_domestic": DoubleType(),
    "days_post_release": IntegerType(),
    "num_of_releases": IntegerType(),
    "rank": IntegerType(),
    "num_theatres": IntegerType(),
    **_METADATA_CASTS,
})


# Adding the field "age" (completed years since "dob") to df_filmmaker.
# months_between(...)/12 counts calendar months. The previous date_diff(...)/365
# ignored leap days and so reported one year too many in the days just before a
# birthday. Example: born 2000-11-26, evaluated on 2024-11-25, gives
# 8765 days / 365 = 24.01 -> 24, although the person is still 23.
from pyspark.sql.functions import months_between, current_date

df_filmmaker = df_filmmaker.withColumn(
    'age',
    (months_between(current_date(), 'dob') / 12).cast(IntegerType()),
)

# The bridge tables now carry the movie<->filmmaker and movie<->genre links, so the
# nested "filmmakers" and "genres" columns are no longer needed on df_movie.
df_movie = df_movie.drop(*('filmmakers', 'genres'))

# movie_id is not one of the distributor dimension's columns.
df_distributor = df_distributor.drop('movie_id')

# The box-office rows identify their movie through movie_id (added by the join
# above), so movie_name is dropped.
df_boxoffice = df_boxoffice.drop('movie_name')

24/11/25 22:45:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [3]:
# Do NOT stop the session here. The Postgres cell below reads and writes through
# `spark`, as do the DataFrames built above, and all of them fail once the
# SparkContext is stopped (this breaks Restart & Run All). Call spark.stop() as the
# final step of the notebook, after the load has finished.

In [None]:
# Loading the data to Postgres.
# Connection settings come from the standard libpq environment variables when they
# are set. Otherwise they fall back to the values previously hard-coded here.
# Credentials are passed as JDBC properties rather than embedded in the URL.
PG_HOST = os.environ.get('PGHOST', 'localhost')
PG_PORT = os.environ.get('PGPORT', '5432')
PG_DATABASE = os.environ.get('PGDATABASE', 'movie_data')

url = f'jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DATABASE}'
jdbc_props = {'user': os.environ.get('PGUSER', 'jayesh')}
if os.environ.get('PGPASSWORD'):
    jdbc_props['password'] = os.environ['PGPASSWORD']

query = '''
    SELECT * FROM dim_movies
'''
(
    spark.read
    .format('jdbc')
    .option('url', url)
    .option('query', query)
    .options(**jdbc_props)
    .load()
).show()

# Writes, in dependency order: dimensions first, then bridges, then the fact table.
# df_movie.write.jdbc(url=url,table='dim_movies',mode='append',properties=jdbc_props)
# df_genre.write.jdbc(url=url,table='dim_genre',mode='append',properties=jdbc_props)
# df_filmmaker.write.jdbc(url=url,table='dim_filmmakers',mode='append',properties=jdbc_props)
# df_distributor.write.jdbc(url=url,table='dim_distributor',mode='append',properties=jdbc_props)
# bridge_filmmaker_movie.write.jdbc(url=url,table='bridge_movie_filmmaker',mode='append',properties=jdbc_props)
# bridge_movie_genre.write.jdbc(url=url,table='bridge_movie_genre',mode='append',properties=jdbc_props)
# df_boxoffice.write.jdbc(url=url,table='fact_domestic_collection',mode='append',properties=jdbc_props)

In [None]:
#TODO
    # 1. Add the fields listed in the data model to the corresponding DataFrames.
    # 2. Check every DataFrame's fields for unexpected values and replace them with valid ones so that every column can be cast properly.
