In [0]:

#imports
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql import functions as F
import re
from pyspark.sql.types import StringType


##   DATA LOADING


In [0]:
dbutils.fs.cp("/Volumes/workspace/bronze_schema/ext_data/messy_IMDB_dataset.csv", "/Workspace/Users/surendhar010104@gmail.com/pyspark")


True

In [0]:
df = spark.read.csv("/Volumes/workspace/bronze_schema/ext_data/messy_IMDB_dataset.csv", header=True, inferSchema=True, sep=";")

In [0]:
df.printSchema()
display(df)

root
 |-- IMBD title ID: string (nullable = true)
 |-- Original titl�: string (nullable = true)
 |-- Release year: string (nullable = true)
 |-- Genr�: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Content Rating: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- Income: string (nullable = true)
 |--  Votes : string (nullable = true)
 |-- Score: string (nullable = true)



IMBD title ID,Original titl�,Release year,Genr�,Duration,Country,Content Rating,Director,_c8,Income,Votes,Score
tt0111161,The Shawshank Redemption,1995-02-10,Drama,142,USA,R,Frank Darabont,,$ 28815245,2.278.845,9.3
tt0068646,The Godfather,09 21 1972,"Crime, Drama",175,USA,R,Francis Ford Coppola,,$ 246120974,1.572.674,9.2
tt0468569,The Dark Knight,23 -07-2008,"Action, Crime, Drama",152,US,PG-13,Christopher Nolan,,$ 1005455211,2.241.615,9.
tt0071562,The Godfather: Part II,1975-09-25,"Crime, Drama",220,USA,R,Francis Ford Coppola,,"$ 4o8,035,783",1.098.714,9.0
tt0110912,Pulp Fiction,1994-10-28,"Crime, Drama",,USA,R,Quentin Tarantino,,$ 222831817,1.780.147,"8,9f"
tt0167260,The Lord of the Rings: The Return of the King,22 Feb 04,"Action, Adventure, Drama",201,New Zealand,PG-13,Peter Jackson,,$ 1142271098,1.604.280,08.9
tt0108052,Schindler's List,1994-03-11,"Biography, Drama, History",Nan,USA,R,Steven Spielberg,,$ 322287794,1.183.248,8.9
tt0050083,12 Angry Men,1957-09-04,"Crime, Drama",96,USA,Not Rated,Sidney Lumet,,$ 576,668.473,8.9
tt1375666,Inception,2010-09-24,"Action, Adventure, Sci-Fi",148,USA,PG-13,Christopher Nolan,,$ 869784991,2.002.816,8..8
tt0137523,Fight Club,10-29-99,Drama,Inf,UK,R,David Fincher,,$ 101218804,1.807.440,8.8


## SCHEMA CORRECTION

In [0]:
#names cleaned
df = df.withColumnRenamed("IMBD title ID", "IMDB_ID")
df = df.withColumnRenamed("Original titl�", "Original_title")
df = df.withColumnRenamed("Release year", "Release_year")
df = df.withColumnRenamed("Content Rating", "Content_Rating")

In [0]:
#stripped and removed special characters

rename_map = {old: re.sub(r'[^A-Za-z0-9_]+', '', old.strip()) for old in df.columns}

for old, new in rename_map.items():
    df = df.withColumnRenamed(old, new)


In [0]:
#casted to correct datatypes
df = (df
    .withColumn("IMDB_ID", col("IMDB_ID").cast("string"))
    .withColumn("Release_year", col("Release_year").cast("string"))
    .withColumn("Duration", col("Duration").cast("string"))
    .withColumn("Income", col("Income").cast("string"))
    .withColumn("Votes", col("Votes").cast("string"))
    .withColumn("Score", col("Score").cast("string"))
)


## DATA CLEANING

In [0]:

