In [0]:
%pip install bs4  # Install BeautifulSoup library for web scraping

In [0]:
# Restart the Python environment so that libraries installed or updated by the
# preceding %pip cell are picked up by the cells that follow.
dbutils.library.restartPython()

In [0]:
import gzip  # For decompressing the downloaded .gz archives
import json  # For handling JSON data
import os  # For interacting with the operating system
import shutil  # For streaming and copying file contents
import urllib.request  # For opening and reading URLs

import requests  # For making HTTP requests
from bs4 import BeautifulSoup  # For parsing HTML and XML documents
from delta.tables import DeltaTable  # For working with Delta Lake tables
from pyspark.sql import Row  # For creating Spark DataFrame rows
from pyspark.sql.functions import col, regexp_replace, split,slice, size, when, concat_ws, array_intersect, array, lit, element_at # For DataFrame operations

In [0]:
# Read the OMDb API key from the Databricks secret scope so that the key
# itself never appears in the notebook source.
omdbkey = dbutils.secrets.get(
    scope="djsdbsecrets",
    key="omdbapikey",
)

In [0]:
# Names of the catalog and schema that hold the IMDb tables
catalog_name = "data_analysis"
schema_name = "imdb_data"

# Look up the storage URL registered for the `externaloc` external location;
# the first column of the first returned row is the URL.
ext_df = spark.sql('DESCRIBE EXTERNAL LOCATION `externaloc`')
ext_loc = ext_df.select('url').collect()[0][0]

# Create the catalog (managed at that location) if it is missing
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name} MANAGED LOCATION '{ext_loc}'")

# Create the schema inside the catalog if it is missing
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")


In [0]:
# Listing page of the IMDb dataset dumps, and the volume folder the
# decompressed files are copied into.
url = "https://datasets.imdbws.com/"
download_path = "/Volumes/generaldata/dataanalysis/upload/imdb/"

In [0]:
# Download every file linked from the IMDb listing page, decompress it and
# copy the result into `download_path`.
listing_response = requests.get(url, timeout=60)
listing_response.raise_for_status()  # Fail loudly instead of parsing an error page
html_content = listing_response.text
soup = BeautifulSoup(html_content, 'html.parser')  # Parse the listing page
items_list = soup.find('ul')  # The dataset links live in the page's unordered list
if items_list is None:
    raise RuntimeError(f"No <ul> element with dataset links found at {url}")

for item in items_list.find_all('a'):  # One anchor per dataset file
    # The anchor text is used as the file name; basename() keeps a scraped
    # value from ever pointing outside /tmp or the target folder.
    file_name = os.path.basename(item.get_text().strip())
    file_path = item.get('href')  # Download URL of the file
    if not file_name or not file_path:
        continue  # Anchor without a usable name or target

    is_gzip = file_name.endswith('.gz')
    decompressed_file_name = file_name[:-len('.gz')] if is_gzip else file_name
    dest_download_path = os.path.join('/tmp', file_name)
    local_decompressed_path = os.path.join('/tmp', decompressed_file_name)

    urllib.request.urlretrieve(file_path, dest_download_path)  # Download to local temp storage

    if is_gzip:
        # Decompress in Python rather than shelling out: no shell interpolation of
        # scraped names, errors raise, and an existing file from an earlier run is
        # overwritten instead of being silently kept.
        with gzip.open(dest_download_path, 'rb') as src, open(local_decompressed_path, 'wb') as dst:
            shutil.copyfileobj(src, dst)
        os.remove(dest_download_path)  # Same end state as `gzip -d`: the archive is removed

    # Copy the decompressed file into the final download folder
    shutil.copyfile(local_decompressed_path, os.path.join(download_path, decompressed_file_name))

In [0]:
# Reload each downloaded TSV file into the table named after it.
for file in dbutils.fs.ls(download_path):
    table_name = file.name.replace('.tsv', '').replace('.', '_')  # e.g. title.basics.tsv -> title_basics
    full_table_name = f"{catalog_name}.{schema_name}.{table_name}"  # catalog.schema.table

    # Use the public catalog API; the private spark._jsparkSession handle is not
    # available on every cluster type.
    # NOTE(review): only tables that already exist are refreshed, so on a brand-new
    # schema nothing is loaded - confirm this guard is intended.
    if not spark.catalog.tableExists(full_table_name):
        continue

    path = file.path  # Location of the TSV file
    df = spark.read.option("delimiter", "\t").option("header", "true").csv(path)  # Read the TSV file into a DataFrame
    df = df.replace(r"\N", None)  # Replace '\N' with None in the DataFrame
    df.write\
      .mode("overwrite")\
      .option("overwriteSchema", "true")\
      .saveAsTable(full_table_name)  # Overwrite the table, schema included

