<a href="https://colab.research.google.com/github/SUTHARSHANARAM/SUTHARSHANARAM/blob/main/Spark%20program%20to%20perform%20classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import Row

# Start (or reuse) the Spark session used by this example
spark = (
    SparkSession.builder
    .appName("ClassificationExample")
    .getOrCreate()
)

# Sample data as (feature1, feature2, label) triples (replace or expand as needed)
samples = [
    (1.0, 2.0, 0),
    (2.0, 1.5, 0),
    (3.0, 3.5, 1),
    (4.0, 5.0, 1),
    (5.0, 3.0, 1),
    (6.0, 6.0, 1),
    (7.0, 8.0, 1),
    (0.5, 1.0, 0),
    (1.5, 2.5, 0),
]
data = [Row(feature1=f1, feature2=f2, label=y) for f1, f2, y in samples]

# Build the DataFrame from the rows and print it
df = spark.createDataFrame(data)
df.show()

# Step 1: Combine the raw feature columns into one vector column named "features"
feature_columns = ["feature1", "feature2"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Step 2: Prepare the data for training
# Apply the assembler, then keep only the 'features' and 'label' columns
df = assembler.transform(df).select("features", "label")

# Step 3: Split the data into training and testing sets
# randomSplit assigns rows by random draws, so on a dataset this small
# either side of the split can come out empty.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)

# Check if test data is empty
if test_data.count() == 0:
    print("Test data is empty. Adjusting split...")
    # Re-splitting with the same seed and a *smaller* test weight cannot
    # help: the same seed reproduces the same draws, so the test set could
    # only shrink. Give the test side more weight and try fresh seeds.
    for retry_seed in range(124, 134):
        train_data, test_data = df.randomSplit([0.7, 0.3], seed=retry_seed)
        if train_data.count() > 0 and test_data.count() > 0:
            break
    else:
        # Fail loudly instead of training/evaluating on an empty set.
        raise ValueError(
            "Could not produce non-empty training and test sets; "
            "add more rows to the dataset."
        )

# Steps 4-7 run inside try/finally so the Spark session is stopped even if
# training, prediction, or evaluation raises.
try:
    # Step 4: Define and fit the logistic regression model
    lr = LogisticRegression(featuresCol="features", labelCol="label")
    lr_model = lr.fit(train_data)

    # Step 5: Make predictions on the test set
    test_predictions = lr_model.transform(test_data)
    test_predictions.select("features", "label", "prediction", "probability").show()

    # Step 6: Inspect the fitted model
    # Print the coefficients and intercept of the model
    print("Coefficients:", lr_model.coefficients)
    print("Intercept:", lr_model.intercept)

    # Step 7: Model evaluation using MulticlassClassificationEvaluator
    # (accuracy of the predictions against the labels of the test set)
    evaluator = MulticlassClassificationEvaluator(
        predictionCol="prediction", labelCol="label", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(test_predictions)
    print("Accuracy:", accuracy)
finally:
    # Stop Spark session
    spark.stop()



+--------+--------+-----+
|feature1|feature2|label|
+--------+--------+-----+
|     1.0|     2.0|    0|
|     2.0|     1.5|    0|
|     3.0|     3.5|    1|
|     4.0|     5.0|    1|
|     5.0|     3.0|    1|
|     6.0|     6.0|    1|
|     7.0|     8.0|    1|
|     0.5|     1.0|    0|
|     1.5|     2.5|    0|
+--------+--------+-----+

+---------+-----+----------+--------------------+
| features|label|prediction|         probability|
+---------+-----+----------+--------------------+
|[3.0,3.5]|    1|       0.0|[0.54281284112144...|
|[5.0,3.0]|    1|       1.0|[7.52656390841942...|
+---------+-----+----------+--------------------+

Coefficients: [6.585983453790336,7.011767177400992]
Intercept: -44.470807222059115
Accuracy: 0.5