#cleaning of Score column
def clean_numeric_value(value: str) -> str:
    """
    Cleans a single numeric-looking string (e.g. '9,.0', '++8.7', '8,9f', etc.)
    and returns a cleaned numeric string that can be safely cast to float.
    """
    if value is None:
        return None

    # Step 1: ensure string type and strip spaces
    value = str(value).strip()
    # Step 2: replace commas with dots
    value = value.replace(",", ".")
    # Step 3: remove all non-digit, non-dot, non-minus characters
    value = re.sub(r"[^0-9.\-]+", "", value)
    # Step 4: collapse multiple dots into one
    value = re.sub(r"\.{2,}", ".", value)
    # Step 5: remove leading '-' or '.' and trailing '.'
    value = re.sub(r"^-+|\.+$", "", value)
    value = re.sub(r"^\.+", "", value)
    # Step 6: if multiple dots remain, keep only the first valid numeric pattern
    value = re.sub(r"^([0-9]*\.[0-9]+)\..*$", r"\1", value)
    value = re.sub(r"(?<!^)-", "", value)
    # Step 7: handle empty or malformed values
    if value == "" or value == "." or value == "-":
        return None
    return value

@udf(returnType=StringType())
def clean_numeric_udf(value):
    return clean_numeric_value(value)

df = df.withColumn("Score", clean_numeric_udf(F.col("Score")))
df = df.withColumn("Score", col("Score").cast("float"))


In [0]:
#cleaning the votes column
def clean_numeric(value: str) -> str:
    if value is None:
        return None

    value = str(value).strip()
    if value == "":
        return None

    # Remove everything except digits
    value = re.sub(r"[^0-9]", "", value)

    # If empty after cleaning, return None
    if value == "":
        return None

    return value

@udf(returnType=StringType())
def clean_votes_udf(value):
    return clean_numeric(value)

df = df.withColumn("Votes", clean_votes_udf(F.col("Votes")))
df = df.withColumn("Votes", col("Votes").cast("int"))

@udf(returnType=StringType())
def clean_duration_udf(value):
    return clean_numeric(value)

df = df.withColumn("Duration", clean_duration_udf(F.col("Duration")))
df = df.withColumn("Duration", col("Duration").cast("int"))


In [0]:
#added a new column
df = df.withColumn("points", df["Votes"]*df["Score"])

In [0]:
#column rename
df = df.withColumnRenamed("Genr", "Genre")
df = df.withColumnRenamed("Release_year", "Release")

In [0]:
#df = df.withColumn("Release", F.col("Release").cast("string"))


In [0]:
#income column clean
def clean_numeric_income(value: str) -> str:
    if value is None:
        return None
    value = str(value).strip()
    if value == "":
        return None
    # Remove everything except digits
    value = re.sub(r"[^0-9.]", "", value)
    # If empty after cleaning, return None
    if value == "":
        return None
    return value

@udf(returnType=StringType())
def clean_income_udf(value):
    return clean_numeric(value)

df = df.withColumn("Income", clean_income_udf(F.col("Income")))
df = df.withColumn("Income", col("Income").cast("double"))

In [0]:
#dropping of a dummy column
df = df.drop("_c8")
#drop rows if the imdb id is null
df = df.dropna(subset=["IMDB_ID"])

In [0]:
df.printSchema()
display(df)


root
 |-- IMDB_ID: string (nullable = true)
 |-- Original_title: string (nullable = true)
 |-- Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Content_Rating: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Income: double (nullable = true)
 |-- Votes: integer (nullable = true)
 |-- Score: float (nullable = true)
 |-- points: double (nullable = true)



