# Validate the parquet files and load them into Delta tables

In [28]:
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType, BinaryType, DecimalType, ShortType
from pyspark.sql.utils import AnalysisException


# Start the Spark session (getOrCreate() reuses the active session if one already exists)
spark = SparkSession.builder.appName("TableValidation").getOrCreate()

StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 30, Finished, Available, Finished)

# Parameters

In [29]:
# Expected layout of every source table, keyed by table name and read by validate_table().
# Each entry holds:
#   "columns":     expected column names (validate_table compares them as a set, so order is not checked)
#   "types":       expected Spark type per column
#   "nullable":    False means the column must not contain any null value
#   "min_records": minimum number of rows the table must have
# NOTE: validate_table() matches DateType() entries against the "timestamp" dtype string,
#       and for DecimalType it only checks that the actual dtype starts with "decimal"
#       (the precision and scale written here are not enforced).
expected_schema = {
    "actor": {
        "columns": ["actor_id", "first_name", "last_name", "last_update"],
        "types": {"actor_id": IntegerType(), "first_name": StringType(), "last_name": StringType(), "last_update": DateType()},
        "nullable": {"actor_id": False, "first_name": False, "last_name": False, "last_update": False},
        "min_records": 1
    },
    "address": {
        "columns": ["address_id", "address", "address2", "district", "city_id", "postal_code", "phone", "last_update"],
        "types": {"address_id": IntegerType(), "address": StringType(), "address2": StringType(), "district": StringType(), "city_id": IntegerType(),
            "postal_code": StringType(), "phone": StringType(), "last_update": DateType()},
        "nullable": {"address_id": False, "address": False, "address2": True, "district": True, "city_id": False, "postal_code": True, 
            "phone": True, "last_update": False},
        "min_records": 1
    },
    "category": {
        "columns":["category_id", "name", "last_update"],
        "types":{"category_id": IntegerType(), "name": StringType(), "last_update": DateType()},
        "nullable":{"category_id": False, "name": False, "last_update": False},
        "min_records": 1
    },
    "city": {
        "columns":["city_id", "city", "country_id", "last_update"],
        "types":{"city_id": IntegerType(), "city": StringType(), "country_id": IntegerType(), "last_update": DateType()},
        "nullable":{"city_id": False, "city": False, "country_id": False, "last_update": False},
        "min_records": 1
    },
    "country": {
        "columns":["country_id", "country", "last_update"],
        "types":{"country_id": IntegerType(), "country": StringType(), "last_update": DateType()},
        "nullable":{"country_id": False, "country": False, "last_update": False},
        "min_records": 1
    },
    "customer": {
        "columns":["customer_id", "store_id", "first_name", "last_name", "email", "address_id", "active", "create_date", "last_update"],
        "types":{"customer_id": IntegerType(), "store_id": IntegerType(), "first_name": StringType(), "last_name": StringType(), 
            "email": StringType(), "address_id": IntegerType(), "active": StringType(), "create_date": DateType(), "last_update": DateType()},
        "nullable":{"customer_id": False, "store_id": False, "first_name": False, "last_name": False, 
            "email": False, "address_id": False, "active": False, "create_date": False, "last_update": False},
        "min_records": 1
    },
    "film": {
        "columns":["film_id", "title", "description", "release_year", "language_id", "original_language_id", "rental_duration", 
            "rental_rate","length", "replacement_cost", "rating", "special_features", "last_update"],
        "types":{"film_id": IntegerType(), "title": StringType(), "description": StringType(), "release_year": StringType(), 
            "language_id": IntegerType(), "original_language_id": IntegerType(), "rental_duration": ShortType(), 
            "rental_rate": DecimalType(4,2), "length": ShortType(), "replacement_cost": DecimalType(5,2), "rating": StringType(), 
            "special_features": StringType(), "last_update": DateType()},
        "nullable":{"film_id": False, "title": False, "description": False, "release_year": False, "language_id": False, 
            "original_language_id": True, "rental_duration": False, "rental_rate": False, "length": False, "replacement_cost": False, 
            "rating": False, "special_features": False, "last_update": False},
        "min_records": 1
    },
    "film_actor": {
        "columns":["actor_id", "film_id", "last_update"],
        "types":{"actor_id": IntegerType(), "film_id": IntegerType(), "last_update": DateType()},
        "nullable":{"actor_id": False, "film_id": False, "last_update": False},
        "min_records": 1
    },
    "film_category": {
        "columns":["film_id", "category_id", "last_update"],
        "types":{"film_id": IntegerType(), "category_id": IntegerType(), "last_update": DateType()},
        "nullable":{"film_id": False, "category_id": False, "last_update": False},
        "min_records": 1
    },
    # TODO: film_text has no expected schema yet (placeholder below), so it is left out of validation.
    # "film_text": {
    #     "columns":[],
    #     "types":{},
    #     "nullable":{},
    #     "min_records": 1
    # },
    "inventory": {
        "columns":["inventory_id", "film_id", "store_id", "last_update"],
        "types":{"inventory_id": IntegerType(), "film_id": IntegerType(), "store_id": IntegerType(), "last_update": DateType()},
        "nullable":{"inventory_id": False, "film_id": False, "store_id": False, "last_update": False},
        "min_records": 1
    },
    "language": {
        "columns":["language_id", "name", "last_update"],
        "types":{"language_id": IntegerType(), "name": StringType(), "last_update": DateType()},
        "nullable":{"language_id": False, "name": False, "last_update": False},
        "min_records": 1
    },
    "payment": {
        "columns":["payment_id", "customer_id", "staff_id", "rental_id", "amount", "payment_date", "last_update"],
        "types":{"payment_id": IntegerType(), "customer_id": IntegerType(), "staff_id": IntegerType(), "rental_id": IntegerType(), 
            "amount": DecimalType(5,2), "payment_date": DateType(), "last_update": DateType()},
        "nullable":{"payment_id": False, "customer_id": False, "staff_id": False, "rental_id": True, "amount": False, "payment_date": False, "last_update": False},
        "min_records": 1
    },
    "rental": {
        "columns":["rental_id", "rental_date", "inventory_id", "customer_id", "return_date", "staff_id", "last_update"],
        "types":{"rental_id": IntegerType(), "rental_date": DateType(), "inventory_id": IntegerType(), "customer_id": IntegerType(), 
            "return_date": DateType(), "staff_id": IntegerType(), "last_update": DateType()},
        "nullable":{"rental_id": False, "rental_date": False, "inventory_id": False, "customer_id": False, "return_date": True, "staff_id": False, "last_update": False},
        "min_records": 1
    },
    "staff": {
        "columns":["staff_id", "first_name", "last_name", "address_id", "picture", "email", "store_id", "active", "username", "password", "last_update"],
        "types":{"staff_id": IntegerType(), "first_name": StringType(), "last_name": StringType(), "address_id": IntegerType(), "picture": BinaryType(), 
            "email": StringType(), "store_id": IntegerType(), "active": BooleanType(), "username": StringType(), "password": StringType(), "last_update": DateType()},
        "nullable":{"staff_id": False, "first_name": False, "last_name": False, "address_id": False, "picture": True, "email": False, "store_id": False, "active": False, 
            "username": False, "password": False, "last_update": False},
        "min_records": 1
    },
    "store": {
        "columns":["store_id", "manager_staff_id", "address_id", "last_update"],
        "types":{"store_id": IntegerType(), "manager_staff_id": IntegerType(), "address_id": IntegerType(), "last_update": DateType()},
        "nullable":{"store_id": False, "manager_staff_id": False, "address_id": False, "last_update": False},
        "min_records": 1
    }
}

StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 31, Finished, Available, Finished)

In [30]:
# Directory where the source parquet files are stored; passed to list_parquet_files() below
parquet_path = "Files/"

StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 32, Finished, Available, Finished)

# Functions

### Function to validate each file by comparing it with the expected_schema parameter

In [31]:
def validate_table(table_name, df):
    """
    Validate a DataFrame against its entry in the module-level `expected_schema`.

    The checks run in this order and stop at the first failure:
      1. the set of column names equals the expected set (order is not checked),
      2. every column's dtype string matches the expected type,
      3. columns declared non-nullable contain no null values,
      4. the table has at least `min_records` rows.

    Parameters:
    - table_name: Key of the table in `expected_schema`.
    - df: The Spark DataFrame to validate.

    Raises:
    - ValueError: if `table_name` has no entry in `expected_schema`, or when one of
      the checks fails. ValueError is a subclass of Exception, so callers that catch
      `Exception` keep working.

    Prints a confirmation line when every check passes.
    """
    # Fail with a readable message instead of a bare KeyError for an unknown table
    if table_name not in expected_schema:
        raise ValueError(
            f"No expected schema is defined for table '{table_name}'. Known tables: {sorted(expected_schema)}"
        )
    schema_info = expected_schema[table_name]

    # 1. Validate columns
    actual_columns = set(df.columns)
    expected_columns = set(schema_info["columns"])
    if actual_columns != expected_columns:
        raise ValueError(f"Table '{table_name}' has incorrect columns. Expected {expected_columns}, but got {actual_columns}")

    # 2. Validate data types
    # Map of PySpark types to string equivalents as seen in df.dtypes.
    # NOTE: DateType is deliberately compared against "timestamp" here, not "date".
    pyspark_type_to_str = {
        "IntegerType": "int",
        "StringType": "string",
        "DateType": "timestamp",
        "FloatType": "float",
        "BooleanType": "boolean",
        "BinaryType": "binary",
        "ShortType": "smallint"
    }

    # Build the name -> dtype lookup once instead of once per column
    actual_types = dict(df.dtypes)

    for column_name, expected_type in schema_info["types"].items():
        actual_type = actual_types[column_name]

        # Simplified check for DecimalType (ignores precision and scale)
        if isinstance(expected_type, DecimalType):
            if not actual_type.startswith("decimal"):
                raise ValueError(f"Column '{column_name}' in table '{table_name}' has incorrect type. Expected decimal, but got {actual_type}")
            continue

        expected_type_name = expected_type.__class__.__name__
        expected_type_str = pyspark_type_to_str.get(expected_type_name)
        if expected_type_str is None:
            raise ValueError(f"Unsupported type '{expected_type_name}' in schema for column '{column_name}' in table '{table_name}'")

        if actual_type != expected_type_str:
            raise ValueError(f"Column '{column_name}' in table '{table_name}' has incorrect type. Expected {expected_type_str}, but got {actual_type}")

    # 3. Validate nullability (only columns declared as non-nullable are scanned)
    for column_name, is_nullable in schema_info["nullable"].items():
        if is_nullable:
            continue
        null_count = df.filter(df[column_name].isNull()).count()
        if null_count > 0:
            raise ValueError(f"Column '{column_name}' in table '{table_name}' contains {null_count} null values, but it should not be nullable.")

    # 4. Validate record count
    record_count = df.count()
    if record_count < schema_info["min_records"]:
        raise ValueError(f"Table '{table_name}' has only {record_count} records. Expected at least {schema_info['min_records']}.")

    print(f"Table '{table_name}' passed all validations.")


StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 33, Finished, Available, Finished)

### Function to create a Delta table and load the raw data into silver

In [32]:
def create_external_delta_table(df, table_name, mode="overwrite"):
    """
    Save a DataFrame as a Delta table in the Lakehouse using `saveAsTable`.

    Parameters:
    - df: The DataFrame that you want to save as a Delta table.
    - table_name: The name of the table to be created or written to.
    - mode: How to handle the case when the table already exists.
            Options are:
            - "overwrite" (default): Overwrite the existing table.
            - "error": Throw an error if the table exists.
            - "append": Append new data to the existing table.

    Raises:
    - ValueError: if `mode` is not one of the options above.

    Write failures are not re-raised: they are reported with `print`, so callers
    have to read the printed output to know whether the table was written.

    NOTE(review): no `path` option is passed to the writer, so whether the table ends
    up external or managed depends on the catalog defaults - confirm against the name.
    """
    success_messages = {
        "error": f"Successfully created Delta table '{table_name}'",
        "overwrite": f"Successfully overwrote Delta table '{table_name}'",
        "append": f"Successfully appended data to Delta table '{table_name}'",
    }
    # An unknown mode used to fall through every branch and silently write nothing
    if mode not in success_messages:
        raise ValueError(
            f"Unsupported mode '{mode}' for Delta table '{table_name}'. Expected one of {sorted(success_messages)}"
        )

    try:
        # "error" is the writer's default save mode, so passing it explicitly is equivalent
        df.write.format("delta").mode(mode).saveAsTable(table_name)
        print(success_messages[mode])

    except AnalysisException as e:
        # Handle the case where the table does not exist yet.
        # Older Spark versions word it as "Table or view not found"; newer ones tag
        # the message with the TABLE_OR_VIEW_NOT_FOUND error class.
        message = str(e)
        if "Table or view not found" in message or "TABLE_OR_VIEW_NOT_FOUND" in message:
            df.write.format("delta").saveAsTable(table_name)
            print(f"Delta table '{table_name}' didn't exist, created a new table")
        else:
            print(f"Error while handling external Delta table '{table_name}': {message}")

    except Exception as e:
        # Handle any general exception
        print(f"Error while creating external Delta table '{table_name}': {str(e)}")


StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 34, Finished, Available, Finished)

### Function to extract the list of files to be validated

In [33]:
def list_parquet_files(parquet_path):
    """
    Return the table names of the parquet files found directly under `parquet_path`.

    The directory is listed through the Hadoop FileSystem API of the active Spark
    session. Only entries whose name ends with ".parquet" are kept; the ".parquet"
    suffix and a leading "dbo." prefix (when present) are stripped from each name.

    Parameters:
    - parquet_path: Directory to list.

    Returns:
    - A list of table names, in the order the file system returned the entries.
    """
    hadoop_fs = spark._jvm.org.apache.hadoop.fs
    fs = hadoop_fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    statuses = fs.listStatus(hadoop_fs.Path(parquet_path))

    prefix, suffix = "dbo.", ".parquet"
    table_names = []
    for status in statuses:
        file_name = status.getPath().getName()
        if not file_name.endswith(suffix):
            continue
        # Strip only the trailing extension and the leading schema prefix;
        # str.replace would also remove those substrings from the middle of a name.
        table_name = file_name[:-len(suffix)]
        if table_name.startswith(prefix):
            table_name = table_name[len(prefix):]
        table_names.append(table_name)

    return table_names

StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 35, Finished, Available, Finished)

# Run Process

In [34]:
# Informational: list the table names derived from the parquet files in parquet_path and print them
parquet_files = list_parquet_files(parquet_path)
print(parquet_files)

StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 36, Finished, Available, Finished)

['actor', 'address', 'category', 'city', 'country', 'customer', 'film', 'film_actor', 'film_category', 'film_text', 'inventory', 'language', 'payment', 'rental', 'staff', 'store']


In [35]:
# Tables to be validated. Each one needs an entry in expected_schema and a
# "dbo.<table>.parquet" file under parquet_path.
table_names = ["actor", "address", "category", "city", "country", "customer", "film", "film_actor", "film_category", "inventory", 
    "language", "payment", "rental", "staff", "store"]

for table_name in table_names:
    # Build the location from parquet_path instead of repeating the "Files/" literal
    source_file = f"{parquet_path.rstrip('/')}/dbo.{table_name}.parquet"

    # Read inside its own try so one missing or unreadable file does not abort the
    # remaining tables, and so a read failure is not reported as a validation failure.
    try:
        df = spark.read.format("parquet").load(source_file)
    except Exception as e:
        print(f"Could not read '{source_file}' for table '{table_name}': {e}")
        continue

    try:
        validate_table(table_name, df)
        create_external_delta_table(df, table_name)
    except Exception as e:
        print(f"Validation failed for table '{table_name}': {e}")

StatementMeta(, 2cf4fb99-f346-46a4-a30d-80e658c153eb, 37, Finished, Available, Finished)

Table 'actor' passed all validations.
Successfully overwrote Delta table 'actor'
Table 'address' passed all validations.
Successfully overwrote Delta table 'address'
Table 'category' passed all validations.
Successfully overwrote Delta table 'category'
Table 'city' passed all validations.
Successfully overwrote Delta table 'city'
Table 'country' passed all validations.
Successfully overwrote Delta table 'country'
Table 'customer' passed all validations.
Successfully overwrote Delta table 'customer'
Table 'film' passed all validations.
Successfully overwrote Delta table 'film'
Table 'film_actor' passed all validations.
Successfully overwrote Delta table 'film_actor'
Table 'film_category' passed all validations.
Successfully overwrote Delta table 'film_category'
Table 'inventory' passed all validations.
Successfully overwrote Delta table 'inventory'
Table 'language' passed all validations.
Successfully overwrote Delta table 'language'
Table 'payment' passed all validations.
Successfully 