
### Set-up

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import when
from pyspark.sql.functions import col, expr
from azure.storage.blob import BlobServiceClient

import os

In [0]:
from pyspark.sql.functions import input_file_name

# Define the path of the directory containing the JSON files
json_path = "wasbs://landing@db0418.blob.core.windows.net/"

# List all the JSON files in the directory
json_files = [file.path for file in dbutils.fs.ls(json_path) if file.path.endswith(".json")]



# Read each JSON file into a separate DataFrame
for file_path in json_files:
    # Extract the file name from the path
    file_name = os.path.splitext(os.path.basename(file_path))[0]

    # Read the JSON file into a DataFrame
    json_data = spark.read.option("multiline", "true").json(file_path)

    # Write the DataFrame as a Delta file
    delta_file_path = f"dbfs:/movies/{file_name}.delta"
    json_data.write.format("delta").mode("overwrite").save(delta_file_path)
    
    print(f"{file_name} has been persisted to Delta file: {delta_file_path}")
    


movie_0 has been persisted to Delta file: dbfs:/movies/movie_0.delta
movie_1 has been persisted to Delta file: dbfs:/movies/movie_1.delta
movie_2 has been persisted to Delta file: dbfs:/movies/movie_2.delta
movie_3 has been persisted to Delta file: dbfs:/movies/movie_3.delta
movie_4 has been persisted to Delta file: dbfs:/movies/movie_4.delta
movie_5 has been persisted to Delta file: dbfs:/movies/movie_5.delta
movie_6 has been persisted to Delta file: dbfs:/movies/movie_6.delta
movie_7 has been persisted to Delta file: dbfs:/movies/movie_7.delta


In [0]:
dbutils.fs.rm("dbfs:/movies/", recurse=True)

True

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, expr, col, upper, to_date, year, regexp_extract, substring_index

def process_movie_file(file_path):
    df = spark.read.format("delta").load(file_path)
    df = df.select(explode("movie").alias("movie"))
    df = df.selectExpr("movie.*")
    df_processed = df.withColumn("GenresList", expr("concat_ws(', ', transform(genres, x -> x.name))"))
    
    
    return df_processed

def process_all_movie_files(folder_path):
    all_files = dbutils.fs.ls(folder_path)
    df_all = None
    for file_row in all_files:
        file_path = file_row.path
        df_processed = process_movie_file(file_path)
        if df_all is None:
            df_all = df_processed
        else:
            df_all = df_all.union(df_processed)
    print(df_all.count())
    return df_all

# Replace 'dbfs:/movies' with your actual folder path
folder_path = "dbfs:/movies"
combined_df = process_all_movie_files(folder_path)


9995


In [0]:
display(combined_df)