IMDB_ID,Original_title,Release,Genre,Duration,Country,Content_Rating,Director,Income,Votes,Score,points
tt0111161,The Shawshank Redemption,1995-02-10,Drama,142.0,USA,R,Frank Darabont,28815245.0,2278845,9.3,21193258.93465519
tt0068646,The Godfather,09 21 1972,"Crime, Drama",175.0,USA,R,Francis Ford Coppola,246120974.0,1572674,9.2,14468600.50003624
tt0468569,The Dark Knight,23 -07-2008,"Action, Crime, Drama",152.0,US,PG-13,Christopher Nolan,1005455211.0,2241615,9.0,20174535.0
tt0071562,The Godfather: Part II,1975-09-25,"Crime, Drama",220.0,USA,R,Francis Ford Coppola,48035783.0,1098714,9.0,9888426.0
tt0110912,Pulp Fiction,1994-10-28,"Crime, Drama",,USA,R,Quentin Tarantino,222831817.0,1780147,8.9,15843307.62092781
tt0167260,The Lord of the Rings: The Return of the King,22 Feb 04,"Action, Adventure, Drama",201.0,New Zealand,PG-13,Peter Jackson,1142271098.0,1604280,8.9,14278091.388015747
tt0108052,Schindler's List,1994-03-11,"Biography, Drama, History",,USA,R,Steven Spielberg,322287794.0,1183248,8.9,10530906.748626707
tt0050083,12 Angry Men,1957-09-04,"Crime, Drama",96.0,USA,Not Rated,Sidney Lumet,576.0,668473,8.9,5949409.444997788
tt1375666,Inception,2010-09-24,"Action, Adventure, Sci-Fi",148.0,USA,PG-13,Christopher Nolan,869784991.0,2002816,8.8,17624781.182006836
tt0137523,Fight Club,10-29-99,Drama,,UK,R,David Fincher,101218804.0,1807440,8.8,15905472.34474182


In [0]:
df.write.mode("overwrite").option("header", True).csv("/Volumes/workspace/bronze_schema/ext_data/cleaned_imdb_data.csv")

## ----------------------------------------------------------------------------

##  CREATION OF INSIGHT TABLES

In [0]:
df_UK =df.where(df["Country"]=="UK").select("IMDB_ID", "Original_title","Country")
display(df_UK)

IMDB_ID,Original_title,Country
tt0137523,Fight Club,UK
tt0482571,The Prestige,UK
tt0253474,The Pianist,UK
tt1345836,The Dark Knight Rises,UK
tt0081505,The Shining,UK
tt0078748,Alien,UK
tt0057012,Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb,UK
tt0208092,Snatch,UK
tt0066921,A Clockwork Orange,UK
tt0093058,Full Metal Jacket,UK


In [0]:
print(df_UK.columns)

['IMDB_ID', 'Original_title', 'Country']


In [0]:
df_UK.write.format("delta").mode("overwrite").saveAsTable("imdb_uk")

In [0]:
df_UK.write.format("json").mode("overwrite").save("/Volumes/workspace/bronze_schema/ext_data/imdb_uk")

## ---------------------------------------------------------

## QUERYING OF DATA USING PYSPARK

In [0]:
display(df.filter(df["Score"]>9))
display(df.where(df["Score"]>9))

IMDB_ID,Original_title,Release,Genre,Duration,Country,Content_Rating,Director,Income,Votes,Score,points
tt0111161,The Shawshank Redemption,1995-02-10,Drama,142,USA,R,Frank Darabont,28815245.0,2278845,9.3,21193258.93465519
tt0068646,The Godfather,09 21 1972,"Crime, Drama",175,USA,R,Francis Ford Coppola,246120974.0,1572674,9.2,14468600.50003624
tt0109830,Forrest Gump,1994-10-06,"Drama, Romance",142,USA,PG-13,Robert Zemeckis,678229452.0,1755490,88.0,154483120.0


IMDB_ID,Original_title,Release,Genre,Duration,Country,Content_Rating,Director,Income,Votes,Score,points
tt0111161,The Shawshank Redemption,1995-02-10,Drama,142,USA,R,Frank Darabont,28815245.0,2278845,9.3,21193258.93465519
tt0068646,The Godfather,09 21 1972,"Crime, Drama",175,USA,R,Francis Ford Coppola,246120974.0,1572674,9.2,14468600.50003624
tt0109830,Forrest Gump,1994-10-06,"Drama, Romance",142,USA,PG-13,Robert Zemeckis,678229452.0,1755490,88.0,154483120.0


In [0]:
from pyspark.sql.functions import desc
display(df.filter(df["Score"]>8.5).orderBy(desc("Duration")))

