In [None]:
# Install dependencies if necessary (uncomment the line below)
# %pip install pyspark

In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [24]:
# Step 1: Start a Spark session
# Build the session for this notebook (or reuse one that is already active).
spark = (
    SparkSession.builder
    .appName("SparkML_Example")
    .getOrCreate()
)

In [None]:
import os
import tempfile
import urllib.request

# Step 2a: Download the dataset locally.
# The file is cached in the platform's temporary directory (rather than a
# hard-coded "/tmp", which does not exist on every OS) and is fetched only
# when no non-empty copy is already present, so re-running this cell does
# not hit the network again.
data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
local_path = os.path.join(tempfile.gettempdir(), "iris.data")

if not (os.path.isfile(local_path) and os.path.getsize(local_path) > 0):
    # Download to a sibling ".part" file and rename it into place afterwards,
    # so an interrupted transfer never leaves a truncated file at local_path.
    partial_path = local_path + ".part"
    urllib.request.urlretrieve(data_url, partial_path)
    os.replace(partial_path, local_path)

# Step 2b: Load the data from the local path.
# The file is read without a header row, so the columns arrive as _c0.._c4;
# they are renamed here to the names the later cells refer to.
data = (
    spark.read.csv(local_path, inferSchema=True, header=False)
    .withColumnRenamed("_c0", "sepal_length")
    .withColumnRenamed("_c1", "sepal_width")
    .withColumnRenamed("_c2", "petal_length")
    .withColumnRenamed("_c3", "petal_width")
    .withColumnRenamed("_c4", "label")
)

data.show(5)


  

*   StringIndexer converts the target label to a numeric format suitable for ML algorithms.
*   VectorAssembler combines features into a single vector column.



In [None]:

# Step 3: Preprocess the data
# Stage that maps each string value in "label" to a numeric index in "indexedLabel"
label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")

# Stage that packs the four measurement columns into one "features" vector column
feature_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Fit the indexer on its own, only to preview which index each label receives
label_indexer_model = label_indexer.fit(data)
indexed_data = label_indexer_model.transform(data)

# Show distinct label and index pairs
indexed_data.select("label", "indexedLabel").distinct().show()



*   LogisticRegression is the classifier in this example.




In [18]:
# Step 4: Define the classifier
# Logistic regression reading the "features" vector and the "indexedLabel"
# target, capped at 10 optimisation iterations.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="indexedLabel",
    maxIter=10,
)


In [19]:
# Step 5: Build the pipeline
# Stage order: index the label, assemble the feature vector, then fit the classifier.
pipeline_stages = [label_indexer, feature_assembler, lr]
pipeline = Pipeline(stages=pipeline_stages)



*   Pipeline chains the steps in sequence for an efficient workflow.



In [8]:
# Step 6: Train the model
# Fit every pipeline stage on the full `data` DataFrame.
model = pipeline.fit(dataset=data)

In [20]:
# Step 7: Make predictions
# Run the fitted pipeline over `data`, the same DataFrame it was trained on.
predictions = model.transform(dataset=data)

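`MulticlassClassificationEvaluator` evaluates model performance using accuracy as the metric. Note that the predictions above were made on the same data the model was trained on, so the reported value is training accuracy rather than an estimate of performance on unseen data.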

In [21]:
# Step 8: Evaluate the model
# Score the "prediction" column against "indexedLabel" using accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

# Display the results
print(f"Model Accuracy: {accuracy:.2f}")

Model Accuracy: 0.99


In [None]:
# Step 9: Show sample predictions
# Display the first 12 rows: feature vector, true label index, predicted index.
sample_columns = ["features", "indexedLabel", "prediction"]
predictions.select(*sample_columns).show(12)

# Stop the Spark session
spark.stop()