Generating Embeddings for Categorical Features

In [0]:
%python
from pyspark.sql.functions import split, concat_ws, col
from pyspark.ml.feature import StringIndexer, VectorAssembler, Word2Vec
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.types import ArrayType, StringType, IntegerType, DoubleType, FloatType
import pyspark.sql.functions as F

def generate_categorical_embeddings(df, categorical_cols, vector_size=5):
    """
    Generate one Word2Vec embedding per row from its categorical values.

    Each row is treated as a "sentence" whose tokens are the row's values in
    ``categorical_cols`` (one token per column). A Word2Vec model is fitted on
    ``df`` and then applied to that same DataFrame.

    Parameters:
    - df (DataFrame): Input Spark DataFrame.
    - categorical_cols (list): List of categorical column names.
    - vector_size (int): Size of the embedding vectors.

    Returns:
    - DataFrame: ``df`` with NULLs in ``categorical_cols`` replaced by
      "unknown" (those columns are cast to string) plus three added columns:
      'categorical_seq' (comma-joined values), 'categorical_tokens'
      (array with one value per categorical column) and 'embeddings_struct'
      (the Word2Vec output vector for the row).

    Raises:
    - ValueError: If ``categorical_cols`` is empty.

    Note:
    A new model is fitted on every call, so vectors returned by separate calls
    come from independently trained models.
    """
    if not categorical_cols:
        raise ValueError("categorical_cols must contain at least one column name")

    # Replace NULL categorical values with "unknown". The explicit string cast
    # guarantees all columns share one type so they can form a single array.
    for col_name in categorical_cols:
        df = df.withColumn(
            col_name,
            F.coalesce(F.col(col_name).cast('string'), F.lit('unknown')),
        )

    # Human-readable, comma-joined view of the row's categorical values
    df = df.withColumn('categorical_seq', concat_ws(',', *categorical_cols))

    # One token per categorical column. The array is built directly from the
    # columns rather than by splitting 'categorical_seq': splitting on a
    # delimiter breaks as soon as the delimiter differs from the join
    # character or appears inside a value (e.g. "Fiber optic").
    df = df.withColumn('categorical_tokens', F.array(*categorical_cols))

    # Train Word2Vec on the token arrays; minCount=0 keeps rare categories
    word2vec = Word2Vec(
        vectorSize=vector_size,
        minCount=0,
        inputCol='categorical_tokens',
        outputCol='embeddings_struct',
    )
    model = word2vec.fit(df)

    # Append the row-level embedding produced by the fitted model
    return model.transform(df)

Applying Embeddings to the Categorical Features

In [0]:
# Load the silver-layer train and test sets (Delta format) from DBFS.
train_df, test_df = (
    spark.read.format("delta").load(delta_path)
    for delta_path in (
        "dbfs:/user/arundhuti/delta/customer_churn/silver/train_data/",
        "dbfs:/user/arundhuti/delta/customer_churn/silver/test_data/",
    )
)

In [0]:
# Categorical columns that feed the embedding step.
# NOTE(review): 'churn' looks like the prediction target of this dataset; embedding it
# together with the features would leak the label into them -- confirm it belongs here.
categorical_cols = [
    'gender',
    'Partner',
    'InternetService',
    'Contract',
    'PaperlessBilling',
    'PaymentMethod',
    'churn',
]

# Embed the train set, then the test set.
# NOTE(review): each call fits its own Word2Vec model, so the test vectors are not
# produced by the model trained on the train set -- confirm this is intended.
train_df = generate_categorical_embeddings(df=train_df, categorical_cols=categorical_cols)
test_df = generate_categorical_embeddings(df=test_df, categorical_cols=categorical_cols)

Converting Embeddings into Dense Vectors

In [0]:
# Length of the fallback zero vector; must match the vector_size used when the
# embeddings were generated (generate_categorical_embeddings defaults to 5).
EMBEDDING_SIZE = 5

# Normalise the Word2Vec output (an ML vector column, not a struct) to a DenseVector.
# toArray() is used rather than .values because .values on a SparseVector contains
# only the stored entries, which would silently shorten the vector.
DenseVector_udf = F.udf(
    lambda v: DenseVector(v.toArray()) if v is not None else DenseVector([0.0] * EMBEDDING_SIZE),
    VectorUDT(),
)

# Build the column expression once and reuse it for every output column.
embedding_vector = DenseVector_udf(F.col('embeddings_struct'))