IMDB_ID,Original_title,Release,Genre,Duration,Country,Content_Rating,Director,Income,Votes,Score,points
tt0071562,The Godfather: Part II,1975-09-25,"Crime, Drama",220.0,USA,R,Francis Ford Coppola,48035783.0,1098714,9.0,9888426.0
tt0047478,Shichinin no samurai,1955-08-19,"Action, Adventure, Drama",207.0,Japan,Unrated,Akira Kurosawa,322773.0,307958,8.6,2648438.917476654
tt0167260,The Lord of the Rings: The Return of the King,22 Feb 04,"Action, Adventure, Drama",201.0,New Zealand,PG-13,Peter Jackson,1142271098.0,1604280,8.9,14278091.388015747
tt0120689,The Green Mile,2000-10-03,"Crime, Drama, Fantasy",189.0,US.,R,Frank Darabont,286801374.0,1112336,8.6,9566090.02432251
tt0167261,The Lord of the Rings: The Two Towers,01/16-03,"Action, Adventure, Drama",179.0,New Zeland,PG-13,Peter Jackson,951227416.0,1449778,8.7,12613068.323476791
tt0120737,The Lord of the Rings: The Fellowship of the Ring,2002-01-18,"Action, Adventure, Drama",178.0,New Zesland,PG-13,Peter Jackson,887934303.0,1619920,8.8,14255296.30897522
tt0068646,The Godfather,09 21 1972,"Crime, Drama",175.0,USA,R,Francis Ford Coppola,246120974.0,1572674,9.2,14468600.50003624
tt0816692,Interstellar,2014-11-06,"Adventure, Drama, Sci-Fi",169.0,USA,PG-13,Christopher Nolan,696742056.0,1449256,8.6,12463602.15284729
tt0120815,Saving Private Ryan,1998-10-30,"Drama, War",169.0,USA,R,Steven Spielberg,482349603.0,1203825,8.6,10352895.459222794
tt0060196,"Il buono, il brutto, il cattivo",23rd December of 1966,Western,161.0,Italy,Approved,Sergio Leone,25252481.0,672499,8.8,5917991.328269005


In [0]:
print(df.columns)

['IMDB_ID', 'Original_title', 'Release_year', 'Genr�', 'Duration', 'Country', 'Content_Rating', 'Director', '_c8', 'Income', ' Votes ', 'Score']


In [0]:
display(df.select("IMDB_ID", "Original_title"))

IMDB_ID,Original_title
tt0111161,The Shawshank Redemption
tt0068646,The Godfather
tt0468569,The Dark Knight
tt0071562,The Godfather: Part II
tt0110912,Pulp Fiction
tt0167260,The Lord of the Rings: The Return of the King
tt0108052,Schindler's List
tt0050083,12 Angry Men
tt1375666,Inception
tt0137523,Fight Club


In [0]:
display(df.select("IMDB_ID").count())

100

In [0]:
#dont run
def clean_numeric_column(df, col_name, target_type="float"):
    tmp = f"__{col_name}__clean"

    # 1. Cast to string first, to clean safely
    col_as_str = F.col(col_name).cast("string")

    # 2. Apply cleaning regex transformations
    cleaned = (
        F.regexp_replace(F.trim(col_as_str), ",", ".")                   # commas -> dots
    )
    cleaned = F.regexp_replace(cleaned, r"[^0-9.\-]+", "")      # keep only 0-9 . -
    cleaned = F.regexp_replace(cleaned, r"\.{2,}", ".")         # collapse .. -> .
    cleaned = F.regexp_replace(cleaned, r"^-+|\.+$", "")        # remove leading -, trailing .
    cleaned = F.regexp_replace(cleaned, r"^\.+", "")            # remove starting dots
    cleaned = F.regexp_replace(cleaned, r"^([0-9]*\.[0-9]+)\..*$", r"\1")  # trim after extra dots
    # 3. Write cleaned text into temp column
    df = df.withColumn(tmp, cleaned)

    # 4. Safely cast cleaned string back to numeric
    df = df.withColumn(col_name, F.expr(f"try_cast({tmp} as {target_type})"))

    # 5. Drop temp column
    df = df.drop(tmp)

    return df

clean_numeric_column(df, "Score")

DataFrame[IMDB_ID: string, Original_title: string, Release_year: int, Genr: string, Duration: int, Country: string, Content_Rating: string, Director: string, _c8: string, Income: double, Votes: int, Score: float]