<a href="https://colab.research.google.com/github/hbisgin/BigDatav1/blob/main/Lecture13_MLlibIntro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

# Start a Spark session, or reuse one that is already running.
session_builder = SparkSession.builder.appName("StringIndexerExample")
spark = session_builder.getOrCreate()

# One hot encoder example
First create a dummy dataframe

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Obtain the Spark session (getOrCreate returns the running one if it exists).
spark = (
    SparkSession.builder
    .appName("OneHotEncodingExample")
    .getOrCreate()
)

# Toy rows: (id, color, value). "color" is the categorical column that the
# following cells index and one-hot encode.
data = [
    (0, "red", 1.0),
    (1, "blue", 2.0),
    (2, "green", 3.0),
    (3, "blue", 4.0),
    (4, "red", 5.0),
]
df = spark.createDataFrame(data, schema=["id", "color", "value"])
df.show()


+---+-----+-----+
| id|color|value|
+---+-----+-----+
|  0|  red|  1.0|
|  1| blue|  2.0|
|  2|green|  3.0|
|  3| blue|  4.0|
|  4|  red|  5.0|
+---+-----+-----+



# Build a pipeline step by step to handle the categorical variable `color`

In [None]:


# Step 1 - StringIndexer (seen before): turn the string column "color" into
# the numeric index column "colorIndex".
indexer = StringIndexer(
    inputCol="color",
    outputCol="colorIndex",
    handleInvalid="keep",
)

# Learn the string -> index mapping from df, then apply it to the same data.
model = indexer.fit(df)
df_indexed = model.transform(df)
df_indexed.show()

+---+-----+-----+----------+
| id|color|value|colorIndex|
+---+-----+-----+----------+
|  0|  red|  1.0|       1.0|
|  1| blue|  2.0|       0.0|
|  2|green|  3.0|       2.0|
|  3| blue|  4.0|       0.0|
|  4|  red|  5.0|       1.0|
+---+-----+-----+----------+



In [None]:
# Step 2 - OneHotEncoder: expand the numeric index column into a one-hot
# encoded (sparse) vector column.
encoder = OneHotEncoder(
    inputCols=["colorIndex"],
    outputCols=["colorVec"],
    # Default behaviour: drop the last category so the encoded columns are
    # not fully collinear.
    dropLast=True,
    # What to do with unseen/invalid categories: "error" or "keep".
    handleInvalid="error",
)

# Fit on the indexed data, then add the "colorVec" column to it.
model = encoder.fit(df_indexed)
df_encoded = model.transform(df_indexed)
df_encoded.show()

+---+-----+-----+----------+-------------+
| id|color|value|colorIndex|     colorVec|
+---+-----+-----+----------+-------------+
|  0|  red|  1.0|       1.0|(3,[1],[1.0])|
|  1| blue|  2.0|       0.0|(3,[0],[1.0])|
|  2|green|  3.0|       2.0|(3,[2],[1.0])|
|  3| blue|  4.0|       0.0|(3,[0],[1.0])|
|  4|  red|  5.0|       1.0|(3,[1],[1.0])|
+---+-----+-----+----------+-------------+



Instead of storing every value in the vector, we only store:

- the length of the vector,

- the indices where non-zero elements appear,

- and the values at those positions.

This is called a sparse vector.

🔹 Example 1: simple one-hot vector

For [1, 0, 0]
We can write it as:

```(3, [0], [1.0])```

In [None]:
# Step 3 (optional) - VectorAssembler: concatenate the one-hot vector with the
# numeric "value" column into a single "features" vector column.
assembler = VectorAssembler(inputCols=["colorVec", "value"], outputCol="features")

# VectorAssembler is a plain transformer, so there is no fit step.
assembled = assembler.transform(df_encoded)
assembled.show()

