# Install Packages

In [0]:
!pip install lxml beautifulsoup4 -q

# Initialize URLs

In [0]:
# Source sites. Derived IMDb / Box Office Mojo URLs are built from their parent URL
# so a host change only has to be made in one place.
DATASET_URL = "https://datasets.imdbws.com/"
BOXOFFICE_URL = "https://www.boxofficemojo.com"
LANGUAGE_URL = "https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes"

IMDB_URL = "https://www.imdb.com/"
CHART_URL = IMDB_URL + "chart/"
INDIA_URL = IMDB_URL + "india/"
IMDB_SEARCH_URL = IMDB_URL + "search/title/"
TOP_1000_URL = IMDB_SEARCH_URL + "?groups=top_1000"
# Incomplete on purpose: callers append "=<language code>".
LANG_URL = IMDB_SEARCH_URL + "?primary_language"

# The lifetime chart URL ends with "offset=" so a page offset can be appended.
BOXOFFICE_CHART = BOXOFFICE_URL + "/chart/ww_top_lifetime_gross/?area=XWW&offset="
BOXOFFICE_YEAR = BOXOFFICE_URL + "/year/world/"

# Browser-like User-Agent sent with every scrape request (presumably so the sites
# serve their regular HTML to the scraper).
HEADERS = dict(
    [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0")]
)

# Initialize Variables

In [0]:
# Storage layout: <basePath>raw/, <basePath>silver/, <basePath>gold/
basePath = "/user/IMDB/"
rawPath, silverPath, goldPath = (f"{basePath}{layer}/" for layer in ("raw", "silver", "gold"))

# Table names
topTableName = "t_imdb_top"  # scraped IMDb / Box Office Mojo lists
boTableName = "t_bo"  # yearly box office
goldTableName = "t_imdb"
goldTablePath = f"{goldPath}{goldTableName}"

# IMDb dataset files to download from DATASET_URL
file_list = [
    "name.basics.tsv.gz",
    "title.basics.tsv.gz",
    "title.crew.tsv.gz",
    "title.principals.tsv.gz",
    "title.ratings.tsv.gz",
]

# Raw folder per file: the file name minus its 7-character ".tsv.gz" suffix
# (and minus any "-co" marker), e.g. "title.basics".
RawFolderList = [name[: -len(".tsv.gz")].replace("-co", "") for name in file_list]

# Silver table per raw folder: "t_" + folder name with dots turned into underscores.
SilverTableList = [f"t_{folder.replace('.', '_').replace('-co', '')}" for folder in RawFolderList]

# Every table this notebook (re)creates.
FullTableList = [*SilverTableList, goldTableName, topTableName, boTableName]

# Import Required Functions

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
import pyspark.pandas as ps
import requests
from bs4 import BeautifulSoup
from datetime import date
from urllib.request import urlretrieve
import re
import concurrent.futures

# Drop All Tables and Folders If They Exist

In [0]:
# Start from a clean slate: unregister every table this notebook creates, then delete
# the storage folder that holds the raw files and table data.
# Tables are written as gold.<goldTableName> and silver.<name> for everything else, so
# each DROP must be schema-qualified; an unqualified name is looked up only in the
# current default schema and would leave the real tables registered.
for tbl in FullTableList:
  tbl_schema = "gold" if tbl == goldTableName else "silver"
  spark.sql(f"DROP TABLE IF EXISTS {tbl_schema}.{tbl}")
dbutils.fs.rm(f"dbfs:{basePath}", recurse=True)

# Create Schema

In [0]:
# One schema per layer, each rooted at its storage folder, so managed tables created
# with saveAsTable("<schema>.<table>") default to that folder.
for layer_schema, layer_location in (("silver", silverPath), ("gold", goldPath)):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {layer_schema} location '{layer_location}'")

# Optional recovery recipe (disabled): re-register tables from existing folders,
# e.g. if the metastore entries are lost but the data folders survive.
# for file_info in dbutils.fs.ls(silverPath) + dbutils.fs.ls(goldPath):
#   table_path = file_info.path
#   table_schema = table_path.split('/')[-3]
#   table_name = table_path.split('/')[-2]
#   spark.sql(f"CREATE TABLE IF NOT EXISTS {table_schema}.{table_name} location '{table_path}'")

# Extract Language Codes and Names from Wikipedia

In [0]:
# Language code -> language name lookup, taken from the first table on Wikipedia's
# "List of ISO 639-1 codes" page (presumably the code table; verify if the page changes).
# Used to label the language-specific IMDb search results.
lang_df = (
    ps.read_html(LANGUAGE_URL)[0]
    [["ISO language names", "Set 1"]]
    .rename(columns={"ISO language names": "lang_name", "Set 1": "lang_code"})
    .to_spark()
)

# Function to Scrape Movie Data

In [0]:
def scrape_movie_data(url, timeout=30):
    """
    Scrapes movie entries from one IMDb or Box Office Mojo page.

    The kind of page is inferred from `url`:

    * Box Office Mojo (url contains BOXOFFICE_URL): each <tr> of the first <table>
      is an entry, and its third cell is parsed as the box-office amount. Yearly
      pages (url contains BOXOFFICE_YEAR) also fill movie_name and movie_year
      (the last path segment). Other Box Office Mojo pages (the lifetime chart)
      fill type (second-to-last path segment), rnk and tconst.
    * IMDb: each title link is an entry. Only links to "/title/tt..." whose label
      starts with "<digits>." are filled in. type is the last path segment (the
      query string) for search URLs and the second-to-last segment otherwise.

    Every entry yields a dict, including rows that did not match. Those keep the
    placeholder values (None fields, box_office == -1), so callers filter them out.

    Args:
        url (str): The URL to scrape.
        timeout (float): Seconds requests.get waits for the server before raising a
            timeout error. Defaults to 30. Without a timeout, a stalled server would
            block the Spark task running this function indefinitely.

    Returns:
        list[dict]: One dict per entry with keys tconst, rnk, type, movie_name,
        movie_year and box_office.
    """
    # Send an HTTP request to the provided URL
    response = requests.get(url, headers=HEADERS, timeout=timeout)

    # Parse the HTML content using BeautifulSoup
    soup = BeautifulSoup(response.text, "html.parser")

    # Determine the type of URL
    is_box_office_url = BOXOFFICE_URL in url
    is_search_url = IMDB_SEARCH_URL in url
    is_india_url = INDIA_URL in url
    is_yearly_box_office = BOXOFFICE_YEAR in url

    # Extract movie entries from the HTML content
    if is_box_office_url:
        table = soup.find('table')
        # A page without a results table yields no entries instead of an IndexError
        # that would fail the whole Spark task.
        movie_entries = table.find_all("tr") if table is not None else []
    else:
        # India list pages use a different link class than charts and search results.
        class_value = 'ipc-metadata-list-item__icon-link' if is_india_url else 'ipc-title-link-wrapper'
        movie_entries = soup.find_all('a', class_=class_value, href=True)

    movie_data = []
    for entry in movie_entries:
        # Placeholder values; box_office == -1 marks "no box-office figure".
        movie_dict = {
            "tconst": None,
            "rnk": None,
            "type": None,
            "movie_name": None,
            "movie_year": None,
            "box_office": -1
        }

        if is_box_office_url:
            row_cols = entry.find_all('td')
            # Rows without <td> cells (presumably the header row) are skipped.
            if row_cols:
                movie_dict['box_office'] = int(row_cols[2].text.replace("$", "").replace(",", ""))
                if is_yearly_box_office:
                    movie_dict['movie_name'] = row_cols[1].text
                    movie_dict['movie_year'] = int(url.split('/')[-1])
                else:
                    movie_dict['type'] = url.split('/')[-2]
                    movie_dict['rnk'] = int(row_cols[0].text.replace(",", ""))
                    movie_dict['tconst'] = re.search(r"(tt\d+)", row_cols[1].find("a")['href']).group(1)
        else:
            link = entry.attrs.get('href')
            if is_india_url:
                movie_name = entry.attrs.get('aria-label')
            else:
                title_tag = entry.find('h3', class_='ipc-title__text')
                movie_name = title_tag.text.strip() if title_tag is not None else None
            # Ranked entries are labelled "<rank>. <title>". Entries with a missing label,
            # or whose title merely starts with digits, are skipped instead of crashing.
            rank_match = re.match(r"(\d+)\.", movie_name) if movie_name else None
            if link and link.startswith('/title/tt') and rank_match:
                movie_dict['tconst'] = re.search(r"(tt\d+)", link).group(1)
                movie_dict['movie_name'] = movie_name
                movie_dict['rnk'] = int(rank_match.group(1))
                movie_dict['type'] = url.split('/')[-1] if is_search_url else url.split('/')[-2]

        movie_data.append(movie_dict)

    return movie_data


# Create Spark UDF for Web Scraping

In [0]:
# Spark return schema for scrape_movie_data: an array with one struct per scraped entry.
# box_office is LongType (64-bit). Worldwide grosses can exceed the 32-bit IntegerType
# maximum of 2,147,483,647, and such Python ints do not fit an IntegerType field.
scrape_movie_data_udf = F.udf(scrape_movie_data, T.ArrayType(T.StructType([
    T.StructField("tconst", T.StringType()),
    T.StructField("rnk", T.IntegerType()),
    T.StructField("type", T.StringType()),
    T.StructField("movie_name", T.StringType()),
    T.StructField("movie_year", T.IntegerType()),
    T.StructField("box_office", T.LongType())
])))

# Extract All Top-Rated Movies

In [0]:
# Top-rated chart pages: IMDb's movie and TV charts plus the India top-rated lists.
# Keep the trailing slash: for non-search URLs the scraper uses the path segment before
# it as the list "type".
url_list = [
    *(CHART_URL + chart for chart in ("toptv/", "top/")),
    *(f"{INDIA_URL}top-rated-{lang}-movies/" for lang in ("indian", "malayalam", "tamil", "telugu")),
]

# Extract Popular Movies by Language

In [0]:
# Language-specific IMDb searches (Malayalam, Tamil, Hindi, Telugu, Kannada), each
# fetched twice: by popularity (moviemeter ascending) and by vote count (descending).
top_search_urls = [LANG_URL + "=" + code for code in ("ml", "ta", "hi", "te", "kn")]
sort_syntax = ["moviemeter,asc", "num_votes,desc"]

url_list.extend(
    f"{search_url}&sort={order}"
    for search_url in top_search_urls
    for order in sort_syntax
)

# Extract Top 1000 Movies

In [0]:
# IMDb Top 1000 split into release-date windows. Each URL is fetched once with no
# pagination, so every window is meant to return fewer than 50 titles (presumably
# IMDb's results-page size).
# NOTE(review): Top 1000 membership changes over time. If a window grows past one
# page, its extra titles are silently missed, so re-check these bounds periodically.
date_ranges = [
    ("1900-01-01", "1940-12-31"),
    ("1941-01-01", "1950-12-31"),
    ("1951-01-01", "1958-12-31"),
    ("1959-01-01", "1963-12-31"),
    ("1964-01-01", "1970-12-31"),
    ("1971-01-01", "1975-12-31"),
    ("1976-01-01", "1982-12-31"),
    ("1983-01-01", "1987-12-31"),
    ("1988-01-01", "1991-12-31"),
    ("1992-01-01", "1994-12-31"),
    ("1995-01-01", "1997-12-31"),
    ("1998-01-01", "2000-12-31"),
    ("2001-01-01", "2002-12-31"),
    ("2003-01-01", "2004-09-30"),
    ("2004-10-01", "2006-12-31"),
    ("2007-01-01", "2008-12-31"),
    ("2009-01-01", "2010-12-31"),
    ("2011-01-01", "2012-12-31"),
    ("2013-01-01", "2014-06-30"),
    ("2014-07-01", "2015-12-31"),
    ("2016-01-01", "2017-12-31"),
    ("2018-01-01", "2019-12-31"),
    ("2020-01-01", "2022-12-31"),
    ("2023-01-01", "9999-12-31"),
]

url_list.extend(
    f"{IMDB_SEARCH_URL}?release_date={','.join(window)}&groups=top_1000"
    for window in date_ranges
)

# Extract Popular Movies and TV Shows

In [0]:
# IMDb popularity ("moviemeter") ranks 1-2000 across all release dates, requested as
# 40 windows of 50 ranks each: 1-50, 51-100, ..., 1951-2000.
url_list.extend(
    f"{IMDB_SEARCH_URL}?release_date=,9999-12-31&moviemeter={start},{start + 49}"
    for start in range(1, 2000, 50)
)

# Extract All-Time Box Office Details

In [0]:
# Box Office Mojo worldwide lifetime-gross chart, one URL per offset: 0, 200, 400, 600, 800.
rng = range(0, 1000, 200)
url_list.extend(f"{BOXOFFICE_CHART}{offset}" for offset in rng)

# Create IMDB Top table

In [0]:
# Create a DataFrame with a single column "url" (one row per page to scrape)
imdb_df = spark.createDataFrame([(url,) for url in url_list], ["url"])

# Scrape each URL with the UDF, explode the returned array into one row per entry,
# and flatten the struct into columns (tconst, rnk, type, movie_name, movie_year, box_office)
imdb_df = (
  imdb_df
  .withColumn("movie_data", F.explode(scrape_movie_data_udf("url")))
  .select("movie_data.*")
  )

# LIKE patterns over the "type" column. For chart/list pages "type" holds a path segment
# (e.g. "top"); for search pages it holds the query string
# (e.g. "?primary_language=ml&sort=moviemeter,asc").
primary_language_pattern = "%primary_language%,%"
popular_movie_pattern = "%release_date%moviemeter%"

# Keep only real entries (rows the scraper could not parse have tconst NULL) and, for
# language-search pages, extract the language code from the query string
imdb_df = (
    imdb_df
    .filter(F.col("tconst").isNotNull())
    .withColumn("lang_code", F.when(F.col("type").like(primary_language_pattern), F.regexp_extract(F.col("type"), r"primary_language=([a-z]+)", 1)))
)

# Join with language DataFrame to turn the code into a language name
# (NULL for non-language pages or unknown codes)
imdb_df = imdb_df.join(lang_df, "lang_code", 'leftouter')

# Base columns. For popularity pages the window's starting moviemeter value minus 1 is
# added to the on-page rank, presumably because each page numbers its results from 1.
# The "type" column is exposed as "url".
select_exprs = [
    "tconst",
    "lang_name",
    (F.col("rnk") + F.when(F.col("type").like(popular_movie_pattern), F.regexp_extract("type", r"moviemeter=(\d+),", 1).cast("int") - 1).otherwise(0)).alias("rnk"),
    F.col("type").alias("url")
]

# Conditional columns: "is_*" entries become 'Y'/'N' flags; box_office keeps its value
# only when non-negative, so the scraper's -1 placeholder becomes NULL
conditional_columns = {
    "box_office": F.col("box_office") >= 0,
    "is_in_top_250": F.col("type").isin(['toptv','top','top-rated-indian-movies','top-rated-malayalam-movies','top-rated-tamil-movies','top-rated-telugu-movies']),
    "is_in_top_1000": F.col("type").like("%top_1000%"),
    "is_popular": F.col("type").like(popular_movie_pattern),
    "is_primary_lang": F.col("type").like(primary_language_pattern),
    "is_asc": F.col("type").like("%,asc%"),
    "is_desc": F.col("type").like( "%,desc%")
}

# Add conditional columns (dict order fixes the final column order)
for col_name, condition in conditional_columns.items():
  if col_name.startswith("is_"):
    select_exprs.append(F.when(condition, F.lit('Y')).otherwise(F.lit('N')).alias(col_name))
  else:
    select_exprs.append(F.when(condition, F.col(col_name)).alias(col_name))

# Select the final columns
imdb_df = imdb_df.select(*select_exprs)

# Write to table silver.<topTableName>, replacing data and schema
(
  imdb_df
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema","true")
  .saveAsTable(f"silver.{topTableName}")
)

# Extract Yearly Box Office Details

In [0]:
# One Box Office Mojo worldwide yearly page per calendar year, 1977 through the current year.
current_year = date.today().year
rng = range(1977, current_year + 1)
yearly_box_office_url_list = [f"{BOXOFFICE_YEAR}{year}" for year in rng]

# Create Dataframe and build t_bo table

In [0]:
# Scrape every yearly page and keep only parsed rows (rows the scraper could not parse
# have no movie_name). Store (movie_name, movie_year, box_office) as silver.<boTableName>.
bo_df = (
  spark.createDataFrame([(u,) for u in yearly_box_office_url_list], ["url"])
  .select(F.explode(scrape_movie_data_udf("url")).alias("movie_data"))
  .where(F.col("movie_data.movie_name").isNotNull())
  .select(*(f"movie_data.{field}" for field in ("movie_name", "movie_year", "box_office")))
)

bo_writer = bo_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true")
bo_writer.saveAsTable(f"silver.{boTableName}")

# Download the Datasets to the Driver and Move Them to the Raw Storage Folders

In [0]:
# Download each IMDb dataset file to the driver's local disk, then move it into its raw
# folder: <rawPath><name without .tsv.gz>/<file>.
for file in file_list:
  tablename = file[:-7].replace("-co","")
  BaseURL = DATASET_URL + file
  # Download to an explicit absolute path. urlretrieve with a bare file name writes
  # into the process's current working directory, which is not guaranteed to be
  # /databricks/driver (newer Databricks runtimes start in the notebook's workspace
  # folder). In that case the move below would not find the file.
  LocalPath = f"/databricks/driver/{file}"
  DriverPath = f"file:{LocalPath}"
  dbfsPath = f"dbfs:{rawPath}{tablename}/{file}"
  urlretrieve(BaseURL, LocalPath)
  dbutils.fs.mv(DriverPath, dbfsPath)

# Create Silver Delta Tables

## Create function for reading from raw and writing to silver

In [0]:
def load_table(rawFolderName):
  """
  Load one raw IMDb dataset folder into a silver Delta table.

  Reads every file under rawPath + rawFolderName as tab-separated text with a header
  row (all columns as strings). It try_casts the known numeric columns present in that
  file, then overwrites silver.t_<rawFolderName with "." replaced by "_">.

  Args:
    rawFolderName: Folder name under rawPath, e.g. "title.basics".
  """
  rawFilePath = rawPath + rawFolderName
  silverTableName = "t_" + rawFolderName.replace(".", "_")
  df = (
    spark
    .read
    .format("csv")
    .option("inferSchema", "false")
    .option("header", "true")
    .option("delimiter", "\t")
    # IMDb's TSV files do not quote fields, yet some titles and names begin with a
    # literal double quote. With the default quote character, such a value is parsed
    # as a quoted field that can swallow the following tabs and shift columns.
    # An empty string disables quote handling in Spark's CSV reader.
    .option("quote", "")
    .load(rawFilePath)
  )
  # Only these columns are typed. try_cast yields NULL for values that do not parse.
  colToChange = {'averageRating':'decimal(3,1)', 'numVotes':'int', 'startYear':'int', 'runtimeMinutes':'int'}
  dfColToChange = {k: v for (k, v) in colToChange.items() if k in df.columns}
  for colName, dataType in dfColToChange.items():
    df = df.withColumn(colName, F.expr(f"try_cast({colName} as {dataType})"))
  (
    df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema","true")
    .saveAsTable(f"silver.{silverTableName}")
  )

## Execute the load_table function for multiple tables in parallel

In [0]:
from threading import Thread
from queue import Empty, Queue

q = Queue()

worker_count = 5  # Number of tables which will be loaded in parallel
load_errors = []  # (rawFolderName, exception) for every load_table call that failed


def run_tasks(function, q, errors=None):
    """
    Worker loop: call `function` on items taken from `q` until the queue is empty.

    get_nowait() lets a worker that loses the race for the last item return instead
    of blocking forever in q.get(). task_done() runs in `finally`, so a failing item
    still counts as processed and q.join() cannot hang.

    Args:
        function: Callable invoked with each queue item.
        q: queue.Queue of work items, filled before the workers start.
        errors: Optional list. When given, (item, exception) pairs are appended to it
            and the worker moves on. Otherwise the exception ends this worker thread.
    """
    while True:
        try:
            value = q.get_nowait()
        except Empty:
            return
        try:
            function(value)
        except Exception as exc:
            if errors is None:
                raise
            errors.append((value, exc))
        finally:
            q.task_done()


for rawFolderName in RawFolderList:
    q.put(rawFolderName)

for i in range(worker_count):
    t = Thread(target=run_tasks, args=(load_table, q, load_errors))
    t.daemon = True
    t.start()

q.join()

# Surface failures in the notebook rather than only in the worker threads' stderr.
if load_errors:
    failed_tables = ", ".join(name for name, _ in load_errors)
    raise RuntimeError(f"Failed to load silver tables for: {failed_tables}") from load_errors[0][1]

# Create Gold Table

In [0]:
# Build gold.<goldTableName>: one row per title combining dataset attributes, ratings,
# crew names, scraped list ranks and box office.
# Declare Dataframes; each is aliased so joined columns can be qualified (e.g. "tbs.tconst")
tbs = spark.table('silver.t_title_basics').alias("tbs")
trt = spark.table('silver.t_title_ratings').alias("trt")
tnb = spark.table('silver.t_name_basics').alias("tnb")
imd = spark.table('silver.t_imdb_top').alias("imd")
tps = spark.table('silver.t_title_principals').alias("tps")
tcr = spark.table('silver.t_title_crew').alias("tcr")
tbo = spark.table('silver.t_bo').alias("tbo")

# Crew information as (tconst, category, nconst) rows: actors/actresses from
# title.principals plus one "director" row per director id from title.crew
tps = (
     tps
     .filter(F.col("category").isin(['actor', 'actress']))
     .select("tconst", 
             "category", 
             "nconst")
)
tcr = (
     tcr
     # "_N" is a LIKE pattern (any one character followed by "N"); presumably it targets
     # the dataset's "\N" missing-value marker
     .filter(~F.col("directors").like("_N"))
     # directors is a comma-separated list of ids: one output row per director
     .select("tconst", 
             F.lit("director").alias("category"), 
             F.explode(F.split(F.col("directors"), ',')).alias("nconst"))
)
tps = tps.union(tcr)

# Box office information: match yearly box-office rows to IMDb movies by exact title and
# release year (titleType "movie" only)
tbo = (
     tbo
     .join(tbs,(tbo.movie_name == tbs.primaryTitle) & (tbo.movie_year == tbs.startYear) & (tbs.titleType == F.lit("movie")), 'inner')
     .join(trt, trt.tconst == tbs.tconst, 'inner')
)
# Several movies can share a title and year: rank them by vote count (then box office)
# and keep only the top one per (primaryTitle, startYear)
tbo = tbo.withColumn("rnk", F.row_number().over(Window
                                               .partitionBy("tbs.primaryTitle","tbs.startYear")
                                               .orderBy(F.col("trt.numvotes").desc(),
                                                        F.col("tbo.box_office").desc())))
tbo = (
     tbo
     .filter(F.col("rnk") == 1)
     .select(F.col("tbs.tconst").alias("tconst"), "tbo.box_office")
)

# Final dataframe which is one row per title. Restrict to these title types, then
# left-join the other sources. The crew and list joins fan out to several rows per
# title, which the groupBy below collapses back to one.
tbs = tbs.filter(F.col("tbs.titletype").isin(['movie','tvMiniSeries','short','tvSeries','tvShort','tvSpecial']))
tbs = (
     tbs
     .join(trt, trt.tconst == tbs.tconst, 'leftouter')
     .join(tps, tps.tconst == tbs.tconst, 'leftouter')
     .join(tnb, tnb.nconst == tps.nconst, 'leftouter')
     .join(imd, imd.tconst == tbs.tconst, 'leftouter')
     .join(tbo, tbo.tconst == tbs.tconst, 'leftouter')
)
tbs = (
     tbs
     .groupBy("tbs.tconst")
     # title_type: e.g. "tvMiniSeries" -> initcap "Tvminiseries" -> "TV miniseries"
     .agg(F.max(F.regexp_replace(F.initcap("tbs.titletype"), 'Tv', 'TV ')).alias("title_type"),
          F.max("tbs.primarytitle").alias("primary_title"),
          F.max("tbs.originaltitle").alias("original_title"),
          F.max("tbs.startyear").alias("yr"),
          F.max(F.when(F.col("tbs.isadult") == 1,"Y").otherwise("N")).alias("is_adult"),
          F.max("tbs.runtimeminutes").alias("runtime_min"),
          F.max(F.when(~F.col("tbs.genres").like("_N"), F.col("genres"))).alias("genres"),
          F.max("trt.averagerating").alias("avg_rating"),
          F.max("trt.numvotes").alias("num_votes"),
          # Lifetime-chart box office (imd) takes precedence over the yearly-matched figure (tbo).
          # NOTE(review): imd.box_office is already NULL when negative, so F.abs only affects
          # tbo values; a negative gross would most likely mean the scraper's integer return
          # type overflowed, and abs() does not recover the true amount. Confirm it is needed.
          F.abs(F.coalesce(F.max("imd.box_office"), F.max("tbo.box_office"))).alias("box_office"),
          # NOTE(review): a title on several top-rated lists gets the largest (worst) rank
          # via F.max; confirm that F.min was not intended
          F.max(F.when(F.col("imd.is_in_top_250") == 'Y', F.col("imd.rnk"))).alias("top_250_rnk"),
          F.max(F.when(F.col("imd.is_in_top_1000") == 'Y', F.col("imd.rnk"))).alias("top_1000_rnk"),
          F.max(F.when(F.col("imd.is_popular") == 'Y', F.col("imd.rnk"))).alias("popularity_rnk"),
          # Unqualified "rnk" presumably resolves to imd.rnk (tbo's rnk was dropped above)
          F.max(F.when((F.col("imd.is_primary_lang") == 'Y') & (F.col("imd.is_asc") == 'Y'), F.col("rnk"))).alias("language_popularity_rnk"),
          F.max(F.when((F.col("imd.is_primary_lang") == 'Y') & (F.col("imd.is_desc") == 'Y'), F.col("rnk"))).alias("language_votes_rnk"),
          F.coalesce(F.max("imd.is_in_top_1000"), F.lit('N')).alias("is_top_1000_movies"),
          F.concat_ws('; ', F.collect_set("imd.lang_name")).alias("language_lst"),
          F.concat_ws('; ', F.collect_set(F.when(F.col("tps.category") == 'director', F.col("tnb.primaryname")))).alias("director_lst"),
          F.concat_ws('; ', F.collect_set(F.when(F.col("tps.category") == 'actor', F.col("tnb.primaryname")))).alias("actor_lst"),
          F.concat_ws('; ', F.collect_set(F.when(F.col("tps.category") == 'actress', F.col("tnb.primaryname")))).alias("actress_lst"),
          F.lit(F.current_date()).alias("last_refresh_date"))
)
# concat_ws over an empty set yields "": turn those list columns into NULL
tbs = (
     tbs
     .select([
              F.when(F.col(c) == "", None).otherwise(F.col(c)).alias(c)
              if c in ["language_lst", "director_lst", "actor_lst", "actress_lst"]
              else F.col(c)
              for c in tbs.columns
              ])
)
# Write to gold.<goldTableName>, replacing data and schema
(
 tbs
 .write
 .format("delta")
 .mode("overwrite")
 .option("overwriteSchema","true")
 .saveAsTable(f"gold.{goldTableName}")
)

# Create View for Reporting

In [0]:
%sql
-- Reporting view over gold.t_imdb: friendly column names plus a derived overall
-- "Popularity Rank" that orders titles by IMDb popularity rank, then language
-- popularity rank, Top 250 rank, Top 1000 rank, language vote rank and vote count.
-- NOTE(review): `Generes` is a misspelling of "Genres"; it is kept unchanged because
-- renaming a view column would break reports that already reference it.
CREATE OR REPLACE VIEW v_imdb AS
SELECT
  tconst             AS `IMDB ID`,
  title_type         AS `Title Type`,
  primary_title      AS `Primary Title`,
  original_title     AS `Original Title`,
  yr                 AS `Release Year`,
  is_adult           AS `Is Adult`,
  runtime_min        AS `Runtime in Min`,
  genres             AS `Generes`,
  top_250_rnk        AS `Top 250 Rank`,
  ROW_NUMBER() OVER (
    ORDER BY
      popularity_rnk          ASC NULLS LAST,
      language_popularity_rnk ASC NULLS LAST,
      top_250_rnk             ASC NULLS LAST,
      top_1000_rnk            ASC NULLS LAST,
      language_votes_rnk      ASC NULLS LAST,
      num_votes               DESC NULLS LAST
  )                  AS `Popularity Rank`,
  is_top_1000_movies AS `Is in Top 1000 Movies`,
  language_lst       AS `Languages`,
  avg_rating,
  num_votes,
  box_office,
  director_lst       AS `Directors`,
  actor_lst          AS `Actors`,
  actress_lst        AS `Actresses`,
  last_refresh_date  AS `Last Refresh Date`
FROM
  gold.t_imdb