BackdropUrl,Budget,CreatedBy,CreatedDate,Id,ImdbUrl,OriginalLanguage,Overview,PosterUrl,Price,ReleaseDate,Revenue,RunTime,Tagline,Title,TmdbUrl,UpdatedBy,UpdatedDate,genres,GenresList
https://image.tmdb.org/t/p/original//s3TBrRGB1iav7gFOCNx3H31MoES.jpg,160000000.0,,2021-04-03T16:51:30.1633333,1,https://www.imdb.com/title/tt1375666,en,"Cobb, a skilled thief who commits corporate espionage by infiltrating the subconscious of his targets is offered a chance to regain his old life as payment for a task considered to be impossible: ""inception"", the implantation of another person's idea into a target's subconscious.",https://image.tmdb.org/t/p/w342//9gk7adHYeDvHkCSEqAvQNLV5Uge.jpg,9.9,2010-07-15T00:00:00,825532764.0,148,Your mind is the scene of the crime.,Inception,https://www.themoviedb.org/movie/27205,,,"List(List(1, Adventure), List(6, Action), List(13, Science Fiction))","Adventure, Action, Science Fiction"
https://image.tmdb.org/t/p/original//xJHokMbljvjADYdit5fK5VQsXEG.jpg,165000000.0,,2021-04-03T16:51:30.1633333,2,https://www.imdb.com/title/tt0816692,en,The adventures of a group of explorers who make use of a newly discovered wormhole to surpass the limitations on human space travel and conquer the vast distances involved in an interstellar voyage.,https://image.tmdb.org/t/p/w342//gEU2QniE6E77NI6lCU6MxlNBvIx.jpg,9.9,2014-11-05T00:00:00,675120017.0,169,Mankind was born on Earth. It was never meant to die here.,Interstellar,https://www.themoviedb.org/movie/157336,,,"List(List(1, Adventure), List(4, Drama), List(13, Science Fiction))","Adventure, Drama, Science Fiction"
https://image.tmdb.org/t/p/original//hkBaDkMWbLaf8B1lsWsKX7Ew3Xq.jpg,185000000.0,,2021-04-03T16:51:30.1633333,3,https://www.imdb.com/title/tt0468569,en,"Batman raises the stakes in his war on crime. With the help of Lt. Jim Gordon and District Attorney Harvey Dent, Batman sets out to dismantle the remaining criminal organizations that plague the streets. The partnership proves to be effective, but they soon find themselves prey to a reign of chaos unleashed by a rising criminal mastermind known to the terrified citizens of Gotham as the Joker.",https://image.tmdb.org/t/p/w342//qJ2tW6WMUDux911r6m7haRef0WH.jpg,9.9,2008-07-16T00:00:00,1004558444.0,152,Why So Serious?,The Dark Knight,https://www.themoviedb.org/movie/155,,,"List(List(4, Drama), List(6, Action), List(10, Thriller), List(11, Crime))","Drama, Action, Thriller, Crime"
https://image.tmdb.org/t/p/original//en971MEXui9diirXlogOrPKmsEn.jpg,58000000.0,,2021-04-03T16:51:30.1633333,4,https://www.imdb.com/title/tt1431045,en,"Deadpool tells the origin story of former Special Forces operative turned mercenary Wade Wilson, who after being subjected to a rogue experiment that leaves him with accelerated healing powers, adopts the alter ego Deadpool. Armed with his new abilities and a dark, twisted sense of humor, Deadpool hunts down the man who nearly destroyed his life.",https://image.tmdb.org/t/p/w342//yGSxMiF0cYuAiyuve5DA6bnWEOI.jpg,9.9,2016-02-09T00:00:00,783100000.0,108,Witness the beginning of a happy ending,Deadpool,https://www.themoviedb.org/movie/293660,,,"List(List(1, Adventure), List(6, Action), List(7, Comedy))","Adventure, Action, Comedy"
https://image.tmdb.org/t/p/original//kwUQFeFXOOpgloMgZaadhzkbTI4.jpg,220000000.0,,2021-04-03T16:51:30.1666667,5,https://www.imdb.com/title/tt0848228,en,"When an unexpected enemy emerges and threatens global safety and security, Nick Fury, director of the international peacekeeping agency known as S.H.I.E.L.D., finds himself in need of a team to pull the world back from the brink of disaster. Spanning the globe, a daring recruitment effort begins!",https://image.tmdb.org/t/p/w342//RYMX2wcKCBAr24UyPD7xwmjaTn.jpg,9.9,2012-04-25T00:00:00,1519557910.0,143,Some assembly required.,The Avengers,https://www.themoviedb.org/movie/24428,,,"List(List(1, Adventure), List(6, Action), List(13, Science Fiction))","Adventure, Action, Science Fiction"
https://image.tmdb.org/t/p/original//AmHOQ7rpHwiaUMRjKXztnauSJb7.jpg,237000000.0,,2021-04-03T16:51:30.1666667,6,https://www.imdb.com/title/tt0499549,en,"In the 22nd century, a paraplegic Marine is dispatched to the moon Pandora on a unique mission, but becomes torn between following orders and protecting an alien civilization.",https://image.tmdb.org/t/p/w342//6EiRUJpuoeQPghrs3YNktfnqOVh.jpg,9.9,2009-12-10T00:00:00,2787965087.0,162,Enter the World of Pandora.,Avatar,https://www.themoviedb.org/movie/19995,,,"List(List(1, Adventure), List(2, Fantasy), List(6, Action), List(13, Science Fiction))","Adventure, Fantasy, Action, Science Fiction"
https://image.tmdb.org/t/p/original//mZSAu5acXueGC4Z3S5iLSWx8AEp.jpg,170000000.0,,2021-04-03T16:51:30.1666667,7,https://www.imdb.com/title/tt2015381,en,"Light years from Earth, 26 years after being abducted, Peter Quill finds himself the prime target of a manhunt after discovering an orb wanted by Ronan the Accuser.",https://image.tmdb.org/t/p/w342//r7vmZjiyZw9rpJMQJdXpjgiCOk9.jpg,9.9,2014-07-30T00:00:00,772776600.0,121,All heroes start somewhere.,Guardians of the Galaxy,https://www.themoviedb.org/movie/118340,,,"List(List(1, Adventure), List(6, Action), List(13, Science Fiction))","Adventure, Action, Science Fiction"
https://image.tmdb.org/t/p/original//52AfXWuXCHn3UjD17rBruA9f5qb.jpg,63000000.0,,2021-04-03T16:51:30.1666667,8,https://www.imdb.com/title/tt0137523,en,"A ticking-time-bomb insomniac and a slippery soap salesman channel primal male aggression into a shocking new form of therapy. Their concept catches on, with underground ""fight clubs"" forming in every town, until an eccentric gets in the way and ignites an out-of-control spiral toward oblivion.",https://image.tmdb.org/t/p/w342//8kNruSfhk5IoE4eZOc4UpvDn6tq.jpg,9.9,1999-10-15T00:00:00,100853753.0,139,Mischief. Mayhem. Soap.,Fight Club,https://www.themoviedb.org/movie/550,,,"List(List(4, Drama))",Drama
https://image.tmdb.org/t/p/original//lmZFxXgJE3vgrciwuDib0N8CfQo.jpg,300000000.0,,2021-04-03T16:51:30.1666667,9,https://www.imdb.com/title/tt4154756,en,"As the Avengers and their allies have continued to protect the world from threats too large for any one hero to handle, a new danger has emerged from the cosmic shadows: Thanos. A despot of intergalactic infamy, his goal is to collect all six Infinity Stones, artifacts of unimaginable power, and use them to inflict his twisted will on all of reality. Everything the Avengers have fought for has led up to this moment - the fate of Earth and existence itself has never been more uncertain.",https://image.tmdb.org/t/p/w342//7WsyChQLEftFiDOVTGkv3hFpyyt.jpg,9.9,2018-04-25T00:00:00,2046239637.0,149,An entire universe. Once and for all.,Avengers: Infinity War,https://www.themoviedb.org/movie/299536,,,"List(List(1, Adventure), List(6, Action), List(13, Science Fiction))","Adventure, Action, Science Fiction"
https://image.tmdb.org/t/p/original//w7RDIgQM6bLT7JXtH4iUQd3Iwxm.jpg,8000000.0,,2021-04-03T16:51:30.1666667,10,https://www.imdb.com/title/tt0110912,en,"A burger-loving hit man, his philosophical partner, a drug-addled gangster's moll and a washed-up boxer converge in this sprawling, comedic crime caper. Their adventures unfurl in three stories that ingeniously trip back and forth in time.",https://image.tmdb.org/t/p/w342//plnlrtBUULT0rh3Xsjmpubiso3L.jpg,9.9,1994-09-10T00:00:00,214179088.0,154,Just because you are a character doesn't mean you have character.,Pulp Fiction,https://www.themoviedb.org/movie/680,,,"List(List(10, Thriller), List(11, Crime))","Thriller, Crime"


