
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img
    src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png"
    alt="Databricks Learning"
  >
</div>



# LAB - Build a Feature Engineering Pipeline

Welcome to the "Build a Feature Engineering Pipeline" lab. In this hands-on session, you will load and prepare a dataset, split it into training and test sets, build and fit a pipeline that imputes, scales, and encodes features, and save the fitted pipeline for reuse. These steps form the basis of efficient, reproducible machine learning workflows.

**Lab Outline**

+ **Task 1:** Load Dataset and Data Preparation
  + **1.1.** Load Dataset
  + **1.2.** Data Preparation
+ **Task 2:** Split Dataset
+ **Task 3:** Create Pipeline for Data Imputation and Transformation
+ **Task 4:** Fit the Pipeline
+ **Task 5:** Show Transformation Results
+ **Task 6:** Save Pipeline




## REQUIRED - SELECT CLASSIC COMPUTE
Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:
1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

2. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

   - Click **More** in the drop-down.
   
   - In the **Attach to an existing compute resource** window, use the first drop-down to select your unique cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

2. Find the triangle icon to the right of your compute cluster name and click it.

3. Wait a few minutes for the cluster to start.

4. Once the cluster is running, complete the steps above to select your cluster.


## Requirements

Please review the following requirements before starting the lesson:

* To run this notebook, you need to use one of the following Databricks runtime(s): **17.3.x-cpu-ml-scala2.13**


## Lab Setup

Before starting the Lab, run the provided classroom setup script. This script will establish necessary configuration variables tailored to each user. Execute the following code cell:

In [0]:
%run ../Includes/Classroom-Setup-2.3

**Other Conventions:**

Throughout this lab, we'll refer to the object `DA`. This object, provided by Databricks Academy, contains variables such as your username, catalog name, schema name, working directory, and dataset locations. Run the code block below to view these details:

In [0]:
print(f"Username:          {DA.username}")
print(f"Catalog Name:      {DA.catalog_name}")
print(f"Schema Name:       {DA.schema_name}")
print(f"Working Directory: {DA.paths.working_dir}")
print(f"Dataset Location:  {DA.paths.datasets}")

## Task 1: Load Dataset and Data Preparation


**1.1. Load Dataset:**
+ Load a dataset with features that require imputation and transformation
+ Display basic information about the dataset (e.g., schema, summary statistics)

**1.2. Data Preparation:**

+ Examine the dataset.
+ Identify and discuss the features that need data preparation.
+ Convert data types: Demonstrate converting data types for selected columns (e.g., String to Int, Double to Boolean).
+ Remove a column: Discuss and remove a column with too many missing values.
+ Remove outliers: Implement a threshold-based approach to remove outlier records for a specific column.
+ Save cleaned dataset as "silver table."

**1.1. Load Dataset:**

+ Load a dataset with features that require imputation and transformation
+ Display basic information about the dataset (e.g., schema, summary statistics)


In [0]:
## Set the path of the dataset
dataset_path = f"{DA.paths.datasets.cdc_diabetes}/cdc-diabetes/diabetes_binary_5050_raw.csv"

## Read the CSV file using the Spark read.csv function
## nullValue="null" treats the literal string "null" as a missing value
## header="true" indicates that the first line of the CSV file holds the column names
## inferSchema="true" lets Spark automatically detect the data types
## multiLine="true" ensures that Spark reads quoted fields spanning multiple lines properly
## escape='"' treats a doubled quote inside a quoted field as an escaped quote
cdc_df = spark.read.option("nullValue", "null").csv(dataset_path, header="true", inferSchema="true", multiLine="true", escape='"')

## Display the resulting dataframe
display(cdc_df)
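
The effect of the `nullValue` option can be pictured with a small standard-library sketch. It is only an illustration under stated assumptions: the sample text and the helper `read_with_null_token` are hypothetical, and no schema inference is attempted, so every non-missing value stays a string.

```python
import csv
import io

# Hypothetical sample text, not taken from the CDC file
raw_text = "HighBP,BMI\n1,27.5\nnull,31.0\n0,null\n"


def read_with_null_token(text, null_token="null"):
    """Parse CSV text and replace cells equal to `null_token` with None."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {name: (None if value == null_token else value) for name, value in row.items()}
        for row in reader
    ]


rows = read_with_null_token(raw_text)
for row in rows:
    print(row)
```

Cells containing the token become `None`, which is the state the later missing-value checks look for.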

**1.2. Data Preparation:**

+ Examine the dataset.
+ Identify the features that need data preparation.
+ Convert data types: Demonstrate converting data types for selected columns (e.g., String to Int, Double to Boolean).

In [0]:
## Convert string columns to integer type
from pyspark.sql.types import IntegerType, BooleanType, StringType, DoubleType
from pyspark.sql.functions import col

## List of string columns to convert
string_columns = ["HighBP", "CholCheck", "PhysActivity"]

## Iterate over string columns and cast to integer type
for column in string_columns:
    cdc_df = cdc_df.withColumn(column, col(column).cast("int"))

## Convert double columns to BooleanType
double_columns = ["Diabetes_binary", "hvyalcoholconsump"]
for column in double_columns:
    cdc_df = cdc_df.withColumn(column, col(column).cast(BooleanType()))

## Print the schema
cdc_df.printSchema()
## Examine the printed schema to verify the changes.

+ **Remove a column with too many missing values.**

In [0]:
from pyspark.sql.functions import col, when, count, concat_ws, collect_list, length, trim, lower, sum

## First, get the count of missing values per column to create a singleton row DataFrame
missing_cdc_df = cdc_df.agg(
    *[
        sum(
            when(
                col(c).isNull()
                | (length(trim(col(c).cast("string"))) == 0)
                | lower(trim(col(c).cast("string"))).isin(["none", "null"]),
                1,
            ).otherwise(0)
        ).alias(c)
        for c in cdc_df.columns
    ]
)

## Define a helper function to transpose the DataFrame for better readability
def TransposeDF(df, columns, pivotCol):
    """Helper function to transpose Spark DataFrame"""
    columnsValue = list(map(lambda x: str("'") + str(x) + str("',") + str(x), columns))
    stackCols = ','.join(x for x in columnsValue)
    df_1 = df.selectExpr(pivotCol, "stack(" + str(len(columns)) + "," + stackCols + ")")\
              .select(pivotCol, "col0", "col1")
    final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))\
                   .withColumnRenamed("col0", pivotCol)
    return final_df

## Transpose the missing_cdc_df for better readability
missing_df_T = TransposeDF(spark.createDataFrame([{"Column": "Number of Missing Values"}]).join(missing_cdc_df), missing_cdc_df.columns, "Column")

## Display the count of missing values per column
display(missing_cdc_df)

## Set a threshold for missing data to drop columns
per_thresh = 0.6

## Calculate the total count of rows in the DataFrame
N = cdc_df.count()

## Identify columns whose share of missing data meets or exceeds the threshold
to_drop_missing = [x.asDict()['Column'] for x in missing_df_T.select("Column").where(col("Number of Missing Values") / N >= per_thresh).collect()]

## Drop columns with at least 60% missing data
print(f"Dropping columns {to_drop_missing} with at least {per_thresh * 100}% missing data")
cdc_no_missing_df = cdc_df.drop(*to_drop_missing)

## Display the DataFrame after dropping columns with excessive missing data
display(cdc_no_missing_df)
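
The drop rule used in this cell can be sketched without Spark. The version below is a minimal plain-Python illustration, assuming rows are dictionaries; `missing_fraction`, `columns_to_drop`, and the toy records are hypothetical names and data, not part of the lab.

```python
def missing_fraction(rows, column):
    """Fraction of rows whose value in `column` counts as missing."""

    def is_missing(value):
        # Same rule as the lab cell: null, blank, or the tokens "none"/"null"
        if value is None:
            return True
        text = str(value).strip().lower()
        return text == "" or text in {"none", "null"}

    return sum(is_missing(row.get(column)) for row in rows) / len(rows)


def columns_to_drop(rows, columns, threshold=0.6):
    """Columns whose missing fraction is at or above `threshold`."""
    return [c for c in columns if missing_fraction(rows, c) >= threshold]


# Hypothetical toy records, not taken from the CDC dataset
toy_rows = [
    {"BMI": 27.0, "Notes": None},
    {"BMI": 31.5, "Notes": "  "},
    {"BMI": None, "Notes": "null"},
    {"BMI": 24.2, "Notes": "ok"},
]
print(columns_to_drop(toy_rows, ["BMI", "Notes"]))  # ['Notes']
```

Here `Notes` is missing in 3 of 4 rows (0.75), which meets the 0.6 threshold, while `BMI` is missing in only 1 of 4 rows and is kept.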

+ **Remove outliers: Implement a threshold-based approach to remove outlier records for a specific column.**

In [0]:
## Define cutoff values
MentHlth_cutoff = 0  # Assuming MentHlth cannot be negative
BMI_cutoff = 50  # Reasonable upper limit for BMI

## Apply both filters in a single step
cdc_no_outliers_df = cdc_no_missing_df.filter(
    (col("MentHlth") >= MentHlth_cutoff) & (col("BMI") <= BMI_cutoff)
)

## Display the count before and after removing outliers
print(f"Count - Before: {cdc_no_missing_df.count()} / After: {cdc_no_outliers_df.count()}")

## Display the DataFrame after removing outliers
display(cdc_no_outliers_df)
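
One detail worth keeping in mind: in Spark SQL a comparison against a null evaluates to null, and `filter` keeps only rows where the condition is true, so rows with a missing `MentHlth` or `BMI` are removed together with the outliers. The plain-Python sketch below mimics that behavior; `keep_row` and the toy rows are hypothetical.

```python
def keep_row(row, ment_hlth_min=0, bmi_max=50):
    """Return True only when both comparisons are definitely true.

    A missing value (None) makes the comparison unknown, so the row is
    dropped, mirroring how SQL-style filters treat nulls.
    """
    ment_hlth, bmi = row.get("MentHlth"), row.get("BMI")
    if ment_hlth is None or bmi is None:
        return False
    return ment_hlth >= ment_hlth_min and bmi <= bmi_max


# Hypothetical toy records, not taken from the CDC dataset
toy_rows = [
    {"MentHlth": 5, "BMI": 28.0},     # kept
    {"MentHlth": -1, "BMI": 28.0},    # dropped: negative MentHlth
    {"MentHlth": 2, "BMI": 72.0},     # dropped: BMI above the cutoff
    {"MentHlth": None, "BMI": 30.0},  # dropped: comparison with a missing value
]
kept = [row for row in toy_rows if keep_row(row)]
print(f"Count - Before: {len(toy_rows)} / After: {len(kept)}")
```

If rows with missing values in these columns should survive for later imputation, the condition would need an explicit null check, for example an `isNull()` branch in the Spark filter.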

+ **Save the cleaned dataset as the "silver table" for further analysis**

In [0]:
cdc_df_full = "cdc_df_full"

## Save as DELTA table (silver)
cdc_df_full_silver = f"{cdc_df_full}_silver"
cdc_no_outliers_df.write.mode("overwrite").option("mergeSchema", True).saveAsTable(cdc_df_full_silver)

## Display the resulting DataFrame (optional)
display(cdc_no_outliers_df)

## Task 2: Split Dataset

**2.1. Split Dataset:**

+ Split the cleaned dataset into training and testing sets in 80:20 ratio

In [0]:
## Split with 80 percent of the data in train_df and 20 percent of the data in test_df
train_df, test_df = cdc_no_outliers_df.randomSplit([.8, .2], seed=42)

## Materialize the split DataFrames as DELTA tables
train_df.write.mode("overwrite").option("overwriteSchema", True).saveAsTable(f"{DA.catalog_name}.{DA.schema_name}.cdc_df_train")
test_df.write.mode("overwrite").option("overwriteSchema", True).saveAsTable(f"{DA.catalog_name}.{DA.schema_name}.cdc_df_baseline")
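
`randomSplit` samples rows rather than cutting the data at an exact position, so the 80:20 ratio holds only approximately, and a fixed seed makes the assignment repeatable for the same input. The sketch below illustrates the idea in plain Python; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to `weights`."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point round-off at the upper edge
            splits[-1].append(row)
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))
```

The two counts add up to 1000 but are not guaranteed to be exactly 800 and 200. Running the function again with the same seed returns the same split.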

In [0]:
from pyspark.ml.feature import StandardScaler, RobustScaler, VectorAssembler

## 'train_df' and 'test_df' are the splits created in the previous cell
feature_columns = ["income"]  # Add your actual feature column names

## Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="income_NUM_Col_assembled")
train_assembled_df = assembler.transform(train_df.select(*feature_columns))
test_assembled_df = assembler.transform(test_df.select(*feature_columns))

## Define scaler and fit on the training set
scaler = RobustScaler(inputCol="income_NUM_Col_assembled", outputCol="income_NUM_Col_scaled")
scaler_fitted = scaler.fit(train_assembled_df)

## Apply to both training and test sets
train_scaled_df = scaler_fitted.transform(train_assembled_df)
test_scaled_df = scaler_fitted.transform(test_assembled_df)

## Display the resulting DataFrames
print("This is the Training set:")
train_scaled_df.show()
print("This is the Testing set:")
test_scaled_df.show()
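
With its default settings, `RobustScaler` divides each value by the interquartile range (IQR) learned from the training data and does not subtract the median unless `withCentering` is enabled. The sketch below shows that arithmetic in plain Python. It is an illustration under assumptions: the helper names and toy values are hypothetical, and the quantiles are computed exactly by linear interpolation, whereas Spark estimates them approximately.

```python
def quantile(sorted_values, q):
    """Linear-interpolation quantile of an already sorted list."""
    position = q * (len(sorted_values) - 1)
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = position - lower
    return sorted_values[lower] + fraction * (sorted_values[upper] - sorted_values[lower])


def fit_iqr(train_values):
    """Learn the interquartile range from the training values only."""
    ordered = sorted(train_values)
    return quantile(ordered, 0.75) - quantile(ordered, 0.25)


def scale(values, iqr):
    """Divide every value by the IQR learned during fitting."""
    return [value / iqr for value in values]


# Hypothetical toy values, not taken from the CDC dataset
train_income = [1.0, 2.0, 3.0, 4.0, 5.0]
test_income = [2.0, 8.0]

iqr = fit_iqr(train_income)        # 4.0 - 2.0 = 2.0
print(scale(train_income, iqr))    # [0.5, 1.0, 1.5, 2.0, 2.5]
print(scale(test_income, iqr))     # [1.0, 4.0]
```

The test values are scaled with the IQR learned from the training values, which is the same reason the cell above fits the scaler on the training set and only transforms the test set.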

## Task 3: Create Pipeline for Data Imputation and Transformation

**3.1. Create Pipeline:**

+ Create a pipeline with the following tasks:
  + StringIndexer
  + Imputer
  + Scaler
  + One-Hot Encoder


In [0]:
from pyspark.sql.types import IntegerType, BooleanType, StringType, DoubleType
from pyspark.sql.functions import col, count, when

## Get a list of integer & boolean columns
integer_cols = [column.name for column in train_df.schema.fields if (column.dataType == IntegerType() or column.dataType == BooleanType())]

## Loop through integer columns to cast each one to double
for column in integer_cols:
    train_df = train_df.withColumn(column, col(column).cast("double"))
    test_df = test_df.withColumn(column, col(column).cast("double"))

## Get lists of string and numeric columns
string_cols = [c.name for c in train_df.schema.fields if c.dataType == StringType()]
num_cols = [c.name for c in train_df.schema.fields if c.dataType == DoubleType()]

## Get a list of columns with missing values
## Numerical
num_missing_values_logic = [count(when(col(column).isNull(), column)).alias(column) for column in num_cols]
row_dict_num = train_df.select(num_missing_values_logic).first().asDict()
num_missing_cols = [column for column in row_dict_num if row_dict_num[column] > 0]

## String
string_missing_values_logic = [count(when(col(column).isNull(), column)).alias(column) for column in string_cols]
row_dict_string = train_df.select(string_missing_values_logic).first().asDict()
string_missing_cols = [column for column in row_dict_string if row_dict_string[column] > 0]

## Identify low and high cardinality columns
cardinality_threshold = 100
low_card_cols = []
high_card_cols = []

for col_name in string_cols:
    unique_count = train_df.select(col_name).distinct().count()
    if unique_count <= cardinality_threshold:
        low_card_cols.append(col_name)
    else:
        high_card_cols.append(col_name)

## Print columns with missing values and cardinality info
print(f"Numeric columns with missing values: {num_missing_cols}")
print(f"String columns with missing values: {string_missing_cols}")
print(f"Low-cardinality string columns (OHE allowed): {low_card_cols}")
print(f"High-cardinality string columns (excluded from OHE): {high_card_cols}")

In [0]:
## import required libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, RobustScaler, StringIndexer, OneHotEncoder

## String/Cat Indexer
## create an additional column to index low-cardinality string columns
## handleInvalid="keep" assigns null or unseen values to an extra index instead of raising an error
string_cols_indexed = [c + '_index' for c in low_card_cols]
string_indexer = StringIndexer(inputCols=low_card_cols, outputCols=string_cols_indexed, handleInvalid="keep")

## Imputer (same strategy for all double/indexes)
## create a list of columns containing missing values
## utilize the mode strategy to impute all the missing columns
string_missing_cols_indexed = [c + '_index' for c in string_missing_cols if c in low_card_cols]
to_impute = num_missing_cols + string_missing_cols_indexed

imputer = Imputer(inputCols=to_impute, outputCols=to_impute, strategy='mode')

## Scale numerical
## assemble the numerical columns into a single vector in the 'numerical_assembled' column
## scale that vector with RobustScaler and write the result to the 'numerical_scaled' column
numerical_assembler = VectorAssembler(inputCols=num_cols, outputCol="numerical_assembled")
numerical_scaler = RobustScaler(inputCol="numerical_assembled", outputCol="numerical_scaled")

## OHE categoricals
## create an OHE encoder to turn the indexed low-cardinality string columns into binary vectors
ohe_cols = [column + '_ohe' for column in low_card_cols]
one_hot_encoder = OneHotEncoder(inputCols=string_cols_indexed, outputCols=ohe_cols, handleInvalid="keep")

## Assembler (All)
## re-collect all columns and create a 'features' column from them
feature_cols = ["numerical_scaled"] + ohe_cols
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

## Instantiate the pipeline
## instantiate a pipeline with all the above stages
stages_list = [
    string_indexer,
    imputer,
    numerical_assembler,
    numerical_scaler,
    one_hot_encoder,
    vector_assembler
]

pipeline = Pipeline(stages=stages_list)
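
To make the indexing and encoding stages more concrete, here is a minimal plain-Python sketch of what they compute. It assumes the default frequency-descending label order of `StringIndexer` with alphabetical tie-breaking, and it keeps every position of the one-hot vector, so it does not reproduce `OneHotEncoder`'s `dropLast` behavior or its sparse output. All function names and labels are hypothetical.

```python
from collections import Counter


def fit_string_index(train_labels):
    """Map each non-null label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(label for label in train_labels if label is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def index_label(label, mapping):
    """Unseen or missing labels go to one extra bucket, mimicking handleInvalid="keep"."""
    return mapping.get(label, len(mapping))


def one_hot(index, size):
    """Dense 0/1 vector of length `size` with a single 1 at `index`."""
    return [1 if position == index else 0 for position in range(size)]


# Hypothetical training labels, not taken from the CDC dataset
train_labels = ["b", "a", "b", "c", None, "a", "b"]
mapping = fit_string_index(train_labels)   # {'b': 0, 'a': 1, 'c': 2}
size = len(mapping) + 1                    # one extra slot for invalid values

print(one_hot(index_label("a", mapping), size))     # [0, 1, 0, 0]
print(one_hot(index_label("zzz", mapping), size))   # [0, 0, 0, 1]
```

The mapping is learned from training labels only, so a label that first appears in the test set lands in the extra bucket rather than causing an error.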

## Task 4: Fit the Pipeline

**4.1. Fit the Pipeline:**

+ Use the training dataset to fit the created pipeline.

In [0]:
## Fit the pipeline using the training dataset
pipeline_model = pipeline.fit(train_df)
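
Fitting is the step where each estimator stage learns its parameters from the training data. For a mode-based imputer, that parameter is the most frequent non-missing value of each column. The sketch below is a plain-Python illustration; the helper names and values are hypothetical, and the tie-breaking rule (smallest value wins) is an assumption of this sketch rather than a documented Spark guarantee.

```python
from collections import Counter


def fit_mode(train_values):
    """Most frequent non-missing value; ties resolve to the smallest value."""
    counts = Counter(value for value in train_values if value is not None)
    return min(counts, key=lambda value: (-counts[value], value))


def impute(values, fill_value):
    """Replace missing entries with the value learned during fitting."""
    return [fill_value if value is None else value for value in values]


# Hypothetical toy values, not taken from the CDC dataset
train_bmi = [27.0, None, 31.0, 27.0, 24.0]
test_bmi = [None, 35.0]

fill = fit_mode(train_bmi)       # learned from the training data only
print(impute(train_bmi, fill))   # [27.0, 27.0, 31.0, 27.0, 24.0]
print(impute(test_bmi, fill))    # [27.0, 35.0]
```

The fill value comes from the training column and is reused unchanged on the test column, which keeps information from the test set out of the fitted pipeline.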

## Task 5: Show Transformation Results

**5.1. Transform Datasets:**

+ Apply the fitted pipeline to transform the training and testing datasets.
+ The same fitted pipeline can be applied to any other split (e.g., a validation set).

In [0]:
## Transform both the training and test datasets using the previously fitted pipeline model
train_transformed_df = pipeline_model.transform(train_df)
test_transformed_df = pipeline_model.transform(test_df)

## Display the transformed features from the training dataset
display(train_transformed_df.select("features"))

## Task 6: Save Pipeline

**6.1. Save Pipeline:**

+ Save the fitted pipeline to the working directory.
+ Explore the saved pipeline.

In [0]:
## Save the fitted pipeline model to the 'spark_pipelines' folder in the working directory
pipeline_model.write().overwrite().save(f"{DA.paths.working_dir}/spark_pipelines")

In [0]:
## Load the saved pipeline model from the 'spark_pipelines' folder in the working directory
from pyspark.ml import PipelineModel

## Load the pipeline model
loaded_pipeline = PipelineModel.load(f"{DA.paths.working_dir}/spark_pipelines")

## Display the stages of the loaded pipeline
loaded_pipeline.stages
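
The reason saving matters is that the fitted pipeline carries learned values (fill values, scaling ranges, label indexes) that must be identical when the pipeline is reused. The sketch below shows the same round-trip idea with a plain JSON file. It is only an analogy: the parameter names, values, and file name are hypothetical, and it says nothing about the on-disk layout Spark uses.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical learned values standing in for a fitted pipeline's state
fitted_params = {"bmi_fill_value": 27.0, "income_iqr": 2.0}

with tempfile.TemporaryDirectory() as working_dir:
    path = Path(working_dir) / "pipeline_params.json"
    path.write_text(json.dumps(fitted_params))   # save
    restored = json.loads(path.read_text())      # load

print(restored == fitted_params)  # True
```

Because the restored values match the saved ones, new data can be transformed later exactly as the training data was, without refitting.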

## Conclusion

This lab walked through the key steps in preparing and transforming a dataset for machine learning: cleaning the data, splitting it into training and test sets, and building a pipeline that handles imputation, scaling, and encoding. Saving the fitted pipeline makes the transformations reproducible, and the same approach applies to other machine learning workflows.

&copy; 2025 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a>.<br/><br/><a href="https://databricks.com/privacy-policy" target="_blank">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use" target="_blank">Terms of Use</a> | <a href="https://help.databricks.com/" target="_blank">Support</a>