In [0]:
from pyspark.sql.functions import abs
from pyspark.sql.types import StringType, DoubleType, StructField, StructType, TimestampType, ArrayType, LongType
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, col, desc, explode, when, trim
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp

In [0]:
# Load every raw movie JSON file in ONE read so Spark infers a single schema
# across all of them. Reading file-by-file and stitching the pieces together
# with `union` pairs columns purely by position, which fails (or silently
# misaligns data) as soon as two files infer different schemas.
movie_paths = [f"dbfs:/FileStore/movie_{i}.json" for i in range(0, 8)]

# `multiline` allows a single JSON document to span several lines of a file.
combined_df = (
    spark.read
    .option("multiline", "true")
    .option("inferSchema", "true")
    .json(movie_paths)
)

# Persist the combined raw data as a Delta table, replacing any earlier run.
combined_df.write.format("delta").mode("overwrite").saveAsTable("combined_movies")

In [0]:
# Build the bronze table from the raw data that the previous cell persisted as
# the `combined_movies` Delta table. Reading it back avoids re-parsing the same
# eight JSON files and avoids re-assembling them with a positional `union`.
combined_df = spark.table("combined_movies")

# Explode the `movie` collection into one row per movie, then flatten the
# resulting struct so every movie attribute becomes a top-level column.
movies_df = (
    combined_df
    .select(explode(col("movie")).alias("movie_column"))
    .selectExpr("movie_column.*")
)

# Write the flattened movies as the bronze Delta table, replacing any earlier run.
movies_df.write.format("delta").mode("overwrite").saveAsTable("bronze_movies")

In [0]:
%sql
-- Spot-check the bronze table by fetching a single row.
SELECT *
FROM hive_metastore.default.bronze_movies
LIMIT 1

BackdropUrl,Budget,CreatedBy,CreatedDate,Id,ImdbUrl,OriginalLanguage,Overview,PosterUrl,Price,ReleaseDate,Revenue,RunTime,Tagline,Title,TmdbUrl,UpdatedBy,UpdatedDate,genres
https://image.tmdb.org/t/p/original//s3TBrRGB1iav7gFOCNx3H31MoES.jpg,160000000.0,,2021-04-03T16:51:30.1633333,1,https://www.imdb.com/title/tt1375666,en,"Cobb, a skilled thief who commits corporate espionage by infiltrating the subconscious of his targets is offered a chance to regain his old life as payment for a task considered to be impossible: ""inception"", the implantation of another person's idea into a target's subconscious.",https://image.tmdb.org/t/p/w342//9gk7adHYeDvHkCSEqAvQNLV5Uge.jpg,9.9,2010-07-15T00:00:00,825532764.0,148,Your mind is the scene of the crime.,Inception,https://www.themoviedb.org/movie/27205,,,"List(List(1, Adventure), List(6, Action), List(13, Science Fiction))"


In [0]:
# Data cleaning: replace RunTime with its absolute value so negative runtimes
# become positive.
# NOTE: `abs` is pyspark.sql.functions.abs (imported in the first cell), which
# shadows the Python builtin and operates on Columns.
bronze_df = spark.table("bronze_movies").withColumn("RunTime", abs(col("RunTime")))

In [0]:
# Window ordered by the `genres` column; defined here but not used in this cell.
windowSpec = Window.orderBy("genres")

# Collect the distinct values of bronze_df's `genres` column, exposed as `genre`.
# NOTE(review): each row is a movie's whole genres value (the cell output shows
# arrays such as [{4, Drama}]), not one exploded genre per row -- confirm this
# is what the lookup table is meant to contain.
genres_df = bronze_df.select(col("genres").alias("genre")).distinct()

# Preview the first rows of the lookup table
genres_df.show()

# Persist genres_df as the silver genres Delta table, replacing earlier contents
(
    genres_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_genres")
)

+--------------------+
|               genre|
+--------------------+
|[{1, Adventure}, ...|
|[{2, Fantasy}, {7...|
|[{1, Adventure}, ...|
|[{6, Action}, {7,...|
|[{5, Horror}, {6,...|
|[{1, Adventure}, ...|
|[{4, Drama}, {7, ...|
|[{1, Adventure}, ...|
|[{4, Drama}, {8, ...|
|[{1, Adventure}, ...|
|[{1, Adventure}, ...|
|[{2, Fantasy}, {7...|
|        [{4, Drama}]|
|[{1, Adventure}, ...|
|[{1, Adventure}, ...|
|[{4, Drama}, {5, ...|
|[{1, Adventure}, ...|
|[{1, Adventure}, ...|
|[{1, Adventure}, ...|
|[{10, Thriller}, ...|
+--------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import lit

# Keep a single row per movie Id, then tag every remaining row with the
# literal status 'new'.
bronze_df = (
    bronze_df
    .dropDuplicates(['Id'])
    .withColumn('status', lit('new'))
)

In [0]:
# Persist the cleaned bronze_df as the silver table, replacing previous contents.
bronze_df.write.format("delta").mode("overwrite").saveAsTable("silver_movie")

# Get column names from bronze_df
column_names = bronze_df.columns

# Build the SELECT list. Each name is wrapped in backticks (embedded backticks
# doubled) so a column name containing spaces, dots or reserved words cannot
# break, or change the meaning of, the generated SQL.
column_names_expr = ", ".join(
    "`{}`".format(name.replace("`", "``")) for name in column_names
)

# Create silver_movie_0 as a copy of silver_movie.
# NOTE: because of IF NOT EXISTS this statement is a no-op when silver_movie_0
# already exists, so re-running the notebook does not refresh that table.
spark.sql(
    f"CREATE TABLE IF NOT EXISTS silver_movie_0 USING DELTA "
    f"AS SELECT {column_names_expr} FROM silver_movie"
)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Preview the contents of the silver_movie table
silver_movie_preview = spark.table("silver_movie")
silver_movie_preview.show()

+--------------------+---------+---------+--------------------+---+--------------------+----------------+--------------------+--------------------+-----+-------------------+-------------+-------+--------------------+--------------------+--------------------+---------+-----------+--------------------+------+
|         BackdropUrl|   Budget|CreatedBy|         CreatedDate| Id|             ImdbUrl|OriginalLanguage|            Overview|           PosterUrl|Price|        ReleaseDate|      Revenue|RunTime|             Tagline|               Title|             TmdbUrl|UpdatedBy|UpdatedDate|              genres|status|
+--------------------+---------+---------+--------------------+---+--------------------+----------------+--------------------+--------------------+-----+-------------------+-------------+-------+--------------------+--------------------+--------------------+---------+-----------+--------------------+------+
|https://image.tmd...|   2.37E8|     NULL|2021-04-03T16:51:...|  6|https: