In [83]:
from pyspark.sql.functions import when, col, regexp_replace, trim
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, explode_outer, split
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType,StructType
import json

import pyspark
from pyspark import SparkConf, SparkContext
from delta import *
from pyspark.sql.functions import col, when, md5, concat_ws, lit
from pyspark.sql.types import StringType, StructType, StructField, TimestampType
from pyspark.sql.functions import *
from datetime import datetime
 
# Configure a Spark session with the Delta Lake SQL extension and catalog,
# then let delta's pip helper finish the builder before creating the session.
builder = (
    pyspark.sql.SparkSession.builder.appName("ReadDelta")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [84]:
# Start from an empty silver layer: drop the database (and every table in it),
# then recreate it. NOTE: this is destructive on every run of the notebook.
spark.sql("DROP DATABASE IF EXISTS silver CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS silver")

DataFrame[]

In [85]:
# Execute the bronze-population notebook inside this kernel so that whatever it
# defines is available to the cells below.
# NOTE(review): update_audit_table is not defined in this notebook and is
# presumably provided by this %run — confirm. The path is absolute and tied to
# the container's filesystem layout.
%run "/home/jovyan/work/notebooks/2.populate_bronze.ipynb"

+----------+-------------+-----------+--------------------+--------+
|Table_Name|     Progress|     Status|          Start_time|End_time|
+----------+-------------+-----------+--------------------+--------+
|   EV_Data|Ingest_Bronze|In_Progress|2024-12-05 19:14:...|    null|
+----------+-------------+-----------+--------------------+--------+

+----------+----------+-----------+--------+
|Layer_Name|Table_Name|Schema_Flag|Md5_Hash|
+----------+----------+-----------+--------+
+----------+----------+-----------+--------+

+----------+----------+-----------+--------------------+
|Layer_Name|Table_Name|Schema_Flag|            Md5_Hash|
+----------+----------+-----------+--------------------+
|    Bronze|   EV_Data|          N|99b6b6813c00543c7...|
+----------+----------+-----------+--------------------+

check_and_store_md5_result-True
+------------------+--------------------+--------+----------+------------+----------+------------+----+----------+-----------+-----------+-----+-----------

DataFrame[sid: string, id: string, position: string, created_at: string, created_meta: string, updated_at: string, updated_meta: string, meta: string, VIN_1_10: string, County: string, City: string, State: string, Postal_Code: string, Model_Year: string, Make: string, Model: string, Electric_Vehicle_Type: string, Clean_Alternative_Fuel_Vehicle_CAFV_Eligibility: string, Electric_Range: string, Base_MSRP: string, Legislative_District: string, DOL_Vehicle_ID: string, Vehicle_Location: string, Electric_Utility: string, 2020_Census_Tract: string, Counties: string, Congressional_Districts: string, WAOFM_GIS_Legislative_District_Boundary: string]

+----------+-------------+-----------+--------------------+--------+
|Table_Name|     Progress|     Status|          Start_time|End_time|
+----------+-------------+-----------+--------------------+--------+
|   EV_Data|Ingest_Bronze|In_Progress|2024-12-05 19:14:...|    null|
+----------+-------------+-----------+--------------------+--------+

+----------+-------------+-----------+--------------------+--------+
|Table_Name|     Progress|     Status|          Start_time|End_time|
+----------+-------------+-----------+--------------------+--------+
|   EV_Data|Ingest_Bronze|In_Progress|2024-12-05 19:14:...|    null|
+----------+-------------+-----------+--------------------+--------+

+----------+-------------+---------+--------------------+--------------------+
|Table_Name|     Progress|   Status|          Start_time|            End_time|
+----------+-------------+---------+--------------------+--------------------+
|   EV_Data|Ingest_Bronze|Completed|2024-12-05 19:14:...|2024-12-05 19:

In [86]:
# Mark the silver ingestion of EV_Data as started in the audit table.
# NOTE(review): update_audit_table is not defined in this notebook; presumably
# it comes from the notebook executed via %run above — confirm.
update_audit_table("EV_Data", progress="Ingest_Silver", status="In_Progress", start=True)

+----------+-------------+-----------+--------------------+--------+
|Table_Name|     Progress|     Status|          Start_time|End_time|
+----------+-------------+-----------+--------------------+--------+
|   EV_Data|Ingest_Silver|In_Progress|2024-12-05 19:14:...|    null|
+----------+-------------+-----------+--------------------+--------+



In [87]:
from pyspark.sql.functions import when, col, regexp_replace, trim

# Step 1: Source metadata table and target table names
bronze_database_name = "bronze"
columns_metadata_table = "columns_metadata"
silver_database_name = "silver"
new_table_name = "ev_std"

# Step 2: Read the column metadata (column name + source type name)
df = spark.sql(f"SELECT name, dataTypeName FROM {bronze_database_name}.{columns_metadata_table}")
df.limit(10).show()

# Step 3: Map each source dataTypeName to a Spark SQL type. Anything that is
# not mapped is tagged "UNKNOWN" so it can be detected and reported below.
df_with_sql_types = df.withColumn(
    "SQL_DataType",
    when(df.dataTypeName == "number", "DOUBLE")
    .when(df.dataTypeName == "text", "STRING")
    .when(df.dataTypeName == "point", "STRING")
    .when(df.dataTypeName == "meta_data", "STRING")
    .otherwise("UNKNOWN")
)

# Step 4: Sanitise column names: collapse every run of non-alphanumeric
# characters into a single underscore, then strip leading/trailing underscores.
df_with_sql_types = df_with_sql_types.withColumn(
    "name",
    trim(
        regexp_replace(
            regexp_replace(col("name"), "[^a-zA-Z0-9]+", "_"),
            "^_+|_+$",
            "",
        )
    ),
)

# Step 5: Display the corrected schema with column names and data types
df_with_sql_types.limit(10).show()

# Step 6: Collect the schema once and validate it before building any DDL.
schema_rows = df_with_sql_types.collect()

if not schema_rows:
    # An empty column list would produce "CREATE TABLE ... ()", which Spark rejects.
    raise ValueError(
        f"No column metadata found in {bronze_database_name}.{columns_metadata_table}; "
        f"cannot create {silver_database_name}.{new_table_name}."
    )

unmapped_columns = [
    f"{row['name']} ({row['dataTypeName']})"
    for row in schema_rows
    if row["SQL_DataType"] == "UNKNOWN"
]
if unmapped_columns:
    # "UNKNOWN" is not a Spark SQL type; fail here with a readable message
    # instead of letting CREATE TABLE die with a parse error.
    raise ValueError(
        "No SQL type mapping for column(s): " + ", ".join(unmapped_columns)
    )

columns_with_types = [
    f"`{row['name']}` {row['SQL_DataType']}" for row in schema_rows
]

create_table_statement = f"""
CREATE TABLE if not exists {silver_database_name}.{new_table_name} (
    {', '.join(columns_with_types)}
) USING DELTA 
"""

# Step 7: Print the CREATE TABLE statement for verification
print(create_table_statement)

# Execute the SQL statement to create the new table
spark.sql(create_table_statement)

# Confirm the table has been created successfully
print(f"Table {silver_database_name}.{new_table_name} has been created successfully.")


+------------+------------+
|        name|dataTypeName|
+------------+------------+
|         sid|   meta_data|
|          id|   meta_data|
|    position|   meta_data|
|  created_at|   meta_data|
|created_meta|   meta_data|
|  updated_at|   meta_data|
|updated_meta|   meta_data|
|        meta|   meta_data|
|  VIN (1-10)|        text|
|      County|        text|
+------------+------------+

+------------+------------+------------+
|        name|dataTypeName|SQL_DataType|
+------------+------------+------------+
|         sid|   meta_data|      STRING|
|          id|   meta_data|      STRING|
|    position|   meta_data|      STRING|
|  created_at|   meta_data|      STRING|
|created_meta|   meta_data|      STRING|
|  updated_at|   meta_data|      STRING|
|updated_meta|   meta_data|      STRING|
|        meta|   meta_data|      STRING|
|    VIN_1_10|        text|      STRING|
|      County|        text|      STRING|
+------------+------------+------------+


CREATE TABLE if not exists silv

In [88]:
def data_type_correction(bronze_df):
    """Cast the numeric bronze columns to double and the audit timestamps to timestamp.

    Args:
        bronze_df: DataFrame holding the bronze vehicle columns referenced below.

    Returns:
        A DataFrame with the listed columns cast to ``DoubleType`` and with
        ``created_at`` / ``updated_at`` converted from unix seconds to timestamps.
    """
    # Target column names, spelled exactly as they are passed to withColumn.
    double_targets = [
        "created_at",
        "updated_at",
        "Postal_Code",
        "Model_year",
        "2020_Census_Tract",
        "Counties",
        "Electric_Range",
        "Base_MSRP",
        "Legislative_District",
        "DOL_Vehicle_ID",
        "Congressional_Districts",
        "WAOFM_GIS_Legislative_District_Boundary",
    ]
    # The one target whose source column is looked up under a different spelling.
    source_spelling = {"Postal_Code": "Postal_code"}

    typed_df = bronze_df
    for target in double_targets:
        source = source_spelling.get(target, target)
        # Always read from the incoming frame so every cast sees the raw value.
        typed_df = typed_df.withColumn(target, bronze_df[source].cast(DoubleType()))

    # Epoch seconds (now doubles) -> formatted string -> timestamp.
    for stamp in ("created_at", "updated_at"):
        typed_df = typed_df.withColumn(stamp, to_timestamp(from_unixtime(col(stamp))))

    return typed_df

In [89]:
def us_state_validation(bronze_df):
    """Flag rows whose ``State`` is not one of the 50 US state abbreviations.

    Appends ``"DQ01 "`` to ``err_code`` and ``"Invalid State "`` to
    ``err_message`` for offending rows, then recomputes ``err_flg``. The
    trailing space matches the delimiter used by the other DQ checks in this
    notebook, so codes from several checks do not run together.

    Args:
        bronze_df: DataFrame with ``State``, ``err_code`` and ``err_message``
            columns.

    Returns:
        The DataFrame with updated ``err_code`` / ``err_message`` and an
        ``err_flg`` column set to ``"Y"`` when ``err_code`` is non-empty,
        otherwise ``"N"``.
    """
    valid_states = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN',
                    'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV',
                    'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN',
                    'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']

    # Build the predicate once and reuse it for both columns.
    # A null State makes this predicate null, so such rows fall through to
    # otherwise() and are left for the null check to report.
    invalid_state = ~col("State").isin(valid_states)

    flagged_df = bronze_df.withColumn(
        "err_code",
        when(invalid_state, concat(col("err_code"), lit("DQ01 "))).otherwise(col("err_code")),
    ).withColumn(
        "err_message",
        when(invalid_state, concat(col("err_message"), lit("Invalid State "))).otherwise(col("err_message")),
    )

    return flagged_df.withColumn("err_flg", when(col("err_code") != "", "Y").otherwise("N"))

