In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("LoadCSV").getOrCreate()

# Load the CSV file into a DataFrame
df = (
    spark.read.format("csv")  # Specify the format as CSV
    .option("header", True)  # Use the first row as column headers
    .option("inferSchema", True)  # Automatically infer data types
    .option("nullValue", "null")  # Handle null values in the dataset
    .load("dbfs:/FileStore/tmdb_API__movies_data_2000_2010.csv")  # Path to your CSV file
)

# Display schema to confirm correct loading
df.printSchema()

# Show a sample of the data to verify it loaded correctly
df.show(5, truncate=False)

# Count rows and columns for verification
row_count = df.count()
column_count = len(df.columns)
print(f"Total Rows: {row_count}, Total Columns: {column_count}")
print("All columns in the DataFrame:")
print(df.columns)


root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)

+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|id    |title                         |release_date|vote_average|popularity|vote_count|genre                   |original_language|
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                 |1/1/2000    |6.966       |14.509    |1264.0    |Animation, Family, Music|en               |
|41245 |Peppermint Candy              |1/1/2000    |7.5         |11.201    |255.0     |Drama                   |ko               |
|46462 |N

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, DataType,StringType, FloatType, IntegerType, DoubleType, DateType, VarcharType
from pyspark.sql.functions import (
    col, when, sha2, regexp_replace, encode, lit, isnan,
    coalesce, create_map, concat, substring,current_timestamp, trim, lower, initcap,
)
from typing import Dict, List, Union
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import re

In [0]:

# Print the original schema
print("Original Schema:")
df.printSchema()

# Change the schema as specified
df = df.select(
    col("id").cast("integer").alias("id"),
    col("title").cast("string").alias("title"),
    col("release_date").cast("date").alias("release_date"),  # Change to date
    col("vote_average").cast("float").alias("vote_average"),      # Change to float
    col("popularity").cast("double").alias("popularity"),
    col("vote_count").cast("double").alias("vote_count"),
    col("genre").cast("string").alias("genre"),
    col("original_language").cast("string").alias("original_language")
)

# Print the new schema to confirm changes
print("\nUpdated Schema:")
df.printSchema()



# Count rows and columns for verification
row_count = df.count()
column_count = len(df.columns)
print(f"\nTotal Rows: {row_count}, Total Columns: {column_count}")



Original Schema:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Updated Schema:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Total Rows: 104905, Total Columns: 8


In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when

# Initialize Spark session
spark = SparkSession.builder.appName("LoadCSV").getOrCreate()

# Set legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Load the CSV file into a DataFrame named df (assuming it's already loaded)
df = (
    spark.read.format("csv")  
    .option("header", True)  
    .option("inferSchema", True)  
    .option("nullValue", "null")  
    .load("dbfs:/FileStore/tmdb_API__movies_data_2000_2010.csv")  
)

# Display original schema
print("\nOriginal Schema:")
df.printSchema()

# Show a sample of data
print("\nSample Data:")
df.show(5, truncate=False)

# Convert 'release_date' column from string to date format
df = df.withColumn(
    "release_date",
    when(to_date(col("release_date"), "MM/dd/yyyy").isNotNull(), 
         to_date(col("release_date"), "MM/dd/yyyy")) \
    .when(to_date(col("release_date"), "yyyy-MM-dd").isNotNull(), 
          to_date(col("release_date"), "yyyy-MM-dd")) \
    .when(to_date(col("release_date"), "dd-MMM-yyyy").isNotNull(), 
          to_date(col("release_date"), "dd-MMM-yyyy")) \
    .otherwise(None)  
)

# Display updated schema
print("\nUpdated Schema:")
df.printSchema()

# Show updated sample data
print("\nUpdated Sample Data:")
df.show(5, truncate=False)

# Count rows and columns for verification
row_count = df.count()
column_count = len(df.columns)
print(f"\nTotal Rows: {row_count}, Total Columns: {column_count}")
print("All columns in the DataFrame:")
print(df.columns)


Original Schema:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Sample Data:
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|id    |title                         |release_date|vote_average|popularity|vote_count|genre                   |original_language|
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                 |1/1/2000    |6.966       |14.509    |1264.0    |Animation, Family, Music|en               |
|41245 |Peppermint Candy              |1/1/2000    |7.5         |11.201    |255.0     |Drama                

In [0]:

# Function to clean column names
def clean_column_names(df: DataFrame) -> DataFrame:
    cleaned_columns = []
    for col_name in df.columns:
        # Convert to lowercase, replace spaces with underscores, and remove special characters
        cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', col_name.lower().replace(' ', '_'))
        cleaned_columns.append(cleaned_name)
    
    # Rename columns in the DataFrame
    for old_name, new_name in zip(df.columns, cleaned_columns):
        df = df.withColumnRenamed(old_name, new_name)

    return df

# Assuming df is your already loaded DataFrame
df_cleaned = clean_column_names(df)

# Print the updated schema to confirm changes
print("\nUpdated Schema with Cleaned Column Names:")
df_cleaned.printSchema()

# Show a sample of the updated data
print("\nSample of updated data with Cleaned Column Names:")
df_cleaned.show(5, truncate=False)

# Count rows and columns for verification
row_count = df_cleaned.count()
column_count = len(df_cleaned.columns)
print(f"\nTotal Rows: {row_count}, Total Columns: {column_count}")




Updated Schema with Cleaned Column Names:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Sample of updated data with Cleaned Column Names:
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|id    |title                         |release_date|vote_average|popularity|vote_count|genre                   |original_language|
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                 |2000-01-01  |6.966       |14.509    |1264.0    |Animation, Family, Music|en               |
|41245 |Peppermint Candy              |2000-01-0

In [0]:
# Function to trim string data in all string columns
def trim_string_data(df: DataFrame) -> DataFrame:
    # Get the list of string columns
    string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, (StringType, VarcharType))]
    
    # Trim whitespace for each string column
    for column in string_columns:
        df = df.withColumn(column, trim(col(column)))
    
    return df

# Assuming df is your already loaded DataFrame
df_trimmed = trim_string_data(df)

# Print the updated schema to confirm changes
print("\nUpdated Schema after Trimming String Data:")
df_trimmed.printSchema()

# Show a sample of the updated data
print("\nSample of updated data after Trimming String Data:")
df_trimmed.show(5, truncate=False)

# Count rows and columns for verification
row_count = df_trimmed.count()
column_count = len(df_trimmed.columns)

print("\nAll trimmed columns in the DataFrame:",df_trimmed.columns)



Updated Schema after Trimming String Data:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Sample of updated data after Trimming String Data:
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|id    |title                         |release_date|vote_average|popularity|vote_count|genre                   |original_language|
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                 |2000-01-01  |6.966       |14.509    |1264.0    |Animation, Family, Music|en               |
|41245 |Peppermint Candy              |2000-01

In [0]:
# Initialize Spark session
spark = SparkSession.builder.appName("LoadCSV").getOrCreate()

# Load the CSV file into a DataFrame (replace this with your existing DataFrame)
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("nullValue", "null")
    .load("dbfs:/FileStore/tmdb_movie_data_2000_2010-1.csv")
)

# Function to convert column headers to title case
def convert_data_to_title_case(df):
    cleaned_columns = []
    for col_name in df.columns:
        # Convert to title case using initcap and replace underscores with spaces for proper capitalization
        cleaned_name = re.sub(r'[_]', ' ', col_name)  # Replace underscores with spaces
        cleaned_name = cleaned_name.title()  # Convert to title case
        cleaned_columns.append(cleaned_name)

    # Rename columns in the DataFrame
    for old_name, new_name in zip(df.columns, cleaned_columns):
        df = df.withColumnRenamed(old_name, new_name)

    return df

# Convert column headers to title case
df_with_title_case_headers_and_columns = convert_data_to_title_case(df)

# Print the updated schema to confirm changes
print("\nUpdated Schema after Converting Headers and Columns to Title Case:")
df_with_title_case_headers_and_columns.printSchema()