In [0]:
# Select up to 2000 non-adult movies released in 2020 or later.
# The table is addressed through catalog_name / schema_name so it follows the
# same configuration as the cells that create and load the tables.
# NOTE: LIMIT without ORDER BY does not guarantee which 2000 rows are returned.
imdb_list = spark.sql(
    f"""
        SELECT
        tconst
        FROM {catalog_name}.{schema_name}.title_basics
        WHERE titleType = 'movie'  -- Filter for movie titles
        AND isAdult = '0'          -- Exclude adult titles
        AND startYear >= 2020      -- Keep movies released in 2020 or later
        LIMIT 2000
    """
)

In [0]:
# Genre vocabulary used for filtering and analysis.
# NOTE(review): if these are compared by exact string equality with API values,
# confirm that spellings such as 'Science Fiction', 'Historical' and 'Sports'
# match what the data source actually returns.
genres = [
    'Action', 'Adventure', 'Comedy', 'Drama', 'Fantasy',
    'Horror', 'Mystery', 'Romance', 'Science Fiction', 'Thriller',
    'Western', 'Animation', 'Crime', 'Documentary', 'Family',
    'Musical', 'War', 'Historical', 'Sports', 'Biography',
]

In [0]:
# Fetch the OMDb record for every selected IMDb id.
# Collect the ids through the DataFrame API (no RDD round-trip needed).
imdb_ids = [row["tconst"] for row in imdb_list.select('tconst').collect()]

json_data = []  # One dict per IMDb id; {} when the API did not answer with HTTP 200
with requests.Session() as session:  # Reuse one connection for all lookups
    for imdbid in imdb_ids:
        response = session.get(
            "https://www.omdbapi.com/",  # HTTPS so the API key is not sent in cleartext
            params={"i": imdbid, "apikey": omdbkey},  # Let requests build and encode the query string
            timeout=30,  # Do not let one stalled request hang the whole cell
        )
        json_data.append(response.json() if response.status_code == 200 else {})

In [0]:
table_name = "omdb_analysis"

# Build a DataFrame from the collected API responses.
# Each dict is serialized with json.dumps first: handing raw dicts to
# spark.read.json makes Spark parse their Python repr, which is not valid JSON
# (None/True/False, escaped quotes) and can yield corrupt records.
json_p = spark.sparkContext.parallelize([json.dumps(record) for record in json_data])
df = spark.read.json(json_p)

# Clean and convert BoxOffice to decimal.
# decimal(15,2) leaves room for grosses of $100M and more, which do not fit
# into decimal(10,2) (8 integer digits).
df = df.withColumn(
    "BoxOffice",
    regexp_replace(col("BoxOffice"), "[$,]", "").cast("decimal(15,2)")
)
# Clean and convert Runtime to integer (strips the characters ',', 'm', 'i', 'n')
df = df.withColumn(
    "Runtime",
    regexp_replace(col("Runtime"), "[,min]", "").cast("integer")
)
# Split Genre string into an array; the pattern also consumes whitespace after
# each comma so that "A, B" yields ["A", "B"] rather than ["A", " B"]
df = df.withColumn("GenreArray", split(col("Genre"), r",\s*"))
# Find intersection of GenreArray and predefined genres
df = df.withColumn(
    "Matches",
    array_intersect(
        col("GenreArray"),
        array(*[lit(g) for g in genres])
    )
)
# Derive the genre columns from the matched genres
df = df.withColumn("Primary_Genre", element_at(col("Matches"), 1))  # Get the first matching genre
df = df.withColumn("Hybrid_Genre", concat_ws(",", col("Matches")))  # Create a comma-separated string of matched genres

cols_to_drop = ["Matches", "Genre", "GenreArray"]  # Helper columns that are no longer needed

df = df.drop(*cols_to_drop)  # Drop unnecessary columns from the DataFrame
df = df.withColumn("imdbVotes", when(col("imdbVotes") == "N/A", 0).otherwise(col("imdbVotes")))  # Replace 'N/A' in imdbVotes with 0
df = df.withColumn("Metascore", when(col("Metascore") == "N/A", 0).otherwise(col("Metascore")))  # Replace 'N/A' in Metascore with 0
df = df.withColumn("imdbRating", when(col("imdbRating") == "N/A", 0.0).otherwise(col("imdbRating")))  # Replace 'N/A' in imdbRating with 0.0
df = df.fillna({'Runtime': 0, 'Primary_Genre': 'unknown', 'Hybrid_Genre': 'unknown'})  # Fill null values in specified columns with default values
display(df)


# Persisting the result is currently disabled:
# df.write\
#   .mode("overwrite")\
#   .option("mergeSchema", "true") \
#   .saveAsTable("{0}.{1}.{2}".format(catalog_name, schema_name, table_name))  # Save DataFrame as a table in the specified database

In [0]:
# Show the stored OMDb analysis table
omdb_analysis_table = spark.table("data_analysis.imdb_data.omdb_analysis")
display(omdb_analysis_table)

Databricks data profile. Run in Databricks to view.