* Data Quality Checks 

In [0]:
# Count the number of null values in each column
null_counts = [combined_df.where(col(c).isNull()).count() for c in combined_df.columns]

# Create a dictionary to store column names and corresponding null counts
null_counts_dict = dict(zip(combined_df.columns, null_counts))

# Display the null counts for each column
for column, null_count in null_counts_dict.items():
    print(f"Column '{column}': {null_count} null values")

Column 'BackdropUrl': 0 null values
Column 'Budget': 0 null values
Column 'CreatedBy': 9995 null values
Column 'CreatedDate': 0 null values
Column 'Id': 0 null values
Column 'ImdbUrl': 0 null values
Column 'OriginalLanguage': 0 null values
Column 'Overview': 0 null values
Column 'PosterUrl': 0 null values
Column 'Price': 0 null values
Column 'ReleaseDate': 0 null values
Column 'Revenue': 0 null values
Column 'RunTime': 0 null values
Column 'Tagline': 0 null values
Column 'Title': 0 null values
Column 'TmdbUrl': 0 null values
Column 'UpdatedBy': 9995 null values
Column 'UpdatedDate': 9995 null values
Column 'genres': 0 null values
Column 'GenresList': 0 null values


Drop columns with null value 

In [0]:
combined_df = combined_df.drop("CreatedBy", "UpdatedBy", "UpdatedDate", "Price", "OriginalLanguage")
# Show unique values in the "price" and "OriginalLanguage" columns using distinct
#unique_prices = combined_df.select("price").distinct() 
#unique_OriginalLanguage = combined_df.select("OriginalLanguage").distinct()