# Show a sample of the updated data
print("\nSample of updated data with Title Case Headers:")
df_with_title_case_headers_and_columns.show(5, truncate=False)

# Count rows and columns for verification
row_count = df_with_title_case_headers_and_columns.count()
column_count = len(df_with_title_case_headers_and_columns.columns)

print(df_with_title_case_headers_and_columns.columns)



Updated Schema after Converting Headers and Columns to Title Case:
root
 |-- Id: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Release Date: string (nullable = true)
 |-- Vote Average: string (nullable = true)
 |-- Popularity: double (nullable = true)
 |-- Vote Count: double (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Original Language: string (nullable = true)


Sample of updated data with Title Case Headers:
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|Id    |Title                         |Release Date|Vote Average|Popularity|Vote Count|Genre                   |Original Language|
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                 |1/1/2000    |6.966       |14.509    |1264.0    |Animation, Family, Music|en               |
|41245 |Peppermint Cand

In [0]:

# Function to replace empty values in the genre column
def replace_empty_genre(df):
    # Specify the column to process
    column_name = "genre"
    default_value = "Unknown genre"
    
    # Print processing information
    print(f"Processing column: '{column_name}' | Default value: {default_value}")
    
    # Replace empty values in the 'genre' column with 'Unknown Genre'
    df = df.withColumn(
        column_name,
        when(col(column_name).isNull() | (col(column_name) == ""), lit(default_value)).otherwise(col(column_name))
    )
    
    print("=== Replacement of empty values completed successfully ===")
    return df

# Replace empty values in the genre column
df_with_replaced_genre = replace_empty_genre(df)

# Print the updated schema to confirm changes
print("\nUpdated DataFrame Schema:")
df_with_replaced_genre.printSchema()

# Show a sample of the updated data
print("\nUpdated DataFrame Content:")
df_with_replaced_genre.show(truncate=False)

# Count rows and columns for verification
row_count = df_with_replaced_genre.count()
column_count = len(df_with_replaced_genre.columns)

print(df_with_replaced_genre.columns)


Processing column: 'genre' | Default value: Unknown genre
=== Replacement of empty values completed successfully ===

Updated DataFrame Schema:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Updated DataFrame Content:
+------+--------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|id    |title                           |release_date|vote_average|popularity|vote_count|genre                   |original_language|
+------+--------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                   |1/1/2000    |6.966       |14.509    |1264.0    |Anim

In [0]:
# Function to standardize encoding for all string columns
def standardize_encoding(df):
    # Get the list of string columns
    string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]
    
    total_encoded_rows = 0
    encoded_rows = []  # List to store rows that were encoded

    # Process each string column
    for column in string_columns:
        # Create a temporary column for comparison
        temp_column = f"{column}_temp"
        
        # Encode the column and remove non-printable characters
        df = df.withColumn(
            temp_column,
            regexp_replace(
                encode(when(col(column).isNotNull(), col(column)).otherwise(lit("")), "UTF-8").cast("string"),
                r'[\x00-\x1F\x7F-\xFF]', ''
            )
        )
        
        # Identify changed rows and collect them
        changed_rows_df = df.filter(col(column) != col(temp_column))
        total_encoded_rows += changed_rows_df.count()
        
        # Store the changed rows in a list for later display
        encoded_rows.append(changed_rows_df)

        # Replace original column with cleaned version
        df = df.drop(column).withColumnRenamed(temp_column, column)

    print(f"Total number of rows encoded: {total_encoded_rows}")
    
    # Combine all encoded rows into a single DataFrame if there are any changes
    if total_encoded_rows > 0:
        encoded_rows_df = encoded_rows[0]  # Start with the first DataFrame
        for additional_df in encoded_rows[1:]:
            encoded_rows_df = encoded_rows_df.union(additional_df)  # Union all changed DataFrames
        
        print("\nSample of Encoded Rows:")
        encoded_rows_df.show(truncate=False)

    return df

# Standardize encoding for the DataFrame
df_encoded = standardize_encoding(df)