+---+-----+-----+----------+-------------+-----------------+
| id|color|value|colorIndex|     colorVec|         features|
+---+-----+-----+----------+-------------+-----------------+
|  0|  red|  1.0|       1.0|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|
|  1| blue|  2.0|       0.0|(3,[0],[1.0])|[1.0,0.0,0.0,2.0]|
|  2|green|  3.0|       2.0|(3,[2],[1.0])|[0.0,0.0,1.0,3.0]|
|  3| blue|  4.0|       0.0|(3,[0],[1.0])|[1.0,0.0,0.0,4.0]|
|  4|  red|  5.0|       1.0|(3,[1],[1.0])|[0.0,1.0,0.0,5.0]|
+---+-----+-----+----------+-------------+-----------------+



Let's do it all in one go with the help of a `Pipeline`.

In [None]:
# Chain the three stages defined above: index -> one-hot encode -> assemble.
pipeline = Pipeline(stages=[indexer, encoder, assembler])

# Fitting the pipeline fits each stage in order; the resulting model applies
# all of them in a single transform call.
model = pipeline.fit(df)
df_transformed = model.transform(df)

(
    df_transformed
    .select("id", "color", "colorIndex", "colorVec", "features")
    .show(truncate=False)
)


# Stop Spark when done (left disabled because later cells still use it)
# spark.stop()

+---+-----+----------+-------------+-----------------+
|id |color|colorIndex|colorVec     |features         |
+---+-----+----------+-------------+-----------------+
|0  |red  |1.0       |(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|
|1  |blue |0.0       |(3,[0],[1.0])|[1.0,0.0,0.0,2.0]|
|2  |green|2.0       |(3,[2],[1.0])|[0.0,0.0,1.0,3.0]|
|3  |blue |0.0       |(3,[0],[1.0])|[1.0,0.0,0.0,4.0]|
|4  |red  |1.0       |(3,[1],[1.0])|[0.0,1.0,0.0,5.0]|
+---+-----+----------+-------------+-----------------+



# VectorIndexer Example



- Automatically detects categorical features.
- Encodes them as category indices under the hood.
- Works seamlessly with downstream ML models.

    

In [None]:
from pyspark.ml.feature import VectorIndexer
# VectorIndexer inspects each slot of the "features" vector and indexes the
# ones it decides are categorical: here, slots with <= 2 distinct values.
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2)

# Learn which slots are categorical from the assembled data ...
indexer_model = indexer.fit(assembled)

# ... then write the indexed vector to "indexedFeatures".
indexed = indexer_model.transform(assembled)

# Show the input and output vectors side by side.
indexed.select("features", "indexedFeatures").show(truncate=False)

+-----------------+-----------------+
|features         |indexedFeatures  |
+-----------------+-----------------+
|[0.0,1.0,0.0,1.0]|[0.0,1.0,0.0,1.0]|
|[1.0,0.0,0.0,2.0]|[1.0,0.0,0.0,2.0]|
|[0.0,0.0,1.0,3.0]|[0.0,0.0,1.0,3.0]|
|[1.0,0.0,0.0,4.0]|[1.0,0.0,0.0,4.0]|
|[0.0,1.0,0.0,5.0]|[0.0,1.0,0.0,5.0]|
+-----------------+-----------------+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Toy dataset:
#   - "category1" and "category2" are low-cardinality categorical features
#   - "numFeature" is a continuous numeric feature
data = spark.createDataFrame(
    [
        (0, "A", "X", 10.0),
        (1, "B", "X", 20.0),
        (0, "A", "Y", 15.0),
        (1, "B", "Y", 25.0),
        (0, "C", "X", 30.0),
        (1, "C", "Y", 35.0),
    ],
    ["label", "category1", "category2", "numFeature"],
)

# Convert string categories to numeric indices first
cat1_indexer = StringIndexer(inputCol="category1", outputCol="category1Index")
cat2_indexer = StringIndexer(inputCol="category2", outputCol="category2Index")

# Assemble all features into a single vector
assembler = VectorAssembler(
    inputCols=["category1Index", "category2Index", "numFeature"],
    outputCol="features",
)

# VectorIndexer automatically detects categorical features.
# Here, features with <= 3 distinct values will be treated as categorical.
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=3)

