# In[ ]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# In[ ]:
# Train, evaluate and persist a logistic-regression pipeline on the
# feature-engineered Iris Delta table.

# Get (or reuse) the SparkSession. This is done outside the try block so that
# the `finally` clause below never references an unbound `spark`.
spark = SparkSession.builder.appName("IrisTraining").getOrCreate()

# Everything after session creation runs under try/finally so the session is
# stopped even when reading, fitting, evaluating or saving raises.
try:
    # Define the path to the feature-engineered data
    feature_engineered_data_path = "/mnt/my_delta_lake/iris_feature_engineered"  # Ensure this matches the feature engineering path

    # Read the feature-engineered data
    feature_engineered_df = spark.read.format("delta").load(feature_engineered_data_path)

    # Prepare data for MLlib: pack the numeric columns into a single vector
    # column and map the string `species` column to a numeric `label` column.
    feature_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "petal_area"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    indexer = StringIndexer(inputCol="species", outputCol="label")

    # Split the data into training and testing sets (weights 0.8 / 0.2). The
    # fixed seed keeps the split reproducible for the same input partitioning.
    (training_data, test_data) = feature_engineered_df.randomSplit([0.8, 0.2], seed=42)

    # Train a Logistic Regression model. The column names are spelled out so
    # the coupling to the assembler/indexer output columns is explicit; they
    # equal the estimator's defaults.
    lr = LogisticRegression(
        featuresCol="features",
        labelCol="label",
        maxIter=10,
        regParam=0.01,
    )

    # Create a pipeline: assemble features -> index label -> fit classifier.
    pipeline = Pipeline(stages=[assembler, indexer, lr])

    # Fit the pipeline to the training data.
    # NOTE(review): the indexer is fitted on the training split only, so a
    # species value that appears only in the test split would make
    # `transform` raise under StringIndexer's default handleInvalid="error"
    # -- confirm this is acceptable for the data set in use.
    model = pipeline.fit(training_data)

    # Make predictions on the test data
    predictions = model.transform(test_data)

    # Evaluate the model: accuracy of `prediction` against `label`.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName="accuracy",
    )
    accuracy = evaluator.evaluate(predictions)
    print(f"Test Accuracy = {accuracy}")

    # Define the path to save the trained model
    model_path = "/mnt/my_delta_lake/iris_model"  # Replace with your desired path

    # Save the trained model, replacing any model already stored at that path.
    model.write().overwrite().save(model_path)

    print(f"Trained model saved to: {model_path}")
finally:
    # Always release the session, including on failure.
    spark.stop()