# Print the updated schema to confirm changes
print("\nUpdated DataFrame Schema after Standardizing Encoding:")
df_encoded.printSchema()

# Show a sample of the updated data
print("\nSample of updated data after Standardizing Encoding:")
df_encoded.show(5, truncate=False)

# Count rows and columns for verification
row_count = df_encoded.count()
column_count = len(df_encoded.columns)

print(df_encoded.columns)


Total number of rows encoded: 6194

Sample of Encoded Rows:
+-------+------------------------------------------------------------------+------------+------------+----------+----------+--------------------------------+-----------------+-------------------------------------------------------------+
|id     |title                                                             |release_date|vote_average|popularity|vote_count|genre                           |original_language|title_temp                                                   |
+-------+------------------------------------------------------------------+------------+------------+----------+----------+--------------------------------+-----------------+-------------------------------------------------------------+
|1185068|Die Reichsautobahn - Strassen des Führers                         |1/1/2000    |0           |2.016     |0.0       |Documentary                     |de               |Die Reichsautobahn - Strassen des Fhrers           

In [0]:

# Find duplicate rows
duplicate_rows_df = df.groupBy(df.columns).count().filter(col("count") > 1)

# Count total number of duplicate rows
total_duplicate_rows = duplicate_rows_df.count()

# Print the total number of duplicate rows
print(f"Total number of duplicate rows: {total_duplicate_rows}")

# Optionally, show the duplicate rows and their counts
if total_duplicate_rows > 0:
    print("\nDuplicate Rows:")
    duplicate_rows_df.show(truncate=False)
    
    # Drop duplicate rows from the original DataFrame
    df = df.dropDuplicates()
    print("\nDuplicate rows have been dropped.")
else:
    print("\nNo duplicate rows found.")






Total number of duplicate rows: 0

No duplicate rows found.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when

# Initialize Spark session
spark = SparkSession.builder.appName("LoadCSV").getOrCreate()

# Set legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Load the CSV file into a DataFrame named df
df = (
    spark.read.format("csv")  
    .option("header", True)  
    .option("inferSchema", True)  
    .option("nullValue", "null")  
    .load("dbfs:/FileStore/tmdb_API__movies_data_2000_2010.csv")  
)

# Display original schema
print("\nOriginal Schema:")
df.printSchema()

# Show a sample of data
print("\nSample Data:")
df.show(5, truncate=False)

def standardize_dataframe(df):
    """
    Standardizes the DataFrame by cleaning column names, dropping duplicates,
    trimming string data, and converting specified date columns to date format.
    
    Args:
        df (DataFrame): The input DataFrame.

    Returns:
        DataFrame: The standardized DataFrame.
    """
    try:
        # Clean the column names (assuming clean_column_names is defined)
        df = clean_column_names(df)

        # Drop duplicates
        duplicate_rows_df = df.groupBy(df.columns).count().filter(col("count") > 1)
        
        total_duplicate_rows = duplicate_rows_df.count()
        print(f"Total number of duplicate rows: {total_duplicate_rows}")

        if total_duplicate_rows > 0:
            print("\nDuplicate Rows:")
            duplicate_rows_df.show(truncate=False)
            df = df.dropDuplicates()
            print("\nDuplicate rows have been dropped.")
        else:
            print("\nNo duplicate rows found.")

        
    

        # Trim string columns data (assuming trim_string_data is defined)
        df = trim_string_data(df)

        # Replace empty values in the genre column (assuming replace_empty_genre is defined)
        df = replace_empty_genre(df)

        # Standardize the encoding (assuming standardize_encoding is defined)
        df = standardize_encoding(df)

        # Convert 'release_date' column from string to date format in a single line
        df = df.withColumn(
            "release_date",
            when(to_date(col("release_date"), "MM/dd/yyyy").isNotNull(), 
                 to_date(col("release_date"), "MM/dd/yyyy"))
            .when(to_date(col("release_date"), "yyyy-MM-dd").isNotNull(), 
                  to_date(col("release_date"), "yyyy-MM-dd"))
            .when(to_date(col("release_date"), "dd-MMM-yyyy").isNotNull(), 
                  to_date(col("release_date"), "dd-MMM-yyyy"))
            .otherwise(None)  # Assign None for invalid formats
        )

        # Add Performance Analysis based on popularity and vote_count
        df = df.withColumn(
            "Performance_Analysis",
            when((col("popularity") >= 0) & (col("popularity") <= 0.1), "Disaster")
            .when((col("popularity") > 0.1) & (col("popularity") <= 1.0), "Flop")
            .when((col("popularity") > 1.0) & (col("popularity") <= 3.0), "Average")
            .when((col("popularity") > 3.0) & (col("popularity") <= 50), "Hit")
            .when((col("popularity") > 50) & (col("popularity") <= 100), "Superhit/Blockbuster")
            .when((col("popularity") > 100) & (col("popularity") <= 437.087), "Culthit")
            .otherwise("Unknown")
        )

        return df
    
    except Exception as e:
        print(f"An error occurred during standardization: {e}")
        return None  # Return None if an error occurs

