# Advanced Machine Learning using Spark MLlib

In [11]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [15]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Create a simple dataset
data = [
    (0, 1.0, 0.1),
    (1, 2.0, 1.1),
    (2, 3.0, 1.5),
    (3, 4.0, 1.7),
    (4, 5.0, 1.8),
]
columns = ['ID', 'Feature1', 'Feature2']

# Create a DataFrame
df = spark.createDataFrame(data, columns)

# Combine features into a single vector column (required for MLlib)
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='features')
df = assembler.transform(df)

# Define the target variable (label)
# For simplicity, let's use a rule to generate labels: 1 if Feature1 > 2, otherwise 0
df = df.withColumn("Label", (col("Feature1") > 2).cast("int"))

# Split the data into training and testing sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Create the logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='Label')

# Train the model on the training data
model = lr.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol='Label')
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")

Test Accuracy: 1.00


In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Step 1: Initialize Spark session
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()

# Step 2: Create a simple dataset (features: Feature1 and Feature2)
data = [
    (1, 1.0, 1.1),
    (2, 1.5, 1.6),
    (3, 3.0, 3.1),
    (4, 3.5, 3.6),
    (5, 8.0, 8.1),
    (6, 8.5, 8.6),
]
columns = ['ID', 'Feature1', 'Feature2']

# Create a DataFrame
df = spark.createDataFrame(data, columns)

# Combine features into a single vector column (required for MLlib)
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='features')
df = assembler.transform(df)

# Create the KMeans model
kmeans = KMeans().setK(2).setSeed(1)  # K=2 means we want 2 clusters

# Train the KMeans model
model = kmeans.fit(df)

# Make predictions (assign data points to clusters)
predictions = model.transform(df)

# Evaluate the model (Within Set Sum of Squared Errors)
wssse = model.summary.trainingCost
print(f"Within Set Sum of Squared Errors (WSSSE): {wssse}")




Within Set Sum of Squared Errors (WSSSE): 8.75


In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Step 1: Start a Spark session
spark = SparkSession.builder.appName("LinearRegressionCV").getOrCreate()

# Step 2: Load the dataset
data_path = "car_incident_canada (1).csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Step 3: Data preprocessing
# Identify feature columns and label column
feature_columns = [col for col in df.columns if col != "target"]  # Replace "target" with the actual label column name
label_column = "C_SEV"  # Replace "target" with the actual label column name

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df).select("features", label_column)

# Step 4: Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 5: Define the model
lr = LinearRegression(featuresCol="features", labelCol=label_column)

# Step 6: Hyperparameter tuning
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])  # Regularization parameter
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # ElasticNet mixing parameter
    .build()
)

# Define cross-validator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(labelCol=label_column, metricName="rmse"),
    numFolds=10
)

# Step 7: Train the model with cross-validation
cv_model = cv.fit(train_data)

# Step 8: Evaluate the model on the test set
test_predictions = cv_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol=label_column, metricName="rmse")
rmse = evaluator.evaluate(test_predictions)

print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

# Get the best model and print its parameters
best_model = cv_model.bestModel
print("Best Model Parameters:")
print(f"  regParam: {best_model._java_obj.getRegParam()}")
print(f"  elasticNetParam: {best_model._java_obj.getElasticNetParam()}")


Root Mean Squared Error (RMSE) on test data: 0.009903192314947344
Best Model Parameters:
  regParam: 0.01
  elasticNetParam: 0.5