# Fit all stages, in order, on `data`. Without this step the VectorIndexer
# defined above is never trained, and `indexer_model` would still be the model
# fitted on the colour example in the previous cell.
vi_pipeline_model = Pipeline(
    stages=[cat1_indexer, cat2_indexer, assembler, indexer]
).fit(data)

# The fitted VectorIndexerModel is the last stage of the fitted pipeline.
indexer_model = vi_pipeline_model.stages[-1]

vi_pipeline_model.transform(data).select("features", "indexedFeatures").show(truncate=False)

print("Categorical feature indices and their category counts:")
print(indexer_model.categoryMaps)

# Expected reading of the map for this dataset:
#   - position 0 (category1Index) has 3 distinct values -> categorical
#   - position 1 (category2Index) has 2 distinct values -> categorical
#   - position 2 (numFeature) has 6 distinct values, more than maxCategories,
#     so it is left as a continuous feature and does not appear in the map.

Categorical feature indices and their category counts:
{0: {0.0: 0, 1.0: 1}, 1: {0.0: 0, 1.0: 1}, 2: {0.0: 0, 1.0: 1}}


# R formula

In [None]:

# Sample rows: (color, value1, value2, label).
data = [
    ("red", 1.0, 10.0, 0.0),
    ("blue", 2.0, 20.0, 1.0),
    ("green", 3.0, 30.0, 0.0),
    ("red", 4.0, 40.0, 1.0),
]

drf = spark.createDataFrame(data, schema=["color", "value1", "value2", "label"])
drf.show()


In [None]:
from pyspark.ml.feature import RFormula

# R-style model formula: "label" on the left-hand side is the response; the
# right-hand side lists the terms, including the color:value1 term.
formula = RFormula(formula="label ~ . + color:value1")

# Fit the formula on drf, then add the columns it produces.
model = formula.fit(drf)
output = model.transform(drf)
output.show()

# Sample data

In [None]:
!wget https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/refs/heads/master/data/sample_libsvm_data.txt

shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
--2025-10-07 22:57:09--  https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/refs/heads/master/data/sample_libsvm_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 104736 (102K) [text/plain]
sample_libsvm_data.txt: No such file or directory

Cannot write to ‘sample_libsvm_data.txt’ (Success).


In [None]:
# Read the downloaded file with the libsvm data source.
sdf = spark.read.load("/content/sample_libsvm_data.txt", format="libsvm")

In [None]:
# Preview the data using show()'s default settings, spelled out explicitly.
sdf.show(n=20, truncate=True)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



Import the `LogisticRegression` class, then define (instantiate) the logistic regression estimator.



In [None]:
from pyspark.ml.classification import LogisticRegression

# Configure the estimator through its fluent setters.
lr = (
    LogisticRegression()
    .setLabelCol("label")         # column containing true labels
    .setFeaturesCol("features")   # column containing feature vectors
    .setMaxIter(10)               # number of optimization iterations
    .setRegParam(0.3)             # regularization strength
    .setElasticNetParam(0.8)      # ElasticNet mixing (0=L2, 1=L1)
)

In [None]:
# explainParams() returns one long string with embedded newlines; print it so
# each parameter appears on its own line instead of as an escaped repr.
print(lr.explainParams())

"aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)\nelasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)\nfamily: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)\nfeaturesCol: features column name. (default: features, current: features)\nfitIntercept: whether to fit an intercept term. (default: True)\nlabelCol: label column name. (default: label, current: label)\nlowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)\nlowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constra

In [None]:

# Train: fit the estimator on the libsvm DataFrame.
lr_model = lr.fit(sdf)

# Learned parameters of the fitted model.
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)

# Score the same data and preview the first five rows.
predictions = lr_model.transform(sdf)
(
    predictions
    .select("label", "prediction", "probability")
    .show(5, truncate=False)
)