# Expose the embedding under one '<column>_embeddings' name per categorical column.
# NOTE(review): all of these columns are copies of the same row-level vector taken
# from 'embeddings_struct'; they are not separate per-feature embeddings.
for col_name in categorical_cols:
    train_df = train_df.withColumn(col_name + '_embeddings', embedding_vector)
    test_df = test_df.withColumn(col_name + '_embeddings', embedding_vector)

# Drop unnecessary intermediate columns once the embeddings are extracted
train_df = train_df.drop('categorical_seq', 'categorical_tokens', 'embeddings_struct')
test_df = test_df.drop('categorical_seq', 'categorical_tokens', 'embeddings_struct')

display(train_df)

In [0]:
# Preview only the embedding columns of the training set
embedding_preview_cols = [
    'gender_embeddings',
    'Partner_embeddings',
    'InternetService_embeddings',
    'Contract_embeddings',
    'PaperlessBilling_embeddings',
    'PaymentMethod_embeddings',
    'churn_embeddings',
]
display(train_df.select(*embedding_preview_cols))


Feature Engineering Pipeline

In [0]:
from pyspark.ml.feature import VectorAssembler, Imputer , StandardScaler
from pyspark.ml import Pipeline

# Numeric inputs and the names of their imputed counterparts
numerical_cols = ["SeniorCitizen", "tenure", "TotalCharges"]
imputed_cols = [name + "_imputed" for name in numerical_cols]

# Stage 1: fill missing numeric values, writing each result to "<name>_imputed"
imputer = Imputer(inputCols=numerical_cols, outputCols=imputed_cols)

# Stage 2: pack the imputed numeric columns into one vector column
numerical_assembler = VectorAssembler(
    inputCols=imputed_cols,
    outputCol="numerical_assembled",
)

# Stage 3: standardize the numeric vector (StandardScaler default settings)
numerical_scaler = StandardScaler(
    inputCol="numerical_assembled",
    outputCol="numerical_scaled",
)

# Stage 4: concatenate the scaled numeric vector with every "<name>_embeddings" column
feature_cols = ["numerical_scaled", *(name + "_embeddings" for name in categorical_cols)]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="all_features")

# Stages run in list order: impute -> assemble -> scale -> assemble all features
stage_list = [
    imputer,
    numerical_assembler,
    numerical_scaler,
    vector_assembler,
]

# Unfitted pipeline; it is fitted in a later cell
pipeline = Pipeline(stages=stage_list)

In [0]:
# Fit the pipeline on the training data only, so the imputation and scaling
# statistics are learned without looking at the test set.
pipeline_model = pipeline.fit(train_df)

In [0]:
# Run the fitted feature-engineering pipeline over both datasets
train_transformed_df = pipeline_model.transform(dataset=train_df)
test_transformed_df = pipeline_model.transform(dataset=test_df)

In [0]:
# Print the first three assembled feature vectors without truncating them
train_transformed_df.select('all_features').show(n=3, truncate=False)

Save and Reuse the Pipeline

In [0]:
# Persist the fitted pipeline, replacing any model already stored at this path
pipeline_model_path = "dbfs:/user/arundhuti/delta/customer_churn/silver/pipeline_model"
pipeline_model.write().overwrite().save(pipeline_model_path)

# List the files that were written
display(dbutils.fs.ls(pipeline_model_path))

In [0]:
# Reload the fitted pipeline from the location it was saved to
from pyspark.ml import PipelineModel

loaded_pipeline_model = PipelineModel.load("dbfs:/user/arundhuti/delta/customer_churn/silver/pipeline_model")

# Show the fitted pipeline stages (rendered as the cell output because this is
# the last expression in the cell)
loaded_pipeline_model.stages

In [0]:
# Use the reloaded pipeline to transform the test dataset.
# This reassigns test_transformed_df, replacing the value computed earlier
# with the in-memory pipeline_model.
test_transformed_df = loaded_pipeline_model.transform(test_df)

display(test_transformed_df)

In [0]:
# Register a Python function as the Spark SQL UDF "get_embeddings"
# (this registers a UDF, not an ML model).
# NOTE(review): `get_embeddings` is not defined in any cell visible in this notebook,
# so on a fresh run this cell raises NameError unless it is defined elsewhere -- confirm.
# NOTE(review): Word2VecModel is imported below but not used in this cell.
from pyspark.ml.feature import Word2VecModel


spark.udf.register("get_embeddings", get_embeddings)