Drop columns with one distinct value

In [0]:
combined_df = combined_df.drop("Price", "OriginalLanguage")

#unique_prices = combined_df.select("price").distinct() 
#unique_OriginalLanguage = combined_df.select("OriginalLanguage").distinct()

* Data Cleansing and Transformation

Correct type

In [0]:

combined_df = combined_df.withColumnRenamed("Revenue", "BoxOffice") \
                    .withColumn("ReleaseDate", to_date(col("ReleaseDate"))) \
                    .withColumn("ReleaseYear", year(col("ReleaseDate"))) \
                    .withColumn("ImdbId", regexp_extract(col("ImdbUrl"), r'(\d{7})$', 1)) \
                    .withColumn("TmdbId", expr("substring_index(TmdbUrl, '/', -1)")) \
                    .withColumn("ReleaseYear", year(col("ReleaseDate"))) \
                    .withColumn("CreatedDate", to_date(col("CreatedDate")))
    

Remove duplicates

In [0]:
#id
duplicate_ids = combined_df.groupBy("id").count().filter(col("count") > 1).select("id")
duplicate_ids.show()

+----+
|  id|
+----+
|5014|
|8747|
|7520|
+----+



In [0]:
# Check if the "title" column has duplicates
duplicate_rows = combined_df.groupBy("title").count().filter(col("count") > 1)

if duplicate_rows.count() > 0:
    print("The 'title' column has duplicates.")
    duplicate_rows.show()
else:
    print("The 'title' column does not have duplicates.")

The 'title' column has duplicates.
+--------------------+-----+
|               title|count|
+--------------------+-----+
|        Total Recall|    2|
|     The Other Woman|    2|
|           Neighbors|    2|
|       Truth or Dare|    3|
|    Dawn of the Dead|    2|
|           Gladiator|    2|
|              Psycho|    2|
|Murder on the Ori...|    2|
|              Batman|    2|
| Clash of the Titans|    2|
|    Charlie's Angels|    2|
|        The Revenant|    2|
|                Fury|    2|
|   A Christmas Carol|    3|
|             Inferno|    2|
|      Les Misérables|    2|
|    The Great Gatsby|    2|
|                Life|    3|
|            The Mule|    2|
|Journey to the Ce...|    2|
+--------------------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

# Create a window partitioned by the title column and ordered by any column (e.g., id)
window_spec = Window.partitionBy("title").orderBy("id")

# Add a row number column within each partition
combined_df_with_row_number = combined_df.withColumn("row_number", row_number().over(window_spec))

# Define status column based on row number
combined_df_with_status = combined_df_with_row_number.withColumn(
    "status",
    when(col("row_number") == 1, lit("new")).otherwise(lit("update"))
)

# Drop the row_number column as it's no longer needed
combined_df = combined_df_with_status.drop("row_number")