Coefficients: (692,[272,300,323,350,351,378,379,405,406,407,428,433,434,435,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.520689871384125e-05,-8.115773146847006e-05,3.814692771846427e-05,0.0003776490540424338,0.0003405148366194403,0.0005514455157343107,0.0004085386116096912,0.0004197467332749452,0.0008119171358670031,0.000502770837266875,-2.3929260406600902e-05,0.0005745048020902297,0.0009037546426803677,7.818229700243899e-05,-2.1787551952911914e-05,-3.402165821789542e-05,0.0004966517360637633,0.0008190557828370372,-8.017982139522613e-05,-2.743169403783527e-05,0.0004810832226238988,0.0004840801762677878,-8.926472920009901e-06,-0.00034148812330427297,-8.950592574121382e-05,0.00048645469116892156,-8.478698005186097e-05,-0.00042347832158317705,-7.296535777631246e-05])
Intercept: -0.5991460286401438
+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+------------------------------------

# With Pipeline

In [None]:
from pyspark.ml import Pipeline


# Model stage: logistic regression reading "label" and "features".
lr = LogisticRegression(labelCol="label", featuresCol="features")

# A pipeline whose only stage is the model.
pipeline = Pipeline(stages=[lr])

# Fitting the pipeline fits its single stage on the prepared data.
pipelineModel = pipeline.fit(sdf)

# The fitted logistic regression model is the pipeline model's only stage.
lr_model = pipelineModel.stages[0]
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)

# Score the data through the fitted pipeline and preview five rows.
predictions = pipelineModel.transform(sdf)
(
    predictions
    .select("label", "prediction", "probability")
    .show(5, truncate=False)
)

# Shut the session down; the next section obtains a session again.
spark.stop()


Coefficients: (692,[95,96,97,98,99,100,101,102,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299,300,301,302,303,304,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,369,370,371,372,373,374,375,376,377,378,379,380,381,382,383,384,385,386,387,388,396,397,398,399,400,401,402,403,404,405,406,407,408,409,410,411,412,413,414,415,416,424,425,426,427,428,429,430,431,432,433,434,435,436,437,438,439,440,441,442,443,444,452,453,454,455,4

# Parameter grid search example for logistic regression case

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, TrainValidationSplitModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

# =============================================================
# 1. Start Spark session
# =============================================================
spark = (
    SparkSession.builder
    .appName("PipelineExample")
    .getOrCreate()
)

# Load the sample libsvm dataset from the path used earlier in the notebook.
data = spark.read.load("/content/sample_libsvm_data.txt", format="libsvm")

# 70% / 30% train/test split with a fixed seed.
split_weights = [0.7, 0.3]
train, test = data.randomSplit(split_weights, seed=42)



In [None]:
# =============================================================
# 2. Define the base stages
# =============================================================
# RFormula writes its result to a column named "features", so the incoming
# "features" column is renamed to "originalFeatures" to avoid a clash.
# NOTE: `rename_features` is not referenced again in the cells shown here;
# the train/test frames below are what the pipeline consumes.
rename_features = data.withColumnRenamed("features", "originalFeatures")

# Apply the same rename to both splits.
train = train.withColumnRenamed("features", "originalFeatures")
test = test.withColumnRenamed("features", "originalFeatures")

# The formula refers to the renamed column.
rForm = RFormula(formula="label ~ originalFeatures")

# Logistic regression reads the "features" column.
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Two-stage pipeline: formula first, then the classifier.
pipeline = Pipeline(stages=[rForm, lr])



In [None]:
# -------------------------------------------------------------
# 3. Define the parameter grid for hyperparameter tuning
# -------------------------------------------------------------
# Grid of 2 formulas x 3 elastic-net mixes x 2 regularization strengths.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(
    rForm.formula,
    [
        "label ~ originalFeatures",
        # synthetic variation example
        "label ~ originalFeatures + originalFeatures:originalFeatures",
    ],
)
grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # L2, mixed, L1
grid_builder.addGrid(lr.regParam, [0.1, 2.0])
paramGrid = grid_builder.build()



In [None]:
# -------------------------------------------------------------
# 4. Define evaluator
# -------------------------------------------------------------
# Area under the ROC curve is a ranking metric, so it must be computed from
# the model's scores. Pointing the evaluator at the hard 0/1 "prediction"
# column collapses the ROC curve to a single threshold; use the
# "rawPrediction" column that LogisticRegression writes instead.
evaluator = (
    BinaryClassificationEvaluator()
    .setMetricName("areaUnderROC")
    .setRawPredictionCol("rawPrediction")
    .setLabelCol("label")
)



In [None]:
# -------------------------------------------------------------
# 5. Define TrainValidationSplit for model selection
# -------------------------------------------------------------
# Model selection with a single train/validation split:
# 75% of the input is used for training, 25% for validation.
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.75,
)



