In [1]:
import pyspark
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


# Replace these values with your machine's specifications
total_memory = "32g"  # Example total memory, adjust as needed
total_cores = 16  # Example total cores, adjust as needed

# spark =( 
#     SparkSession.builder \
#         .appName("Medallion Architecture") \
#         .config("spark.executor.memory", "30g")  # Set executor memory slightly less than total to avoid overhead issues
#         .config("spark.driver.memory", "30g")  # Set driver memory slightly less than total to avoid overhead issues
#         .config("spark.executor.instances", "1")  # Number of executors; setting to 1 if all resources are on a single machine
#         .config("spark.executor.cores", str(total_cores - 1))  # Leave 1 core for the OS
#         .config("spark.sql.shuffle.partitions", str(total_cores * 2))  # Shuffle partitions can be a multiple of cores
#         .getOrCreate()
#     )

spark = (
    SparkSession.builder
        .appName("Medallion Architecture")
        .config("spark.executor.memory", "24g")
        .config("spark.driver.memory", "24g")
        .config("spark.executor.instances", "1")
        .config("spark.executor.cores", str(total_cores - 1))
        .config("spark.sql.shuffle.partitions", str(total_cores * 2))
        .config("spark.memory.fraction", "0.8")  # Adjust the fraction of heap space used for execution and storage
        .config("spark.memory.storageFraction", "0.3")  # Adjust the fraction of execution memory used for storage
        .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/25 17:08:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Read the CSV / parquet file from the Volumes:

- a. Use PySpark to read the dataset.
- b. Explore the dataset and understand the schema and data types.
- c. Implement schema checks to ensure the integrity of the input data.

In [2]:
raw_path = os.path.join("..","data","*.csv")

df = spark.read.csv(raw_path, header=True, inferSchema=True)
df.printSchema()
df.show(5)

                                                                                

root
 |-- YEAR: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- ETHNIC: integer (nullable = true)
 |-- RACE: integer (nullable = true)
 |-- GENDER: integer (nullable = true)
 |-- SPHSERVICE: integer (nullable = true)
 |-- CMPSERVICE: integer (nullable = true)
 |-- OPISERVICE: integer (nullable = true)
 |-- RTCSERVICE: integer (nullable = true)
 |-- IJSSERVICE: integer (nullable = true)
 |-- MH1: integer (nullable = true)
 |-- MH2: integer (nullable = true)
 |-- MH3: integer (nullable = true)
 |-- SUB: integer (nullable = true)
 |-- MARSTAT: integer (nullable = true)
 |-- SMISED: integer (nullable = true)
 |-- SAP: integer (nullable = true)
 |-- EMPLOY: integer (nullable = true)
 |-- DETNLF: integer (nullable = true)
 |-- VETERAN: integer (nullable = true)
 |-- LIVARAG: integer (nullable = true)
 |-- NUMMHS: integer (nullable = true)
 |-- TRAUSTREFLG: integer (nullable = true)
 |-- ANXIETYFLG: integer (nullable = true)
 |-- ADHDFLG

24/05/25 17:09:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+---+----+------+----+------+----------+----------+----------+----------+----------+---+---+---+---+-------+------+---+------+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+
|YEAR|AGE|EDUC|ETHNIC|RACE|GENDER|SPHSERVICE|CMPSERVICE|OPISERVICE|RTCSERVICE|IJSSERVICE|MH1|MH2|MH3|SUB|MARSTAT|SMISED|SAP|EMPLOY|DETNLF|VETERAN|LIVARAG|NUMMHS|TRAUSTREFLG|ANXIETYFLG|ADHDFLG|CONDUCTFLG|DELIRDEMFLG|BIPOLARFLG|DEPRESSFLG|ODDFLG|PDDFLG|PERSONFLG|SCHIZOFLG|ALCSUBFLG|OTHERDISFLG|STATEFIP|DIVISION|REGION|     CASEID|
+----+---+----+------+----+------+----------+----------+----------+----------+----------+---+---+---+---+-------+------+---+------+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+
|2021| 

In [26]:
type(df)

pyspark.sql.dataframe.DataFrame

In [3]:
df.show()
df.describe().show()

+----+---+----+------+----+------+----------+----------+----------+----------+----------+---+---+---+---+-------+------+---+------+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+
|YEAR|AGE|EDUC|ETHNIC|RACE|GENDER|SPHSERVICE|CMPSERVICE|OPISERVICE|RTCSERVICE|IJSSERVICE|MH1|MH2|MH3|SUB|MARSTAT|SMISED|SAP|EMPLOY|DETNLF|VETERAN|LIVARAG|NUMMHS|TRAUSTREFLG|ANXIETYFLG|ADHDFLG|CONDUCTFLG|DELIRDEMFLG|BIPOLARFLG|DEPRESSFLG|ODDFLG|PDDFLG|PERSONFLG|SCHIZOFLG|ALCSUBFLG|OTHERDISFLG|STATEFIP|DIVISION|REGION|     CASEID|
+----+---+----+------+----+------+----------+----------+----------+----------+----------+---+---+---+---+-------+------+---+------+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+
|2021| 

[Stage 6:>                                                          (0 + 1) / 1]

+-------+--------------------+------------------+-------------------+-----------------+-----------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+--------------------+
|summary|                YEAR|               AGE|               EDUC|           ETHNIC|             RACE|            GENDER|         SPHSERVICE|         CMPSERVICE|         OPISERVICE|         RTCSERVICE|   

                                                                                

Bronze Layer
Staging (Bronze) Layer:
- i. Create a bronze table to store the raw data from the CSV file.
- ii. Perform basic data validation and cleaning, such as handling null values, data types, and column names.
- iii. Partition the bronze table by an appropriate column(s) to improve query performance.
- iv. Prepare to discuss how you would handle logging and exception handling to capture any issues during the bronze layer processing

In [4]:
bronze_path = os.path.join("..","data","medallion_layers","bronze")
df.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(bronze_path)

                                                                                

In [5]:
# bronze_cleaned_path = os.path.join("..","data","medallion_layers","bronze_cleaned")
bronze_df = spark.read.parquet(bronze_path, header=True, inferSchema=True)
cleaned_bronze_df = bronze_df.na.drop()
cleaned_bronze_df.show(5)
cleaned_bronze_df.printSchema()

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+
|YEAR|AGE|EDUC|SPHSERVICE|CMPSERVICE|OPISERVICE|RTCSERVICE|IJSSERVICE|MH1|MH2|MH3|SUB|SMISED|SAP|DETNLF|VETERAN|LIVARAG|NUMMHS|TRAUSTREFLG|ANXIETYFLG|ADHDFLG|CONDUCTFLG|DELIRDEMFLG|BIPOLARFLG|DEPRESSFLG|ODDFLG|PDDFLG|PERSONFLG|SCHIZOFLG|ALCSUBFLG|OTHERDISFLG|STATEFIP|DIVISION|REGION|     CASEID|GENDER|RACE|ETHNIC|MARSTAT|EMPLOY|
+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+
|2021| 

In [6]:
# cleaned_bronze_df.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(bronze_cleaned_path)

Silver Layer
- i. Create a silver table to store the transformed and enriched data.
- ii. Perform more complex transformations (See Transformation Exercise
for Senior Data Engineer for details), such as data aggregations, and data quality checks.
- iii. Partition the silver table by an appropriate column(s) to improve query performance.
- iv. Implement schema checks to ensure the transformed data matches the expected schema.
- v. Prepare to discuss how you would handle monitoring to track the success and failure of the silver layer processing.

In [7]:

# bronze_cleaned_path = os.path.join("..","data","medallion_layers","bronze_cleaned")

# cleaned_bronze_df = spark.read.parquet(bronze_cleaned_path)

cleaned_bronze_df = cleaned_bronze_df

cleaned_bronze_df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- SPHSERVICE: integer (nullable = true)
 |-- CMPSERVICE: integer (nullable = true)
 |-- OPISERVICE: integer (nullable = true)
 |-- RTCSERVICE: integer (nullable = true)
 |-- IJSSERVICE: integer (nullable = true)
 |-- MH1: integer (nullable = true)
 |-- MH2: integer (nullable = true)
 |-- MH3: integer (nullable = true)
 |-- SUB: integer (nullable = true)
 |-- SMISED: integer (nullable = true)
 |-- SAP: integer (nullable = true)
 |-- DETNLF: integer (nullable = true)
 |-- VETERAN: integer (nullable = true)
 |-- LIVARAG: integer (nullable = true)
 |-- NUMMHS: integer (nullable = true)
 |-- TRAUSTREFLG: integer (nullable = true)
 |-- ANXIETYFLG: integer (nullable = true)
 |-- ADHDFLG: integer (nullable = true)
 |-- CONDUCTFLG: integer (nullable = true)
 |-- DELIRDEMFLG: integer (nullable = true)
 |-- BIPOLARFLG: integer (nullable = true)
 |-- DEPRESSFLG: integer (nullable =

1. Data Type Conversion
    - a. Convert the categorical variables (GENDER, RACE, ETHNICITY, MARITAL, EMPLOY, INCOME) from numeric values to appropriate string or categorical data types.
    
    - b. Ensure that the CASEID variable is stored as an integer data type.
    
    - c. Convert the numeric variables (MENHLTH, PHYHLTH, POORHLTH) to float data types.
    
    - d. Validate the data types for all variables to ensure they are consistent with the expected formats.
    
    - e. Provide a summary of the data type conversions performed.

In [8]:
gender = {
    1: "Male",
    2: "Female",
    -9: "Missing/unknown/not collected/invalid"
}

race = {
    1: "American Indian/Alaska Native",
    2: "Asian",
    3: "Black or African American",
    4: "Native Hawaiian or Other Pacific Islander",
    5: "White",
    6: "Some other race alone/two or more races",
    -9: "Missing/unknown/not collected/invalid"
}

marital_status = {
    1: "Never married",
    2: "Now married",
    3: "Separated",
    4: "Divorced, widowed",
    -9: "Missing/unknown/not collected/invalid"
}

employment_status = {
    1: "Full-time",
    2: "Part-time",
    3: "Employed full-time/part-time not differentiated",
    4: "Unemployed",
    5: "Not in labor force",
    -9: "Missing/unknown/not collected/invalid"
}

ethnicity = {
    1: "Mexican",
    2: "Puerto Rican",
    3: "Other Hispanic or Latino origin",
    4: "Not of Hispanic or Latino origin",
    -9: "Missing/unknown/not collected/invalid"
}

# bronze_cleaned_path = os.path.join("..","data","medallion_layers","bronze_cleaned")

# cleaned_bronze_df = spark.read.parquet(bronze_cleaned_path)

# cleaned_bronze_df.printSchema()

In [9]:
# cleaned_bronze_df.withColumn()

In [10]:
def create_map_udf(mapping_dict):
    return F.udf(lambda key: mapping_dict.get(key, "Unknown"), T.StringType())

race_map_udf = create_map_udf(race)
gender_map_udf = create_map_udf(gender)
marital_status_map_udf = create_map_udf(marital_status)
employment_status_map_udf = create_map_udf(employment_status)
ethnicity_map_udf = create_map_udf(ethnicity)


silver_df = cleaned_bronze_df
silver_df = silver_df.withColumn("GENDER_mapped", gender_map_udf("GENDER"))
silver_df = silver_df.withColumn("RACE_mapped", race_map_udf("RACE"))
silver_df = silver_df.withColumn("MARSTAT_mapped", marital_status_map_udf("MARSTAT"))
silver_df = silver_df.withColumn("EMPLOY_mapped", employment_status_map_udf("EMPLOY"))
silver_df = silver_df.withColumn("ETHNIC_mapped", ethnicity_map_udf("ETHNIC"))


silver_df = silver_df.withColumn("CASEID_int", F.col("CASEID").cast(T.IntegerType()))

numeric_columns = [
    "NUMMHS","TRAUSTREFLG","ANXIETYFLG","ADHDFLG","CONDUCTFLG","DELIRDEMFLG","BIPOLARFLG","DEPRESSFLG","ODDFLG","PDDFLG","PERSONFLG","SCHIZOFLG",
    "ALCSUBFLG","OTHERDISFLG"
    ]
for column in numeric_columns:
    silver_df = silver_df.withColumn(column, F.col(column).cast(T.FloatType()))

silver_df_part_1 = silver_df
silver_df_part_1.printSchema()

data_type_summary = {
    "GENDER": "Converted to string/categorical using predefined mapping",
    "RACE": "Converted to string/categorical using predefined mapping",
    "ETHNIC": "Converted to string/categorical using predefined mapping",
    "MARSTAT": "Converted to string/categorical using predefined mapping",
    "EMPLOY": "Converted to string/categorical using predefined mapping",
    "CASEID": "Converted to integer",
    "NUMMHS": "Converted to float",
    "TRAUSTREFLG": "Converted to float",
    "ANXIETYFLG": "Converted to float",
    "ADHDFLG": "Converted to float",
    "CONDUCTFLG": "Converted to float",
    "DELIRDEMFLG": "Converted to float",
    "BIPOLARFLG": "Converted to float",
    "DEPRESSFLG": "Converted to float",
    "ODDFLG": "Converted to float",
    "PDDFLG": "Converted to float",
    "PERSONFLG": "Converted to float",
    "SCHIZOFLG": "Converted to float",
    "ALCSUBFLG": "Converted to float",
    "OTHERDISFLG": "Converted to float"
}

for column, summary in data_type_summary.items():
    print(f"{column}: {summary}")

silver_part_1_path = os.path.join("..","data","medallion_layers","silver_part_1")

# silver_df_part_1.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(silver_part_1_path)

root
 |-- YEAR: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- SPHSERVICE: integer (nullable = true)
 |-- CMPSERVICE: integer (nullable = true)
 |-- OPISERVICE: integer (nullable = true)
 |-- RTCSERVICE: integer (nullable = true)
 |-- IJSSERVICE: integer (nullable = true)
 |-- MH1: integer (nullable = true)
 |-- MH2: integer (nullable = true)
 |-- MH3: integer (nullable = true)
 |-- SUB: integer (nullable = true)
 |-- SMISED: integer (nullable = true)
 |-- SAP: integer (nullable = true)
 |-- DETNLF: integer (nullable = true)
 |-- VETERAN: integer (nullable = true)
 |-- LIVARAG: integer (nullable = true)
 |-- NUMMHS: float (nullable = true)
 |-- TRAUSTREFLG: float (nullable = true)
 |-- ANXIETYFLG: float (nullable = true)
 |-- ADHDFLG: float (nullable = true)
 |-- CONDUCTFLG: float (nullable = true)
 |-- DELIRDEMFLG: float (nullable = true)
 |-- BIPOLARFLG: float (nullable = true)
 |-- DEPRESSFLG: float (nullable = true)
 |-- ODDF

In [11]:
silver_df_part_1.show()

[Stage 10:>                                                         (0 + 1) / 1]

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------+--------------------+--------------------+-----------+
|YEAR|AGE|EDUC|SPHSERVICE|CMPSERVICE|OPISERVICE|RTCSERVICE|IJSSERVICE|MH1|MH2|MH3|SUB|SMISED|SAP|DETNLF|VETERAN|LIVARAG|NUMMHS|TRAUSTREFLG|ANXIETYFLG|ADHDFLG|CONDUCTFLG|DELIRDEMFLG|BIPOLARFLG|DEPRESSFLG|ODDFLG|PDDFLG|PERSONFLG|SCHIZOFLG|ALCSUBFLG|OTHERDISFLG|STATEFIP|DIVISION|REGION|     CASEID|GENDER|RACE|ETHNIC|MARSTAT|EMPLOY|GENDER_mapped|RACE_mapped|MARSTAT_mapped|       EMPLOY_mapped|       ETHNIC_mapped| CASEID_int|
+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+---------

                                                                                

In [12]:
silver_df_part_1 = silver_df_part_1.withColumn("CASEID_int", F.substring(F.col("CASEID").cast("string"), 5, 10).cast(T.IntegerType()))

silver_df_part_1.show()

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------+--------------------+--------------------+----------+
|YEAR|AGE|EDUC|SPHSERVICE|CMPSERVICE|OPISERVICE|RTCSERVICE|IJSSERVICE|MH1|MH2|MH3|SUB|SMISED|SAP|DETNLF|VETERAN|LIVARAG|NUMMHS|TRAUSTREFLG|ANXIETYFLG|ADHDFLG|CONDUCTFLG|DELIRDEMFLG|BIPOLARFLG|DEPRESSFLG|ODDFLG|PDDFLG|PERSONFLG|SCHIZOFLG|ALCSUBFLG|OTHERDISFLG|STATEFIP|DIVISION|REGION|     CASEID|GENDER|RACE|ETHNIC|MARSTAT|EMPLOY|GENDER_mapped|RACE_mapped|MARSTAT_mapped|       EMPLOY_mapped|       ETHNIC_mapped|CASEID_int|
+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+

2. Data Normalization and Standardization
- a. Normalize the numeric variables (MENHLTH, PHYHLTH, POORHLTH) using min-max scaling to bring them to a common scale between 0 and 1.
- b. Standardize the numeric variables (MENHLTH, PHYHLTH, POORHLTH) using z-score normalization to have a mean of 0 and a standard deviation of 1.
- c. Ensure that the normalized and standardized variables are stored in new columns, keeping the original variables intact.
- d. Provide a summary of the normalization and standardization techniques used and the rationale behind them.

In [13]:
from pyspark.ml.feature import MinMaxScaler, StandardScaler, VectorAssembler

extract_first_element = F.udf(lambda x: float(x[0]), T.FloatType())

cols_to_scale = [
    "NUMMHS","TRAUSTREFLG","ANXIETYFLG","ADHDFLG","CONDUCTFLG","DELIRDEMFLG","BIPOLARFLG","DEPRESSFLG","ODDFLG","PDDFLG","PERSONFLG","SCHIZOFLG",
    "ALCSUBFLG","OTHERDISFLG"
    ]

for col in cols_to_scale:
    assembler = VectorAssembler(inputCols=[col], outputCol=f"{col}_vec")
    scaler = MinMaxScaler(inputCol=f"{col}_vec", outputCol=f"{col}_normalized_vec")
    silver_df_part_1 = assembler.transform(silver_df_part_1)
    scaler_model = scaler.fit(silver_df_part_1)
    silver_df_part_1 = scaler_model.transform(silver_df_part_1)
    silver_df_part_1 = silver_df_part_1.withColumn(f"{col}_normalized", extract_first_element(F.col(f"{col}_normalized_vec")))
    silver_df_part_1 = silver_df_part_1.drop(f"{col}_vec").drop(f"{col}_normalized_vec")

for col in cols_to_scale:
    assembler = VectorAssembler(inputCols=[col], outputCol=f"{col}_vec")
    scaler = StandardScaler(inputCol=f"{col}_vec", outputCol=f"{col}_standardized_vec", withMean=True, withStd=True)
    silver_df_part_1 = assembler.transform(silver_df_part_1)
    scaler_model = scaler.fit(silver_df_part_1)
    silver_df_part_1 = scaler_model.transform(silver_df_part_1)
    silver_df_part_1 = silver_df_part_1.withColumn(f"{col}_standardized", extract_first_element(F.col(f"{col}_standardized_vec")))
    silver_df_part_1 = silver_df_part_1.drop(f"{col}_vec").drop(f"{col}_standardized_vec")

silver_df_part_2 = silver_df_part_1

silver_df_part_2.na.drop()

silver_df_part_2.printSchema()

data_type_summary = {
    "MENHLTH_normalized": "Normalized to scale between 0 and 1 using Min-Max scaling",
    "PHYHLTH_normalized": "Normalized to scale between 0 and 1 using Min-Max scaling",
    "POORHLTH_normalized": "Normalized to scale between 0 and 1 using Min-Max scaling",
    "MENHLTH_standardized": "Standardized to have mean 0 and standard deviation 1 using Z-Score normalization",
    "PHYHLTH_standardized": "Standardized to have mean 0 and standard deviation 1 using Z-Score normalization",
    "POORHLTH_standardized": "Standardized to have mean 0 and standard deviation 1 using Z-Score normalization"
}

for column, summary in data_type_summary.items():
    print(f"{column}: {summary}")
silver_part_2_path = os.path.join("..","data","medallion_layers","silver_part_2")

# silver_df_part_2.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(silver_part_2_path)

                                                                                

root
 |-- YEAR: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- SPHSERVICE: integer (nullable = true)
 |-- CMPSERVICE: integer (nullable = true)
 |-- OPISERVICE: integer (nullable = true)
 |-- RTCSERVICE: integer (nullable = true)
 |-- IJSSERVICE: integer (nullable = true)
 |-- MH1: integer (nullable = true)
 |-- MH2: integer (nullable = true)
 |-- MH3: integer (nullable = true)
 |-- SUB: integer (nullable = true)
 |-- SMISED: integer (nullable = true)
 |-- SAP: integer (nullable = true)
 |-- DETNLF: integer (nullable = true)
 |-- VETERAN: integer (nullable = true)
 |-- LIVARAG: integer (nullable = true)
 |-- NUMMHS: float (nullable = true)
 |-- TRAUSTREFLG: float (nullable = true)
 |-- ANXIETYFLG: float (nullable = true)
 |-- ADHDFLG: float (nullable = true)
 |-- CONDUCTFLG: float (nullable = true)
 |-- DELIRDEMFLG: float (nullable = true)
 |-- BIPOLARFLG: float (nullable = true)
 |-- DEPRESSFLG: float (nullable = true)
 |-- ODDF

In [14]:
silver_df_part_2.show()

24/05/25 17:15:27 WARN DAGScheduler: Broadcasting large task binary with size 1079.2 KiB
[Stage 96:>                                                         (0 + 1) / 1]

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------+--------------------+--------------------+----------+-----------------+----------------------+---------------------+------------------+---------------------+----------------------+---------------------+---------------------+-----------------+-----------------+--------------------+--------------------+--------------------+----------------------+-------------------+------------------------+-----------------------+--------------------+-----------------------+------------------------+-----------------------+-----------------------+-------------------+-------------------+----------------------+----------------------+----------

24/05/25 17:15:37 WARN PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 96 (TID 9608): Attempting to kill Python Worker
                                                                                

3. Data Partitioning and Sampling
- a. Split the dataset into training and testing sets, ensuring a representative split based on the demographic variables (GENDER, RACE, ETHNICITY, MARITAL, EMPLOY, INCOME).
- b. Implement stratified sampling to create the training and testing sets, maintaining the same proportions of the demographic variables in both sets.
- c. Create a validation set from the training set using a similar stratified sampling approach.
- d. Ensure that the dataset splits are stored in separate files or data structures, with appropriate naming conventions and metadata.
- e. Provide a detailed report on the data partitioning and sampling process, including the rationale for the chosen techniques and the characteristics of the resulting datasets.

In [15]:
SEED = 42
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.classification import LogisticRegression

# Assume silver_df is your DataFrame

# "GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY"
silver_df_part_3 = silver_df_part_2.withColumn("demographic_strata", F.concat_ws("_", F.col("GENDER"), F.col("RACE"), F.col("ETHNIC"), F.col("MARSTAT"), F.col("EMPLOY")))

indexer = StringIndexer(inputCol="demographic_strata", outputCol="strataIndex")
silver_df_part_3 = indexer.fit(silver_df_part_3).transform(silver_df_part_3)

train, test = silver_df_part_3.randomSplit([0.8, 0.2], seed=SEED)
train, validation = train.randomSplit([0.75, 0.25], seed=SEED)


full_path = os.path.join("..","data","medallion_layers","silver_part_3","full")
train_path = os.path.join("..","data","medallion_layers","silver_part_3","train")
test_path = os.path.join("..","data","medallion_layers","silver_part_3","test")
validation_path = os.path.join("..","data","medallion_layers","silver_part_3","validation")

                                                                                

In [16]:
# silver_df_part_3.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(full_path)
# train.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(train_path)
# test.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(test_path)
# validation.write.partitionBy("GENDER", "RACE", "ETHNIC", "MARSTAT", "EMPLOY").mode("overwrite").parquet(validation_path)

In [17]:
silver_df_part_3.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- SPHSERVICE: integer (nullable = true)
 |-- CMPSERVICE: integer (nullable = true)
 |-- OPISERVICE: integer (nullable = true)
 |-- RTCSERVICE: integer (nullable = true)
 |-- IJSSERVICE: integer (nullable = true)
 |-- MH1: integer (nullable = true)
 |-- MH2: integer (nullable = true)
 |-- MH3: integer (nullable = true)
 |-- SUB: integer (nullable = true)
 |-- SMISED: integer (nullable = true)
 |-- SAP: integer (nullable = true)
 |-- DETNLF: integer (nullable = true)
 |-- VETERAN: integer (nullable = true)
 |-- LIVARAG: integer (nullable = true)
 |-- NUMMHS: float (nullable = true)
 |-- TRAUSTREFLG: float (nullable = true)
 |-- ANXIETYFLG: float (nullable = true)
 |-- ADHDFLG: float (nullable = true)
 |-- CONDUCTFLG: float (nullable = true)
 |-- DELIRDEMFLG: float (nullable = true)
 |-- BIPOLARFLG: float (nullable = true)
 |-- DEPRESSFLG: float (nullable = true)
 |-- ODDF

In [18]:
silver_df_part_3.show()

24/05/25 17:15:49 WARN DAGScheduler: Broadcasting large task binary with size 1073.8 KiB
24/05/25 17:17:03 WARN PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 100 (TID 9951): Attempting to kill Python Worker
24/05/25 17:17:03 WARN PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 100 (TID 9951): Attempting to kill Python Worker
                                                                                

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------+--------------------+--------------------+----------+-----------------+----------------------+---------------------+------------------+---------------------+----------------------+---------------------+---------------------+-----------------+-----------------+--------------------+--------------------+--------------------+----------------------+-------------------+------------------------+-----------------------+--------------------+-----------------------+------------------------+-----------------------+-----------------------+-------------------+-------------------+----------------------+----------------------+----------

In [19]:
train.show()

24/05/25 17:17:05 WARN DAGScheduler: Broadcasting large task binary with size 1136.7 KiB
                                                                                

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------------+--------------------+--------------------+----------+-----------------+----------------------+---------------------+------------------+---------------------+----------------------+---------------------+---------------------+-----------------+-----------------+--------------------+--------------------+--------------------+----------------------+-------------------+------------------------+-----------------------+--------------------+-----------------------+------------------------+-----------------------+-----------------------+-------------------+-------------------+----------------------+----------------------+----

In [20]:
validation.show()

24/05/25 17:18:46 WARN DAGScheduler: Broadcasting large task binary with size 1136.7 KiB
                                                                                

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------------+--------------------+--------------------+----------+-----------------+----------------------+---------------------+------------------+---------------------+----------------------+---------------------+---------------------+-----------------+-----------------+--------------------+--------------------+--------------------+----------------------+-------------------+------------------------+-----------------------+--------------------+-----------------------+------------------------+-----------------------+-----------------------+-------------------+-------------------+----------------------+----------------------+----

In [21]:
test.show()

24/05/25 17:20:45 WARN DAGScheduler: Broadcasting large task binary with size 1112.3 KiB
[Stage 103:>                                                        (0 + 1) / 1]

+----+---+----+----------+----------+----------+----------+----------+---+---+---+---+------+---+------+-------+-------+------+-----------+----------+-------+----------+-----------+----------+----------+------+------+---------+---------+---------+-----------+--------+--------+------+-----------+------+----+------+-------+------+-------------+-----------+--------------------+--------------------+--------------------+----------+-----------------+----------------------+---------------------+------------------+---------------------+----------------------+---------------------+---------------------+-----------------+-----------------+--------------------+--------------------+--------------------+----------------------+-------------------+------------------------+-----------------------+--------------------+-----------------------+------------------------+-----------------------+-----------------------+-------------------+-------------------+----------------------+----------------------+----

                                                                                

Business (Gold) Layer
- i. Create a gold table to store the final, curated data.
- ii. Perform additional transformations and calculations to create
business-ready datasets.
- iii. Partition the gold table by an appropriate column(s) to improve query performance.
- iv. Prepare to discuss how you would handle schema checks to ensure the
final data meets the expected schema.
- v. Prepare to discuss how you would handle monitoring to track the success
and failure of the gold layer processing

In [22]:


gold_df = silver_df_part_3




In [23]:
gold_df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- SPHSERVICE: integer (nullable = true)
 |-- CMPSERVICE: integer (nullable = true)
 |-- OPISERVICE: integer (nullable = true)
 |-- RTCSERVICE: integer (nullable = true)
 |-- IJSSERVICE: integer (nullable = true)
 |-- MH1: integer (nullable = true)
 |-- MH2: integer (nullable = true)
 |-- MH3: integer (nullable = true)
 |-- SUB: integer (nullable = true)
 |-- SMISED: integer (nullable = true)
 |-- SAP: integer (nullable = true)
 |-- DETNLF: integer (nullable = true)
 |-- VETERAN: integer (nullable = true)
 |-- LIVARAG: integer (nullable = true)
 |-- NUMMHS: float (nullable = true)
 |-- TRAUSTREFLG: float (nullable = true)
 |-- ANXIETYFLG: float (nullable = true)
 |-- ADHDFLG: float (nullable = true)
 |-- CONDUCTFLG: float (nullable = true)
 |-- DELIRDEMFLG: float (nullable = true)
 |-- BIPOLARFLG: float (nullable = true)
 |-- DEPRESSFLG: float (nullable = true)
 |-- ODDF

In [24]:
expected_schema = T.StructType([
    T.StructField("YEAR", T.IntegerType(), True),
    T.StructField("AGE", T.IntegerType(), True),
    T.StructField("EDUC", T.IntegerType(), True),
    T.StructField("SPHSERVICE", T.IntegerType(), True),
    T.StructField("CMPSERVICE", T.IntegerType(), True),
    T.StructField("OPISERVICE", T.IntegerType(), True),
    T.StructField("RTCSERVICE", T.IntegerType(), True),
    T.StructField("IJSSERVICE", T.IntegerType(), True),
    T.StructField("MH1", T.IntegerType(), True),
    T.StructField("MH2", T.IntegerType(), True),
    T.StructField("MH3", T.IntegerType(), True),
    T.StructField("SUB", T.IntegerType(), True),
    T.StructField("SMISED", T.IntegerType(), True),
    T.StructField("SAP", T.IntegerType(), True),
    T.StructField("DETNLF", T.IntegerType(), True),
    T.StructField("VETERAN", T.IntegerType(), True),
    T.StructField("LIVARAG", T.IntegerType(), True),
    T.StructField("NUMMHS", T.FloatType(), True),
    T.StructField("TRAUSTREFLG", T.FloatType(), True),
    T.StructField("ANXIETYFLG", T.FloatType(), True),
    T.StructField("ADHDFLG", T.FloatType(), True),
    T.StructField("CONDUCTFLG", T.FloatType(), True),
    T.StructField("DELIRDEMFLG", T.FloatType(), True),
    T.StructField("BIPOLARFLG", T.FloatType(), True),
    T.StructField("DEPRESSFLG", T.FloatType(), True),
    T.StructField("ODDFLG", T.FloatType(), True),
    T.StructField("PDDFLG", T.FloatType(), True),
    T.StructField("PERSONFLG", T.FloatType(), True),
    T.StructField("SCHIZOFLG", T.FloatType(), True),
    T.StructField("ALCSUBFLG", T.FloatType(), True),
    T.StructField("OTHERDISFLG", T.FloatType(), True),
    T.StructField("STATEFIP", T.IntegerType(), True),
    T.StructField("DIVISION", T.IntegerType(), True),
    T.StructField("REGION", T.IntegerType(), True),
    T.StructField("CASEID", T.LongType(), True),
    T.StructField("GENDER", T.IntegerType(), True),
    T.StructField("RACE", T.IntegerType(), True),
    T.StructField("ETHNIC", T.IntegerType(), True),
    T.StructField("MARSTAT", T.IntegerType(), True),
    T.StructField("EMPLOY", T.IntegerType(), True),
    T.StructField("GENDER_mapped", T.StringType(), True),
    T.StructField("RACE_mapped", T.StringType(), True),
    T.StructField("MARSTAT_mapped", T.StringType(), True),
    T.StructField("EMPLOY_mapped", T.StringType(), True),
    T.StructField("ETHNIC_mapped", T.StringType(), True),
    T.StructField("CASEID_int", T.IntegerType(), True),
    T.StructField("NUMMHS_normalized", T.FloatType(), True),
    T.StructField("TRAUSTREFLG_normalized", T.FloatType(), True),
    T.StructField("ANXIETYFLG_normalized", T.FloatType(), True),
    T.StructField("ADHDFLG_normalized", T.FloatType(), True),
    T.StructField("CONDUCTFLG_normalized", T.FloatType(), True),
    T.StructField("DELIRDEMFLG_normalized", T.FloatType(), True),
    T.StructField("BIPOLARFLG_normalized", T.FloatType(), True),
    T.StructField("DEPRESSFLG_normalized", T.FloatType(), True),
    T.StructField("ODDFLG_normalized", T.FloatType(), True),
    T.StructField("PDDFLG_normalized", T.FloatType(), True),
    T.StructField("PERSONFLG_normalized", T.FloatType(), True),
    T.StructField("SCHIZOFLG_normalized", T.FloatType(), True),
    T.StructField("ALCSUBFLG_normalized", T.FloatType(), True),
    T.StructField("OTHERDISFLG_normalized", T.FloatType(), True),
    T.StructField("NUMMHS_standardized", T.FloatType(), True),
    T.StructField("TRAUSTREFLG_standardized", T.FloatType(), True),
    T.StructField("ANXIETYFLG_standardized", T.FloatType(), True),
    T.StructField("ADHDFLG_standardized", T.FloatType(), True),
    T.StructField("CONDUCTFLG_standardized", T.FloatType(), True),
    T.StructField("DELIRDEMFLG_standardized", T.FloatType(), True),
    T.StructField("BIPOLARFLG_standardized", T.FloatType(), True),
    T.StructField("DEPRESSFLG_standardized", T.FloatType(), True),
    T.StructField("ODDFLG_standardized", T.FloatType(), True),
    T.StructField("PDDFLG_standardized", T.FloatType(), True),
    T.StructField("PERSONFLG_standardized", T.FloatType(), True),
    T.StructField("SCHIZOFLG_standardized", T.FloatType(), True),
    T.StructField("ALCSUBFLG_standardized", T.FloatType(), True),
    T.StructField("OTHERDISFLG_standardized", T.FloatType(), True)
])

# gold_df = gold_df.withColumn("strataIndex", F.col("strataIndex").cast(T.DoubleType()).alias("strataIndex", metadata={"nullable": "false"}))

gold_df = gold_df.drop("demographic_strata")

gold_df = gold_df.drop("strataIndex")

def validate_schema(df, expected_schema):
    return df.schema == expected_schema

validate_schema(gold_df, expected_schema)

True

Write SQL Queries (Live Code):
- a. Write SQL queries to retrieve data from the bronze, silver, and gold tables.
- b. Demonstrate your understanding of the data and the transformations performed

Documentation and Presentation:
- a. Document in a Readme the entire process, including the data sources, transformations, and the rationale behind the Medallion Architecture implementation.
- b. Prepare to speak about the schema checks, monitoring, logging, and exception handling mechanisms implemented throughout the process.
- c. Please send a github-link to the notebook/code you used for the exercise & all documentation 24 hours before your interview.
- d. Document how we can run & test your code/notebook

In [2]:
spark.stop()

NameError: name 'spark' is not defined