In [0]:
combined_df.printSchema()

root
 |-- BackdropUrl: string (nullable = true)
 |-- Budget: double (nullable = true)
 |-- CreatedDate: date (nullable = true)
 |-- Id: long (nullable = true)
 |-- ImdbUrl: string (nullable = true)
 |-- Overview: string (nullable = true)
 |-- PosterUrl: string (nullable = true)
 |-- ReleaseDate: date (nullable = true)
 |-- BoxOffice: double (nullable = true)
 |-- RunTime: long (nullable = true)
 |-- Tagline: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- TmdbUrl: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- GenresList: string (nullable = false)
 |-- ReleaseYear: integer (nullable = true)
 |-- ImdbId: string (nullable = true)
 |-- TmdbId: string (nullable = true)
 |-- status: string (nullable = false)



In [0]:
exploded_df = combined_df.select(explode("genres").alias("genre"))

# Extract id and name from the struct
genre_lookup_table = exploded_df.select(
    col("genre.id").alias("genre_id"),
    col("genre.name").alias("genre_name")
).distinct()
from pyspark.sql.functions import explode, col, when, length

genre_lookup_table = genre_lookup_table.filter(
    (length("genre_id") > 0) & (length("genre_name") > 0)
).distinct().orderBy("genre_id")

In [0]:
display(genre_lookup_table)

genre_id,genre_name
1,Adventure
2,Fantasy
3,Animation
4,Drama
5,Horror
6,Action
7,Comedy
8,History
9,Western
10,Thriller


In [0]:
imdb_df = (combined_df
           .select("ImdbId", "ImdbUrl", "Id")
           .filter(combined_df["ImdbId"] != "")
           .dropDuplicates(["ImdbId"])
           .orderBy("ImdbId"))

In [0]:
display(imdb_df)

ImdbId,ImdbUrl,Id
439,https://www.imdb.com/title/tt0000439,4841
1223,https://www.imdb.com/title/tt0001223,8756
3008,https://www.imdb.com/title/tt10003008,5567
4008,https://www.imdb.com/title/tt0004008,9630
4972,https://www.imdb.com/title/tt0004972,5158
6006,https://www.imdb.com/title/tt10006006,4842
6864,https://www.imdb.com/title/tt0006864,6737
7507,https://www.imdb.com/title/tt0007507,9992
7880,https://www.imdb.com/title/tt0007880,8852
8133,https://www.imdb.com/title/tt0008133,7116


# Creating table movie

In [0]:
combined_df.createOrReplaceTempView("Temp1")

In [0]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS TableMovie_quarantine
    USING delta
    AS
    SELECT 
        Id,
        Title,
        Tagline,
        GenresList, 
        Budget,
        BoxOffice,
        Overview,
        ReleaseDate,
        ReleaseYear, 
        RunTime,
        status
    FROM Temp1
    where RunTime<0
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
df_TableMovie_quarantine = spark.read.table("TableMovie_quarantine")
df_TableMovie_quarantine.write.format("delta").mode("overwrite").save("dbfs:/bronze/TableMovie_quarantine")

In [0]:
%sql
DROP TABLE IF EXISTS TableMovie;

In [0]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS TableMovie
    USING delta
    AS
    SELECT 
        Id,
        Title,
        Tagline,
        GenresList, 
        Budget,
        BoxOffice,
        Overview,
        ReleaseDate,
        ReleaseYear, 
        RunTime,
        status
    FROM Temp1
    where RunTime>0
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
df_TableMovie = spark.read.table("TableMovie")
df_TableMovie.write.format("delta").mode("overwrite").save("dbfs:/bronze/TableMovie")

# Creating genre table

In [0]:
genre_lookup_table.createOrReplaceTempView("Temp2")

In [0]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS TableGenre
    AS
    SELECT *
    FROM Temp2
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
df_TableGenre = spark.read.table("TableGenre")
df_TableGenre.write.format("delta").mode("overwrite").save("dbfs:/bronze/TableGenre")

#Creating Imdb lookup table