In [90]:
def null_check(bronze_df):
    """Flag rows that contain a null in any checked column.

    All columns are checked except ``err_code``, ``err_message``,
    ``updated_meta`` and ``created_meta``. A row with at least one null gets
    ``"DQ02 "`` appended to ``err_code`` and ``"Null value "`` appended to
    ``err_message``; each is appended only if not already present.
    ``err_flg`` is then recomputed.

    Args:
        bronze_df: DataFrame with ``err_code`` and ``err_message`` columns.

    Returns:
        The DataFrame with updated ``err_code`` / ``err_message`` and an
        ``err_flg`` column (``"Y"`` when ``err_code`` is non-empty, else ``"N"``).
    """
    # Local imports: the notebook star-imports pyspark.sql.functions, so keep
    # these names scoped to the function to avoid any shadowing.
    from functools import reduce
    from operator import or_

    excluded = ("err_code", "err_message", "updated_meta", "created_meta")
    null_flags = [
        col(col_name).isNull()
        for col_name in bronze_df.columns
        if col_name not in excluded
    ]

    # OR all per-column flags together. With nothing to check, fall back to a
    # constant False instead of passing None into the `&` below (which fails).
    any_null = reduce(or_, null_flags) if null_flags else lit(False)

    bronze_df = bronze_df.withColumn(
        "err_code",
        when((instr(col("err_code"), "DQ02") == 0) & any_null,
             concat(col("err_code"), lit("DQ02 "))
        ).otherwise(col("err_code"))
    ).withColumn(
        "err_message",
        when((instr(col("err_message"), "Null value") == 0) & any_null,
             concat(col("err_message"), lit("Null value "))
        ).otherwise(col("err_message"))
    )

    # Final flag: any accumulated error code marks the row as erroneous.
    bronze_df = bronze_df.withColumn("err_flg", when(col("err_code") != "", lit("Y")).otherwise(lit("N")))

    return bronze_df