# Standardize the DataFrame
df_standardized = standardize_dataframe(df)

# Check if `df_standardized` is None before proceeding
if df_standardized is not None:
    # Print the updated schema to confirm changes
    print("\nUpdated DataFrame Schema after Standardizing:")
    df_standardized.printSchema()

    # Show a sample of the updated data
    print("\nSample of Updated Data after Standardizing:")
    df_standardized.show(50, truncate=False)

    # Count rows and columns for verification
    row_count = df_standardized.count()
    column_count = len(df_standardized.columns)
    
    print(f"\nTotal Rows: {row_count}, Total Columns: {column_count}")
else:
    print("Data standardization failed; no valid DataFrame to display.")



Original Schema:
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- original_language: string (nullable = true)


Sample Data:
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|id    |title                         |release_date|vote_average|popularity|vote_count|genre                   |original_language|
+------+------------------------------+------------+------------+----------+----------+------------------------+-----------------+
|49948 |Fantasia 2000                 |1/1/2000    |6.966       |14.509    |1264.0    |Animation, Family, Music|en               |
|41245 |Peppermint Candy              |1/1/2000    |7.5         |11.201    |255.0     |Drama                

In [0]:
# Create or replace temporary view with standardized DataFrame
df_standardized.createOrReplaceTempView("silver_layer")
print("\nTemporary view 'silver_layer' created successfully.")



Temporary view 'silver_layer' created successfully.


In [0]:

# Clean and standardize the DataFrame
df_silver_layer = standardize_dataframe(df)

# Count the number of rows and columns
num_rows = df_silver_layer.count()
num_columns = len(df_silver_layer.columns)

print(f"Number of columns in the standardized DataFrame: {num_columns}")
df_silver = df_silver_layer

# Display the number of rows after dropping
final_row_count = df_silver.count()
print(f"Number of rows : {final_row_count}")

# Display the first 50 rows of the DataFrame
print("\nFirst 10 rows of df_silver:")
df_silver.show(n=50, truncate=False)
 # Display the schema of df_silver
print("\nSchema of df_silver:")
df_silver.printSchema()

# Save df_silver as a Parquet file in DBFS
df_silver.write.mode("overwrite").parquet("dbfs:/Users/simhad76@students.rowan.edu/silver_layer_data")

print("Data saved to DBFS successfully.")





Total number of duplicate rows: 0

No duplicate rows found.
Processing column: 'genre' | Default value: Unknown genre
=== Replacement of empty values completed successfully ===
Total number of rows encoded: 6194

Sample of Encoded Rows:
+-------+------------------------------------------------------------------+------------+------------+----------+----------+--------------------------------+-----------------+-------------------------------------------------------------+
|id     |title                                                             |release_date|vote_average|popularity|vote_count|genre                           |original_language|title_temp                                                   |
+-------+------------------------------------------------------------------+------------+------------+----------+----------+--------------------------------+-----------------+-------------------------------------------------------------+
|1185068|Die Reichsautobahn - Strassen des Führer