In [0]:
imdb_df.createOrReplaceTempView("Temp3")
spark.sql("""
    CREATE TABLE IF NOT EXISTS TableIMDB
    AS
    SELECT *
    FROM Temp3
""")
df_TableIMDB = spark.read.table("TableIMDB")
df_TableIMDB.write.format("delta").mode("overwrite").save("dbfs:/bronze/TableIMDB")


# to Bronze

In [0]:
def save_df_to_delta(df, table_name, save_path):
    """
    Saves a DataFrame as a Delta table with the specified table name and save path.
    """
    df.write.format("delta").mode("overwrite").save(save_path)

def upload_files(storage_account_name, storage_account_access_key, container_name):
    """
    Uploads Delta files from the dbfs:/bronze folder to the Azure Blob Storage bronze container.
    """
    import os
    from azure.storage.blob import BlobServiceClient

    # Create a BlobServiceClient object to connect to the storage account
    blob_service_client = BlobServiceClient(account_url=f"https://{storage_account_name}.blob.core.windows.net",credential=storage_account_access_key)

    # Get a reference to the bronze container
    container_client = blob_service_client.get_container_client(container_name)

    # List all Delta files in the dbfs:/bronze folder
    files = [file.path for file in dbutils.fs.ls(f"dbfs:/{container_name}")]

    # Upload each Delta file to the bronze container
    for file_path in files:
        # Extract the file name from the path
        file_name = file_path.split("/")[2]

        # Create a BlobClient for the Delta file in the bronze container
        blob_client = container_client.get_blob_client(file_name)

        # Upload the Delta file to the bronze container
        blob_client.upload_blob(f"dbfs:/{container_name}/{file_name}", overwrite=True)

        print(f"{file_name} has been successfully uploaded to the {container_name} container.")

    print(f"All Delta files have been uploaded to the {container_name} container.")

In [0]:
# Assuming df_TableMovie, df_TableGenre, df_TableIMDB are your DataFrames
# Save DataFrames to Delta format
save_df_to_delta(df_TableMovie, "TableMovie", "dbfs:/bronze/TableMovie")
save_df_to_delta(df_TableGenre, "TableGenre", "dbfs:/bronze/TableGenre")
save_df_to_delta(df_TableIMDB, "TableIMDB", "dbfs:/bronze/TableIMDB")

container_name = "bronze"  
upload_files("storage_account_name", "storage_account_access_key", container_name)




TableGenre
TableGenre has been successfully uploaded to the bronze container.
TableIMDB
TableIMDB has been successfully uploaded to the bronze container.
TableMovie
TableMovie has been successfully uploaded to the bronze container.
TableMovie_quarantine
TableMovie_quarantine has been successfully uploaded to the bronze container.
All Delta files have been uploaded to the bronze container.


Silver layer

In [0]:

def process_movie_quarantine_data(df): 
  df = df.withColumn("RunTime", expr("abs(cast(RunTime as int))"))
df_TableMovie_quarantine = process_movie_quarantine_data(df_TableMovie_quarantine)

In [0]:


def process_movie_data(df1, df2): 
  df1 = df1.union(df2)
  zero_budget_rows = df1.filter(df1["budget"] == 0)
  num_zero_budget_rows = zero_budget_rows.count()
  print("Number of rows with budget is 0:", num_zero_budget_rows)
  df1 = df1.withColumn("budget", when(df1["budget"] == 0, 1000000).otherwise(df1["budget"]))

df_TableMovie = process_movie_data(df_TableMovie, df_TableMovie_quarantine)

Number of rows with budget is 0: 4093


In [0]:

save_df_to_delta(df_TableMovie, "TableMovie", "dbfs:/silver/TableMovie")
save_df_to_delta(df_TableGenre, "TableGenre", "dbfs:/silver/TableGenre")
save_df_to_delta(df_TableIMDB, "TableIMDB", "dbfs:/silver/TableIMDB")

# to Silver

In [0]:
container_name = "silver"  
upload_files("torage_account_name", "storage_account_access_key", container_name)


In [0]:
#dbutils.fs.rm("dbfs:/silver", recurse=True)

True