In [None]:
# -------------------------------------------------------------
# 6. Fit the pipeline with hyperparameter tuning
# -------------------------------------------------------------
# Try every parameter combination in the grid on the training split.
tvsFitted = tvs.fit(train)

# Score the held-out test split with the fitted result and evaluate it.
test_predictions = tvsFitted.transform(test)
auc = evaluator.evaluate(test_predictions)
print(f"Test AUC: {auc:.4f}")

Test AUC: 0.9424


In [None]:
# -------------------------------------------------------------
# 7. Inspect the best model and its training summary
# -------------------------------------------------------------
bestPipeline = tvsFitted.bestModel

# The pipeline was built as [RFormula, LogisticRegression], so the fitted
# logistic regression model is the last stage of the best pipeline model.
bestLR = bestPipeline.stages[-1]
# Second name for the same fitted model, kept so both names stay defined.
fittedLrModel = bestLR

print("Best model parameters:")
print("  ElasticNetParam:", bestLR.getElasticNetParam())
print("  RegParam:", bestLR.getRegParam())
print("  Coefficients:", bestLR.coefficients)
print("  Intercept:", bestLR.intercept)

# Ask the model whether it carries a training summary before touching it.
# `hasattr(model, "summary")` is not a safe guard: when no summary is
# available the property raises RuntimeError, which hasattr does not swallow.
if bestLR.hasSummary:
    print("Objective history:", bestLR.summary.objectiveHistory)

Best model parameters:
  ElasticNetParam: 0.0
  RegParam: 0.1
  Coefficients: (692,[95,96,97,98,99,100,101,121,122,123,124,125,126,127,128,129,130,131,132,133,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299,300,301,302,303,304,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,369,370,371,372,373,374,375,376,377,378,379,380,381,382,383,384,385,386,387,388,396,397,398,399,400,401,402,403,404,405,406,407,408,409,410,411,412,413,414,415,416,424,425,426,427,428,429,430,431,432,433,434,435,436,437,438,439,440,441,442,443,4

In [None]:
# -------------------------------------------------------------
# 8. Save and reload the fitted model
# -------------------------------------------------------------
# Persist the fitted TrainValidationSplit result, replacing any earlier save
# at the same location.
save_path = "/tmp/mymodel"
model_writer = tvsFitted.write().overwrite()
model_writer.save(save_path)



In [None]:
# Make sure a Spark session is active before anything else in this cell runs:
# an earlier cell may have stopped it, and both loading the model and reading
# the data depend on it. getOrCreate() simply returns the current session
# when one is already running.
spark = SparkSession.builder.appName("ReloadData").getOrCreate()

# Load the model that was saved to `save_path`.
loadedModel = TrainValidationSplitModel.load(save_path)

# Rebuild the test set the same way as before: same file, same column rename,
# same split weights and the same seed.
data_reloaded = spark.read.format("libsvm").load("/content/sample_libsvm_data.txt")
data_reloaded = data_reloaded.withColumnRenamed("features", "originalFeatures")
_, test_reloaded = data_reloaded.randomSplit([0.7, 0.3], seed=42)

# Run prediction using the loaded model on the reloaded test data.
(
    loadedModel.transform(test_reloaded)
    .select("label", "prediction", "probability")
    .show(5, truncate=False)
)