In [91]:
def numeric_check(bronze_df):
    """Flag rows that hold NaN in any floating-point column.

    Only ``float`` and ``double`` columns are inspected, because those are the
    only numeric types that can store NaN; nulls are not treated as invalid
    here. A flagged row gets ``"DQ04 "`` appended to ``err_code`` and
    ``"Invalid Numeric Value "`` appended to ``err_message`` (each only if not
    already present), and ``err_flg`` is recomputed.

    Args:
        bronze_df: DataFrame with ``err_code`` and ``err_message`` columns.

    Returns:
        The DataFrame with updated ``err_code`` / ``err_message`` and an
        ``err_flg`` column (``"Y"`` when ``err_code`` is non-empty, else ``"N"``).
    """
    # Match on typeName(): simpleString() reports e.g. "bigint" and
    # "decimal(10,0)", so comparing it against "long"/"decimal" never matches.
    nan_capable_columns = [
        field.name
        for field in bronze_df.schema.fields
        if field.dataType.typeName() in ("float", "double")
    ]

    # Start from a constant False so a frame without floating-point columns
    # (e.g. an all-string bronze table) is handled instead of failing on None.
    invalid_numeric_condition = lit(False)
    for column in nan_capable_columns:
        invalid_numeric_condition = invalid_numeric_condition | isnan(col(column))

    bronze_df = bronze_df.withColumn(
        "err_code",
        when((instr(col("err_code"), "DQ04") == 0) & invalid_numeric_condition,
             concat(col("err_code"), lit("DQ04 "))
        ).otherwise(col("err_code"))
    ).withColumn(
        "err_message",
        when((instr(col("err_message"), "Invalid Numeric Value") == 0) & invalid_numeric_condition,
             concat(col("err_message"), lit("Invalid Numeric Value "))
        ).otherwise(col("err_message"))
    )

    # Final flag: any accumulated error code marks the row as erroneous.
    bronze_df = bronze_df.withColumn("err_flg", when(col("err_code") != "", lit("Y")).otherwise(lit("N")))

    return bronze_df

