# Libraries & variables

In [0]:
# Import
from datetime import datetime, timezone
from urllib.parse import parse_qs

from pyspark.sql.functions import explode, col, lit, split

# Variables
# Storage account and container that hold the staged CSV files; both are used
# below to build the mount source URL and the mount point.
blob_account_name = "streamersdata"
blob_container_name = "staging"
# SAS token for the container. A later cell reads its expiry ("se") parameter,
# and the mount cell passes it to dbutils.fs.mount.
# NOTE(review): this is a credential - consider loading it from a secret store
# instead of keeping it in the notebook.
blob_sas_token = "<blob_sas_token>"

In [0]:
# Define the JDBC URL and connection properties
# These two names are shared by every spark.read.jdbc / DataFrame.write.jdbc
# call further down in the notebook.
jdbc_url = "jdbc:sqlserver://streamers-sqlserver.database.windows.net:1433;database=streamers-sqldb"
# NOTE(review): user and password are hard-coded placeholders; the password
# placeholder is also missing its closing ">". Consider loading both from a
# secret store rather than storing them in the notebook.
connection_properties = {
    "user": "<USERNAME>",
    "password": "<PASSWORD",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


In [0]:
# Check if the token is still valid.
# The token is parsed as a query string instead of being split on "&se=", so
# the expiry is found even when "se" is the first parameter or the token starts
# with "?", and a URL-encoded timestamp (e.g. "17%3A03%3A16Z") is decoded
# before it is parsed.
sas_params = parse_qs(blob_sas_token.lstrip("?"))
if "se" not in sas_params:
    raise ValueError("The SAS token does not contain an expiry ('se') parameter.")

expiry_date_str = sas_params["se"][0]
# The format requires a trailing "Z" (UTC), so compare timezone-aware UTC values.
expiry_date = datetime.strptime(expiry_date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
current_date = datetime.now(timezone.utc)

if current_date > expiry_date:
    raise Exception("The SAS token is not valid anymore.")
else:
    print(f"The SAS token is valid until {expiry_date_str}.")

The SAS token is valid until 2024-12-20T17:03:16Z.


# File mounting

In [0]:
# DBFS location under which the container is exposed
mount_point = f"/mnt/{blob_container_name}"

# Drop a previous mount of the same path first, so the mount below always
# uses the current SAS token
if any(mnt.mountPoint == mount_point for mnt in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the Blob Storage container, authenticating with the SAS token
dbutils.fs.mount(
    source=f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs={
        f"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net": blob_sas_token
    },
)

# Show what is available in the container
display(dbutils.fs.ls(mount_point))

/mnt/staging has been unmounted.


path,name,size,modificationTime
dbfs:/mnt/staging/amazon_prime.csv,amazon_prime.csv,6868017,1734100958000
dbfs:/mnt/staging/apple_tv.csv,apple_tv.csv,1394527,1734100958000
dbfs:/mnt/staging/hbo_max.csv,hbo_max.csv,1113776,1734100958000
dbfs:/mnt/staging/hulu.csv,hulu.csv,691846,1734100958000
dbfs:/mnt/staging/netflix.csv,netflix.csv,5044133,1734100959000


# Dataframe creation

In [0]:
def load_csv_to_df(relative_path):
    """Read a CSV file from the mounted container into a Spark DataFrame.

    Args:
        relative_path: Path of the file relative to ``mount_point``.

    Returns:
        A DataFrame whose column names come from the file's header row. No
        schema inference is requested from the reader.
    """
    reader = spark.read.option("header", "true")
    return reader.csv(f"{mount_point}/{relative_path}")

# Relative paths
# File names of the staged CSV files, relative to the mount point (one file per
# streaming platform).
amazon_relative_path = "amazon_prime.csv"
apple_relative_path = "apple_tv.csv"
hbo_relative_path = "hbo_max.csv"
hulu_relative_path = "hulu.csv"
netflix_relative_path = "netflix.csv"

# Load DataFrames
# One raw DataFrame per platform; load_csv_to_df reads each file with its
# header row as column names.
amazon_df = load_csv_to_df(amazon_relative_path)
apple_df = load_csv_to_df(apple_relative_path)
hbo_df = load_csv_to_df(hbo_relative_path)
hulu_df = load_csv_to_df(hulu_relative_path)
netflix_df = load_csv_to_df(netflix_relative_path)

# Dataframe cleaning & transformation
The dataframes need to be transformed for easy processing.
1. First, all records with missing data are removed: if the title, imdbId, imdbAverageRating or imdbNumVotes column contains a null, the row is deleted.
2. The data in the dataframes is in string format, so the numeric columns (releaseYear, imdbNumVotes, imdbAverageRating) are cast to integer or float.
3. Nulls in the releaseYear column are replaced with 9999; nulls in the type, genres and availableCountries columns are replaced with "Unknown".
4. A new column is added with the platform name.
5. The last step is combining. Luckily, the five CSV files follow the same template, so combining them is not too difficult.

In [0]:
# Function to transform DataFrame
def transform_df(df, platform_name):
    """Clean one platform's raw DataFrame and tag it with the platform name.

    Steps:
      1. Add a ``platform`` column holding ``platform_name``.
      2. Cast ``releaseYear`` and ``imdbNumVotes`` to integer and
         ``imdbAverageRating`` to float.
      3. Drop rows where ``title``, ``imdbId``, ``imdbAverageRating`` or
         ``imdbNumVotes`` is null.
      4. Replace remaining nulls: ``releaseYear`` -> 9999; ``type``,
         ``genres`` and ``availableCountries`` -> "Unknown".

    Args:
        df: Raw DataFrame as loaded from the CSV file (string columns).
        platform_name: Value written into the ``platform`` column.

    Returns:
        The cleaned DataFrame.
    """
    df = df.withColumn("platform", lit(platform_name))

    # Cast before filtering: with Spark's default (non-ANSI) cast semantics a
    # value that cannot be converted becomes null, so filtering afterwards
    # also removes rows whose rating or vote count is not numeric. Filtering
    # first would let those nulls through.
    df = df.withColumn("releaseYear", col("releaseYear").cast("integer"))
    df = df.withColumn("imdbNumVotes", col("imdbNumVotes").cast("integer"))
    df = df.withColumn("imdbAverageRating", col("imdbAverageRating").cast("float"))

    df = df.filter(
        col("title").isNotNull() &
        col("imdbId").isNotNull() &
        col("imdbAverageRating").isNotNull() &
        col("imdbNumVotes").isNotNull()
    )

    # releaseYear is already an integer column here, so the 9999 sentinel
    # matches its type.
    df = df.fillna({
        "type": "Unknown",
        "genres": "Unknown",
        "releaseYear": 9999,
        "availableCountries": "Unknown"
    })
    return df

# Clean every platform DataFrame and label it with its platform name
amazon_df_cleaned = transform_df(amazon_df, "Amazon Prime")
apple_df_cleaned = transform_df(apple_df, "Apple TV Plus")
hbo_df_cleaned = transform_df(hbo_df, "HBO Max")
hulu_df_cleaned = transform_df(hulu_df, "Hulu")
netflix_df_cleaned = transform_df(netflix_df, "Netflix")

# Stack the five cleaned DataFrames; unionByName matches columns by name
# rather than by position
combined_df = (
    amazon_df_cleaned
    .unionByName(apple_df_cleaned)
    .unionByName(hbo_df_cleaned)
    .unionByName(hulu_df_cleaned)
    .unionByName(netflix_df_cleaned)
)


### Specific DataFrames: Content, Genre, Country, Platform
Each specific dataframe contains the unique records from the combined dataframe.

In [0]:
# Function to create dataframes with unique records per type
def extract_unique_data(column_name, alias_name):
  """Return the distinct values found in a comma-separated column of combined_df.

  Args:
    column_name: Column of ``combined_df`` holding comma-separated values.
    alias_name: Name of the single column in the returned DataFrame.

  Returns:
    A one-column DataFrame with one row per distinct value.
  """
  # Split on a comma followed by optional whitespace, one row per element
  exploded_values = explode(split(col(column_name), ",\\s*")).alias(alias_name)

  return combined_df.select(exploded_values).distinct()

# Lookup dataframes: one row per distinct country code, genre and platform
unique_countries_df = extract_unique_data(column_name="availableCountries", alias_name="CountryCode")
unique_genres_df = extract_unique_data(column_name="genres", alias_name="GenreName")
unique_platform_df = extract_unique_data(column_name="platform", alias_name="platformName")

# Create the content-specific dataframe: the title-level attributes, de-duplicated
content_df = (
    combined_df
    .select("title", "type", "releaseYear", "imdbId", "imdbAverageRating", "imdbNumVotes")
    .distinct()
)

# Loading dataframes to SQL database

To make sure only new data is loaded onto the server, the existing data is first read into a dataframe and anti-joined with the new dataframe. This is done for the content, countries, genres and platforms dataframes.
Next, the tables based on a join are created: ContentGenres, ContentCountries and ContentPlatform.

In [0]:

# Function to write data to SQL database

def load_write_to_sql(df, table_name, column_name):
  """Append to a SQL table the rows of df whose key is not in the table yet.

  Args:
    df: DataFrame holding the candidate rows.
    table_name: Name of the target table, read and written through JDBC.
    column_name: Column used to match rows of df against the table.
  """
  jdbc_target = {
    "url": jdbc_url,
    "table": table_name,
    "properties": connection_properties,
  }

  # What the table currently holds
  stored_rows = spark.read.jdbc(**jdbc_target)

  # left_anti keeps only the rows of df without a match in the table
  rows_to_insert = df.join(stored_rows, on=column_name, how="left_anti")

  rows_to_insert.write.jdbc(mode="append", **jdbc_target)

# Append the new rows of each dataframe to its table, matched on the key column
for source_df, target_table, key_column in (
    (content_df, "Content", "imdbID"),                      # "Content" table
    (unique_countries_df, "Countries", "CountryCode"),      # "Countries" table
    (unique_genres_df, "Genres", "GenreName"),              # "Genres" table
    (unique_platform_df, "Platforms", "platformName"),      # "Platforms" table
):
    load_write_to_sql(source_df, target_table, key_column)


In [0]:
# Function to combine data from multiple tables and write to SQL database

def combine_write_to_sql(column_name, alias_name, table_name, sql_query, combi_table_name):
  """Build a content-to-lookup link table and write it to the SQL database.

  The function registers three temporary views and then runs ``sql_query``
  against them:
    * ``temp_content_combi`` - distinct (contentID, <alias_name>) pairs taken
      from ``combined_df``, where contentID holds the imdbId and the
      comma-separated ``column_name`` is exploded into one row per element;
    * ``Content`` - the current "Content" table on the server;
    * ``<table_name>`` - the current lookup table on the server.

  Args:
    column_name: Comma-separated column of ``combined_df`` to explode.
    alias_name: Column name given to the exploded values.
    table_name: Lookup table to load from the server (also the view name).
    sql_query: Spark SQL statement producing the rows of the link table.
    combi_table_name: Target table; it is written with mode "overwrite".
  """

  # Load existing table from server
  def load_current_df(table):
    return spark.read.jdbc(
      url=jdbc_url,
      table=table,
      properties=connection_properties
    )

  existing_content_df = load_current_df("Content")
  existing_table_df = load_current_df(table_name)

  # Explode records into multiple rows and select the required columns.
  # The split pattern is the same one extract_unique_data uses (comma plus
  # optional whitespace). Splitting on a bare "," would keep the leading space
  # of every element after the first ("A, B" -> " B"), and those values would
  # never match the lookup table in the join.
  content_combi_df = combined_df.select(
    col("imdbId").alias("contentID"),
    explode(split(col(column_name), ",\\s*")).alias(alias_name)
  ).distinct()

  # Create or replace temporary views
  content_combi_df.createOrReplaceTempView("temp_content_combi")
  existing_content_df.createOrReplaceTempView("Content")
  existing_table_df.createOrReplaceTempView(table_name)

  # Perform the join and select the required columns
  content_combi_result_df = spark.sql(sql_query)

  # Write the result DataFrame to the SQL Server table
  content_combi_result_df.write.jdbc(
      url=jdbc_url,
      table=combi_table_name,
      mode="overwrite",
      properties=connection_properties
  )

# Each query runs against the temporary views registered by
# combine_write_to_sql: temp_content_combi (imdbId aliased as contentID plus
# the exploded value), Content, and the lookup table. It selects the
# contentID column of Content together with the ID column of the lookup table.

# Content <-> Countries, matched on the country code
country_sql_query = """
SELECT c.contentID, co.countryID
FROM temp_content_combi cc
JOIN Content c ON c.imdbId = cc.contentID
JOIN Countries co ON co.countryCode = cc.countryCode
"""

# Content <-> Genres, matched on the genre name
genre_sql_query = """
SELECT c.contentID, g.genreID
FROM temp_content_combi cg
JOIN Content c ON c.imdbId = cg.contentID
JOIN Genres g ON g.genreName = cg.genreName
"""

# Content <-> Platforms, matched on the platform name
platform_sql_query = """
SELECT c.contentID, p.platformID
FROM temp_content_combi cp
JOIN Content c ON c.imdbId = cp.contentID
JOIN Platforms p ON p.platformName = cp.platformName
"""

# Build and write the three link tables. The alias passed as second argument
# must be the column name the matching query reads from temp_content_combi.
combine_write_to_sql("availableCountries", "countryCode", "Countries", country_sql_query, "ContentCountries")
combine_write_to_sql("genres", "genreName", "Genres", genre_sql_query, "ContentGenres")
combine_write_to_sql("platform", "platformName", "Platforms", platform_sql_query, "ContentPlatform")