In [19]:
import os
# Root of the local Delta Lake, resolved against the directory the notebook runs in.
current_dir = os.getcwd()
delta_table_path = os.path.join(current_dir, "Delta_Lake/")

# For every dataset: the folder holding the raw files, and the Delta Lake
# sub-directory (with trailing slash) that its tables are written under.
movieLens_folder_path = 'MovieLens_32M'
delta_table_movieLens_path = f'{delta_table_path}MovieLens/'

imdb_folder_path = 'IMDB_datasets'
delta_table_imdb_path = f'{delta_table_path}IMDB/'

boxOffice_folder_path = 'BoxOffice'
delta_table_boxOffice_path = f'{delta_table_path}BoxOffice/'

In [2]:
import os

# Show where Spark, Hadoop and Java are installed (prints None for an unset variable).
for env_var in ('SPARK_HOME', 'HADOOP_HOME', 'JAVA_HOME'):
    print(os.getenv(env_var))


C:\Program Files\Spark
C:\Program Files\hadoop-3.0.0
C:\Program Files\Java\jdk-21


Each .parquet file represents a partition of data (in case of partitioned data) or a set of rows for unpartitioned data.

.CRC files are automatically generated by Delta Lake during file writes and are used for quick integrity checks during reads.

In [6]:
import pyspark
from delta import *

# Session builder with the Delta SQL extension and the Delta catalog registered.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("LocalDeltaTable")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip comes from the `delta` package imported above.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# MovieLens

In [None]:
# Convert every MovieLens CSV file into its own Delta table.
for filename in os.listdir(movieLens_folder_path):
    # Only CSV files are ingested; anything else in the folder is skipped.
    if not filename.endswith('.csv'):
        continue

    file_path = os.path.join(movieLens_folder_path, filename)
    # Strip the whole '.csv' extension (dot included) so the table directory
    # is named e.g. 'links' rather than 'links.'.
    table_path = delta_table_movieLens_path + filename.removesuffix('.csv')

    try:
        df = spark.read.format('csv').option('header', True).load(file_path)
        df.write.format('delta').mode('overwrite').save(table_path)
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining files.
        print(f"Error reading {filename}: {e}")

In [None]:
# spark.read.format('delta').load(delta_table_movieLens_path+'links').show()

# IMDB

In [None]:
from files_imbd import imbd_ingestion

# NOTE(review): presumably fetches/refreshes the files in the IMDB folder —
# confirm in files_imbd.
imbd_ingestion()

# Convert every IMDB TSV file into its own Delta table.
for filename in os.listdir(imdb_folder_path):
    # Only tab-separated files are ingested; anything else is skipped.
    if not filename.endswith('.tsv'):
        continue

    file_path = os.path.join(imdb_folder_path, filename)
    print(file_path)
    # Strip the whole '.tsv' extension (dot included) so the table directory
    # is named e.g. 'title.ratings' — the name used when the table is read
    # back — rather than 'title.ratings.'.
    table_path = delta_table_imdb_path + filename.removesuffix('.tsv')

    try:
        df = spark.read.format('csv').option('header', True).option('delimiter', '\t').load(file_path)
        df.write.format('delta').mode('overwrite').save(table_path)
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining files.
        print(f"Error reading {filename}: {e}")

IMDB_datasets\name.basics.tsv
IMDB_datasets\title.akas.tsv
IMDB_datasets\title.basics.tsv
IMDB_datasets\title.crew.tsv
IMDB_datasets\title.episode.tsv
IMDB_datasets\title.principals.tsv
IMDB_datasets\title.ratings.tsv


# Box Office

In [17]:
def clean_column_names(df):
    """Return ``df`` with problematic characters in its column names replaced.

    Each column is renamed through ``withColumnRenamed`` after applying, in
    order: space -> ``_``, ``%`` -> ``percent``, ``±`` -> ``plus_minus``.
    Extend ``substitutions`` to handle further characters.
    """
    substitutions = ((" ", "_"), ("%", "percent"), ("±", "plus_minus"))

    renamed = df
    # Iterate over the names of the incoming frame, not the evolving one.
    for original in df.columns:
        sanitized = original
        for needle, replacement in substitutions:
            sanitized = sanitized.replace(needle, replacement)
        renamed = renamed.withColumnRenamed(original, sanitized)
    return renamed

# NOTE(review): `df` is not defined in this cell — it is whatever DataFrame a
# previously executed cell left in the kernel, so this line depends on
# execution order. `df_cleaned` is not referenced in the cells shown here —
# TODO confirm whether this line is still needed.
df_cleaned = clean_column_names(df)

In [None]:
from boxoffice import boxOffice_weekenly_ingestion

# NOTE(review): presumably fetches/refreshes the JSON files in the BoxOffice
# folder — confirm in boxoffice.
boxOffice_weekenly_ingestion()

# Convert every Box Office JSON file into its own Delta table.
for filename in os.listdir(boxOffice_folder_path):
    # Only JSON files are ingested; anything else is skipped.
    if not filename.endswith('.json'):
        continue

    file_path = os.path.join(boxOffice_folder_path, filename)
    # Strip the whole '.json' extension (dot included) so the table directory
    # is named e.g. 'box_office_data' — the name used when the table is read
    # back — rather than 'box_office_data.'.
    table_path = delta_table_boxOffice_path + filename.removesuffix('.json')

    try:
        df = spark.read.format("json").option("multiline", "true").load(file_path)
        # Column names are sanitised before the Delta write.
        df = clean_column_names(df)
        df.write.format('delta').mode('overwrite').save(table_path)
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining files.
        print(f"Error reading {filename}: {e}")

In [22]:
def getSpecificDayTable(delta_path, date):
    """Return the timestamp of a commit made to a Delta table on a given day.

    Args:
        delta_path: Filesystem path of the Delta table.
        date: Day to look for, formatted as ``'YYYY-MM-DD'``.

    Returns:
        The commit timestamp as a string, suitable for the ``timestampAsOf``
        read option. When several commits share the day, the first row of the
        history output is used (Delta lists history newest-first, so this is
        the latest commit of that day).

    Raises:
        IndexError: If the table history holds no commit on ``date``.
    """
    history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").toPandas()
    commit_days = history["timestamp"].dt.strftime("%Y-%m-%d")
    same_day = history[commit_days == date]
    if same_day.empty:
        # Same exception type that ``iloc[0]`` would raise on an empty frame,
        # but with a message that says what is actually missing.
        raise IndexError(f"No Delta commit found on {date} for table at {delta_path}")
    return str(same_day.iloc[0]["timestamp"])

In [None]:
from datetime import datetime
# Load the IMDB ratings table as of the commit recorded for today's date.
ratings_table_path = delta_table_imdb_path+'title.ratings'
time = getSpecificDayTable(ratings_table_path, datetime.today().strftime('%Y-%m-%d'))
df = spark.read.format('delta').option('timestampAsOf', time).load(ratings_table_path)


In [24]:
from datetime import datetime
# Load the Box Office table as of the commit recorded on 2025-03-29.
boxoffice_table_path = delta_table_boxOffice_path+'box_office_data'
time = getSpecificDayTable(boxoffice_table_path, '2025-03-29')
df = spark.read.format('delta').option('timestampAsOf', time).load(boxoffice_table_path)