In [92]:
def year_check(bronze_df, min_year=1900, max_year=None):
    """Flag rows whose ``Model_year`` lies outside ``[min_year, max_year]``.

    The previous condition, ``(year <= current_year) & (year < 1900)``, reduced
    to ``year < 1900``, so years in the future were never flagged. A row is now
    invalid when the year is below ``min_year`` OR above ``max_year``.

    Offending rows get ``"DQ03 "`` appended to ``err_code`` and
    ``"Invalid Model Year "`` appended to ``err_message``; ``err_flg`` is then
    recomputed. Rows with a null ``Model_year`` are not flagged by this check.

    Args:
        bronze_df: DataFrame with ``Model_year``, ``err_code`` and
            ``err_message`` columns.
        min_year: Smallest accepted model year (inclusive). Defaults to 1900.
        max_year: Largest accepted model year (inclusive). Defaults to the
            current calendar year. NOTE(review): vehicle model years can run a
            year ahead of the calendar; pass ``max_year`` explicitly if those
            rows should be accepted.

    Returns:
        The DataFrame with updated ``err_code`` / ``err_message`` and an
        ``err_flg`` column (``"Y"`` when ``err_code`` is non-empty, else ``"N"``).
    """
    if max_year is None:
        max_year = datetime.now().year

    model_year = col("Model_year")
    invalid_year = (model_year < min_year) | (model_year > max_year)

    bronze_df = bronze_df.withColumn(
        "err_code",
        when(invalid_year, concat(col("err_code"), lit("DQ03 "))).otherwise(col("err_code"))
    )

    bronze_df = bronze_df.withColumn(
        "err_message",
        when(invalid_year, concat(col("err_message"), lit("Invalid Model Year "))).otherwise(col("err_message"))
    )

    bronze_df = bronze_df.withColumn("err_flg", when(col("err_code") != "", lit("Y")).otherwise(lit("N")))

    return bronze_df

In [93]:
from pyspark.sql.functions import col, when, concat, lit, instr, isnan, to_timestamp, from_unixtime
from pyspark.sql.types import TimestampType, DoubleType
from datetime import datetime

source_database_name = "bronze"
source_table_name = "vehicle_data"

# Read the bronze table, seed empty error columns, then run the type
# correction and the data-quality checks in order.
# NOTE(review): us_state_validation is defined in this notebook but is not
# applied here — confirm whether that is intentional.
bronze_df = (
    spark.sql(f"SELECT * FROM {source_database_name}.{source_table_name}")
    .withColumn("err_code", lit(""))
    .withColumn("err_message", lit(""))
    .transform(data_type_correction)
    .transform(null_check)
    .transform(year_check)
    .transform(numeric_check)
)

# Persist the checked data as the silver Delta table, then preview a few rows.
bronze_df.write.format("delta").mode("overwrite").saveAsTable("silver.vehicle_data")
bronze_df.limit(10).show()

+------------------+--------------------+--------+-------------------+------------+-------------------+------------+----+----------+-----------+-----------+-----+-----------+----------+---------+-------+---------------------+-----------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+--------+-----------------------+---------------------------------------+--------+-----------+-------+
|               sid|                  id|position|         created_at|created_meta|         updated_at|updated_meta|meta|  VIN_1_10|     County|       City|State|Postal_Code|Model_year|     Make|  Model|Electric_Vehicle_Type|Clean_Alternative_Fuel_Vehicle_CAFV_Eligibility|Electric_Range|Base_MSRP|Legislative_District|DOL_Vehicle_ID|    Vehicle_Location|    Electric_Utility|2020_Census_Tract|Counties|Congressional_Districts|WAOFM_GIS_Legislative_District_Boundary|err_code|err_message|err_flg|
+-----

In [94]:
# Mark the silver ingestion of EV_Data as completed in the audit table.
# NOTE(review): update_audit_table is not defined in this notebook; presumably
# it comes from the bronze notebook executed earlier via %run — confirm.
update_audit_table("EV_Data", progress="Ingest_Silver", status="Completed", end=True)

+----------+-------------+-----------+--------------------+--------------------+
|Table_Name|     Progress|     Status|          Start_time|            End_time|
+----------+-------------+-----------+--------------------+--------------------+
|   EV_Data|Ingest_Bronze|  Completed|2024-12-05 19:14:...|2024-12-05 19:14:...|
|   EV_Data|Ingest_Silver|In_Progress|2024-12-05 19:14:...|                null|
+----------+-------------+-----------+--------------------+--------------------+

+----------+-------------+-----------+--------------------+--------+
|Table_Name|     Progress|     Status|          Start_time|End_time|
+----------+-------------+-----------+--------------------+--------+
|   EV_Data|Ingest_Silver|In_Progress|2024-12-05 19:14:...|    null|
+----------+-------------+-----------+--------------------+--------+

+----------+-------------+---------+--------------------+--------------------+
|Table_Name|     Progress|   Status|          Start_time|            End_time|
+--------