### NAME : Harshit Nigam
### Roll No.:  2311AI52
### Subject: Big Data Analytics
### Assignment 9

# Classification On Iris Dataset

In [None]:
!pip install pyspark




## Classification on Iris Dataset Using Logistic Regression

In [None]:
# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Seed shared by the train/test split and the cross-validation fold assignment.
# Without an explicit seed, CrossValidator uses a default seed derived from a hash of
# the class name, so the folds (and therefore the selected hyperparameters) are not
# guaranteed to be the same after a kernel restart.
SEED = 42

# Step 2: Create (or reuse) a SparkSession
spark = SparkSession.builder \
    .appName("Iris Classification") \
    .getOrCreate()

# Step 3: Load the Iris dataset
iris_df = spark.read.csv("/content/Iris.csv", header=True, inferSchema=True)

# Step 4: Encode the string "Species" column as a numeric "label" column
label_indexer = StringIndexer(inputCol="Species", outputCol="label")
iris_df_indexed = label_indexer.fit(iris_df).transform(iris_df)

# Assemble the four measurement columns into a single "features" vector
assembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="features"
)
iris_df_final = assembler.transform(iris_df_indexed)

# Step 5: Split the data 70/30 into train and test sets
train_data, test_data = iris_df_final.randomSplit([0.7, 0.3], seed=SEED)

# Step 6: Define the multinomial logistic regression estimator (it is trained in Step 9).
# maxIter and regParam set here are overridden by every grid point below;
# elasticNetParam=0.8 is not in the grid, so it applies to every candidate.
lr_classifier = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Step 7: Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(lr_classifier.maxIter, [10, 20, 30]) \
    .addGrid(lr_classifier.regParam, [0.1, 0.01, 0.001]) \
    .build()

# One accuracy evaluator, used both for model selection and for the final test score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Step 8: 3-fold cross-validation over the grid, with reproducible fold assignment
cross_val = CrossValidator(estimator=lr_classifier,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=3,
                           seed=SEED)

# Step 9: Fit the cross-validator on the training data
cv_model = cross_val.fit(train_data)

# Step 10: Make predictions on the held-out test set
predictions = cv_model.transform(test_data)

# Step 11: Evaluate test accuracy
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Step 12: Report the hyperparameters of the best model found by cross-validation
best_model = cv_model.bestModel
print("Best model parameters:")
print("Max Iterations:", best_model.getMaxIter())
print("Regularization Parameter:", best_model.getRegParam())


Accuracy: 0.9347826086956522
Best model parameters:
Max Iterations: 10
Regularization Parameter: 0.001


## Classification on Iris Dataset Using Decision Tree Classifier

In [None]:
# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Seed shared by the train/test split and the cross-validation fold assignment,
# so that the selected maxDepth is reproducible across kernel restarts.
SEED = 42

# Step 2: Create (or reuse) a SparkSession
spark = SparkSession.builder \
    .appName("Iris Classification") \
    .getOrCreate()

# Step 3: Load the Iris dataset
iris_df = spark.read.csv("/content/Iris.csv", header=True, inferSchema=True)

# Step 4: Encode the string "Species" column as a numeric "label" column
label_indexer = StringIndexer(inputCol="Species", outputCol="label")
iris_df_indexed = label_indexer.fit(iris_df).transform(iris_df)

# Assemble the four measurement columns into a single "features" vector
assembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="features"
)
iris_df_final = assembler.transform(iris_df_indexed)

# Step 5: Split the data 70/30 into train and test sets
train_data, test_data = iris_df_final.randomSplit([0.7, 0.3], seed=SEED)

# Step 6: Define the decision tree estimator (it is trained in Step 9)
dt_classifier = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Step 7: Hyperparameter grid over tree depth
param_grid = ParamGridBuilder() \
    .addGrid(dt_classifier.maxDepth, [3, 5, 7]) \
    .build()

# One accuracy evaluator, used both for model selection and for the final test score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Step 8: 3-fold cross-validation over the grid, with reproducible fold assignment
cross_val = CrossValidator(estimator=dt_classifier,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            numFolds=3,
                            seed=SEED)

# Step 9: Fit the cross-validator on the training data
cv_model = cross_val.fit(train_data)

# Step 10: Make predictions on the held-out test set
predictions = cv_model.transform(test_data)

# Step 11: Evaluate test accuracy
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Step 12: Report the hyperparameters of the best model found by cross-validation
best_model = cv_model.bestModel
print("Best model parameters:")
print("Max Depth:", best_model.getMaxDepth())



Accuracy: 0.9347826086956522
Best model parameters:
Max Depth: 3


## Classification on Iris Dataset Using RandomForestClassifier

In [None]:
# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Seed shared by the split, the forest's random sampling and the CV fold assignment,
# so accuracy and the selected hyperparameters are reproducible across restarts.
SEED = 42

# Step 2: Create (or reuse) a SparkSession, so this cell does not depend on `spark`
# having been defined by an earlier cell
spark = SparkSession.builder \
    .appName("Iris Classification") \
    .getOrCreate()

# Step 3: Load the Iris dataset
iris_df = spark.read.csv("/content/Iris.csv", header=True, inferSchema=True)

# Step 4: Encode the string "Species" column as a numeric "label" column
label_indexer = StringIndexer(inputCol="Species", outputCol="label")
iris_df_indexed = label_indexer.fit(iris_df).transform(iris_df)

# Assemble the four measurement columns into a single "features" vector
assembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="features"
)
iris_df_final = assembler.transform(iris_df_indexed)

# Step 5: Split the data 70/30 into train and test sets
train_data, test_data = iris_df_final.randomSplit([0.7, 0.3], seed=SEED)

# Step 6: Define the random forest estimator (it is trained in Step 9).
# numTrees and maxDepth are not set here because every grid point below sets both.
rf_classifier = RandomForestClassifier(labelCol="label", featuresCol="features", seed=SEED)

# Step 7: Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(rf_classifier.numTrees, [10, 20, 30]) \
    .addGrid(rf_classifier.maxDepth, [3, 5, 7]) \
    .build()

# One accuracy evaluator, used both for model selection and for the final test score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Step 8: 3-fold cross-validation over the grid, with reproducible fold assignment
cross_val = CrossValidator(estimator=rf_classifier,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            numFolds=3,
                            seed=SEED)

# Step 9: Fit the cross-validator on the training data
cv_model = cross_val.fit(train_data)

# Step 10: Make predictions on the held-out test set
predictions = cv_model.transform(test_data)

# Step 11: Evaluate test accuracy
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Step 12: Report the hyperparameters of the best model found by cross-validation.
# getOrDefault is used for both params: on ensemble models `getNumTrees` is a
# property rather than a method, which is easy to misuse.
best_model = cv_model.bestModel
print("Best model parameters:")
print("Num Trees:", best_model.getOrDefault("numTrees"))
print("Max Depth:", best_model.getOrDefault("maxDepth"))

Accuracy: 0.9130434782608695
Best model parameters:
Num Trees: 10
Max Depth: 5


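## Classification on Iris Dataset Using Gradient Boosting Classifier (GBTClassifier)

Note: Spark's `GBTClassifier` supports only binary classification, so the `Iris-virginica` rows are removed before training. The accuracy reported below is for this two-class subset and is not directly comparable with the three-class results of the models above.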

In [None]:
## Gradient Boosting
# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Seed shared by the split, the GBT's random sampling and the CV fold assignment,
# so accuracy and the selected hyperparameters are reproducible across restarts.
SEED = 42

# Step 2: Create (or reuse) a SparkSession, so this cell does not depend on `spark`
# having been defined by an earlier cell
spark = SparkSession.builder \
    .appName("Iris Classification") \
    .getOrCreate()

# Step 3: Load the Iris dataset
iris_df = spark.read.csv("/content/Iris.csv", header=True, inferSchema=True)
# GBTClassifier only handles binary labels, so one species is dropped; the accuracy
# below is for a 2-class problem and is not comparable to the 3-class models above.
iris_df = iris_df.filter(iris_df["Species"]!="Iris-virginica")

# Step 4: Encode the remaining "Species" values as a numeric "label" column
label_indexer = StringIndexer(inputCol="Species", outputCol="label")
iris_df_indexed = label_indexer.fit(iris_df).transform(iris_df)

# Assemble the four measurement columns into a single "features" vector
assembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="features"
)
iris_df_final = assembler.transform(iris_df_indexed)

# Step 5: Split the data 70/30 into train and test sets
train_data, test_data = iris_df_final.randomSplit([0.7, 0.3], seed=SEED)

# Step 6: Define the Gradient Boosted Trees estimator (it is trained in Step 9)
gbt_classifier = GBTClassifier(labelCol="label", featuresCol="features", seed=SEED)

# Step 7: Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(gbt_classifier.maxDepth, [3, 5, 7]) \
    .addGrid(gbt_classifier.maxIter, [10, 20, 30]) \
    .build()

# One accuracy evaluator, used both for model selection and for the final test score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Step 8: 3-fold cross-validation over the grid, with reproducible fold assignment
cross_val = CrossValidator(estimator=gbt_classifier,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=3,
                           seed=SEED)

# Step 9: Fit the cross-validator on the training data
cv_model = cross_val.fit(train_data)

# Step 10: Make predictions on the held-out test set
predictions = cv_model.transform(test_data)

# Step 11: Evaluate test accuracy
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Step 12: Report the hyperparameters of the best model found by cross-validation
best_model = cv_model.bestModel
print("Best model parameters:")
print("Max Depth:", best_model.getMaxDepth())
print("Max Iterations:", best_model.getMaxIter())



Accuracy: 1.0
Best model parameters:
Max Depth: 3
Max Iterations: 10


# Regression Problem on the California Housing Dataset

## Regression Model on California Housing DataSet Using Linear Regression

In [None]:
# Using Linear Regression Model

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col

# Seed shared by the train/test split and the cross-validation fold assignment,
# so the selected regParam/elasticNetParam are reproducible across kernel restarts.
SEED = 42

# Create (or reuse) a SparkSession
spark = SparkSession.builder \
    .appName("Linear Regression with PySpark") \
    .getOrCreate()

# Load California housing dataset
data = spark.read.csv("/content/housing.csv", header=True, inferSchema=True)

# Drop rows with null values in any column
data = data.dropna()

# Map the categorical 'ocean_proximity' strings to numeric indices
string_indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
data = string_indexer.fit(data).transform(data)

# One-hot encode the indexed category so the linear model gets no false ordering
encoder = OneHotEncoder(inputCol="ocean_proximity_index", outputCol="ocean_proximity_encoded")
data = encoder.fit(data).transform(data)

# Assemble numeric columns plus the one-hot vector into a single "features" column
feature_columns = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income', 'ocean_proximity_encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Rename target column to 'label' (the evaluators below rely on their default labelCol)
data = data.withColumnRenamed('median_house_value', 'label')

# Split the data 80/20 into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SEED)

# Define the linear regression estimator
lr = LinearRegression(featuresCol="features", labelCol="label")

# Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# RMSE evaluator: used for model selection and for the test score
evaluator = RegressionEvaluator(metricName="rmse")

# R2 evaluator: reported on the test set only
evaluator_r2 = RegressionEvaluator(metricName="r2")

# 5-fold cross-validation over the grid, with reproducible fold assignment
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=SEED)

# Fit the cross-validator on the training data
cv_model = crossval.fit(train_data)

# Make predictions on the test set
predictions = cv_model.transform(test_data)

# Evaluate the model - RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Evaluate the model - R2
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data:", r2)

# Report the best hyperparameters via the model's public getters
# (rather than the private `_java_obj` handle)
best_model = cv_model.bestModel
print("Best model parameters:")
print("RegParam:", best_model.getRegParam())
print("ElasticNetParam:", best_model.getElasticNetParam())




Root Mean Squared Error (RMSE) on test data: 68641.31132528615
R-squared (R2) on test data: 0.6432667986710991
Best model parameters:
RegParam: 0.5
ElasticNetParam: 0.0


## Regression Model on California Housing DataSet Using Decision Tree Regressor

In [None]:
# Using Decision Tree Regressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col

# Seed shared by the train/test split and the cross-validation fold assignment,
# so the selected hyperparameters are reproducible across kernel restarts.
SEED = 42

# Create (or reuse) a SparkSession
spark = SparkSession.builder \
    .appName("Decision Tree Regression with PySpark") \
    .getOrCreate()

# Load California housing dataset
data = spark.read.csv("/content/housing.csv", header=True, inferSchema=True)

# Drop rows with null values in any column
data = data.dropna()

# Map the categorical 'ocean_proximity' strings to numeric indices
string_indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
data = string_indexer.fit(data).transform(data)

# One-hot encode the indexed category
encoder = OneHotEncoder(inputCol="ocean_proximity_index", outputCol="ocean_proximity_encoded")
data = encoder.fit(data).transform(data)

# Assemble numeric columns plus the one-hot vector into a single "features" column
feature_columns = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income', 'ocean_proximity_encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Rename target column to 'label' (the evaluators below rely on their default labelCol)
data = data.withColumnRenamed('median_house_value', 'label')

# Split the data 80/20 into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SEED)

# Define the decision tree regression estimator
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")

# Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 7]) \
    .addGrid(dt.minInstancesPerNode, [1, 3, 5]) \
    .build()

# RMSE evaluator: used for model selection and for the test score
evaluator_rmse = RegressionEvaluator(metricName="rmse")

# R2 evaluator: reported on the test set only
evaluator_r2 = RegressionEvaluator(metricName="r2")

# 5-fold cross-validation over the grid, with reproducible fold assignment
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_rmse,  # Use RMSE for model selection during cross-validation
                          numFolds=5,
                          seed=SEED)

# Fit the cross-validator on the training data
cv_model = crossval.fit(train_data)

# Make predictions on the test set
predictions = cv_model.transform(test_data)

# Evaluate the model - RMSE
rmse = evaluator_rmse.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Evaluate the model - R2
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data:", r2)

# Report the best hyperparameters via the model's public getters
# (rather than the private `_java_obj` handle)
best_model = cv_model.bestModel
print("Best model parameters:")
print("MaxDepth:", best_model.getMaxDepth())
print("MinInstancesPerNode:", best_model.getMinInstancesPerNode())


Root Mean Squared Error (RMSE) on test data: 66070.57962413561
R-squared (R2) on test data: 0.6694869427206733
Best model parameters:
MaxDepth: 7
MinInstancesPerNode: 5


## Regression Model on California Housing Dataset Using Gradient Boosting Regressor (GBTRegressor)

In [None]:
# By using Gradient Boosting Regressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col

# Seed shared by the split, the GBT's random sampling and the CV fold assignment,
# so the scores and selected hyperparameters are reproducible across restarts.
SEED = 42

# Create (or reuse) a SparkSession
spark = SparkSession.builder \
    .appName("Gradient Boosting Regression with PySpark") \
    .getOrCreate()

# Load California housing dataset
data = spark.read.csv("/content/housing.csv", header=True, inferSchema=True)

# Drop rows with null values in any column
data = data.dropna()

# Map the categorical 'ocean_proximity' strings to numeric indices
string_indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
data = string_indexer.fit(data).transform(data)

# One-hot encode the indexed category
encoder = OneHotEncoder(inputCol="ocean_proximity_index", outputCol="ocean_proximity_encoded")
data = encoder.fit(data).transform(data)

# Assemble numeric columns plus the one-hot vector into a single "features" column
feature_columns = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income', 'ocean_proximity_encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Rename target column to 'label' (the evaluators below rely on their default labelCol)
data = data.withColumnRenamed('median_house_value', 'label')

# Split the data 80/20 into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SEED)

# Define the Gradient Boosting Regression estimator
gbt = GBTRegressor(featuresCol="features", labelCol="label", seed=SEED)

# Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 5, 7]) \
    .addGrid(gbt.maxIter, [10, 20, 30]) \
    .build()

# RMSE evaluator: used for model selection and for the test score
evaluator_rmse = RegressionEvaluator(metricName="rmse")

# R2 evaluator: reported on the test set only
evaluator_r2 = RegressionEvaluator(metricName="r2")

# 5-fold cross-validation over the grid, with reproducible fold assignment
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_rmse,  # Use RMSE for model selection during cross-validation
                          numFolds=5,
                          seed=SEED)

# Fit the cross-validator on the training data
cv_model = crossval.fit(train_data)

# Make predictions on the test set
predictions = cv_model.transform(test_data)

# Evaluate the model - RMSE
rmse = evaluator_rmse.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Evaluate the model - R2
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data:", r2)

# Report the best hyperparameters via the model's public getters
# (rather than the private `_java_obj` handle)
best_model = cv_model.bestModel
print("Best model parameters:")
print("MaxDepth:", best_model.getMaxDepth())
print("MaxIter:", best_model.getMaxIter())



Root Mean Squared Error (RMSE) on test data: 56125.89882890557
R-squared (R2) on test data: 0.7614941663434059
Best model parameters:
MaxDepth: 7
MaxIter: 30


## Regression Model on California Housing DataSet Using RandomForestRegressor

In [None]:
# By Using RandomForestRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col

# Seed shared by the split, the forest's random sampling and the CV fold assignment,
# so the scores and selected hyperparameters are reproducible across restarts.
SEED = 42

# Create (or reuse) a SparkSession
spark = SparkSession.builder \
    .appName("Random Forest Regression with PySpark") \
    .getOrCreate()

# Load California housing dataset
data = spark.read.csv("/content/housing.csv", header=True, inferSchema=True)

# Drop rows with null values in any column
data = data.dropna()

# Map the categorical 'ocean_proximity' strings to numeric indices
string_indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
data = string_indexer.fit(data).transform(data)

# One-hot encode the indexed category
encoder = OneHotEncoder(inputCol="ocean_proximity_index", outputCol="ocean_proximity_encoded")
data = encoder.fit(data).transform(data)

# Assemble numeric columns plus the one-hot vector into a single "features" column
feature_columns = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income', 'ocean_proximity_encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Rename target column to 'label' (the evaluators below rely on their default labelCol)
data = data.withColumnRenamed('median_house_value', 'label')

# Split the data 80/20 into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SEED)

# Define the Random Forest Regression estimator
rf = RandomForestRegressor(featuresCol="features", labelCol="label", seed=SEED)

# Hyperparameter grid (3 x 3 = 9 candidates)
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [3, 5, 7]) \
    .build()

# RMSE evaluator: used for model selection and for the test score
evaluator_rmse = RegressionEvaluator(metricName="rmse")

# R2 evaluator: reported on the test set only
evaluator_r2 = RegressionEvaluator(metricName="r2")

# 5-fold cross-validation over the grid, with reproducible fold assignment
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_rmse,  # Use RMSE for model selection during cross-validation
                          numFolds=5,
                          seed=SEED)

# Fit the cross-validator on the training data
cv_model = crossval.fit(train_data)

# Make predictions on the test set
predictions = cv_model.transform(test_data)

# Evaluate the model - RMSE
rmse = evaluator_rmse.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Evaluate the model - R2
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data:", r2)

# Report the best hyperparameters through the model's params (public API, not `_java_obj`).
# getOrDefault is used because on ensemble models `getNumTrees` is a property, not a method.
best_model = cv_model.bestModel
print("Best model parameters:")
print("NumTrees:", best_model.getOrDefault("numTrees"))
print("MaxDepth:", best_model.getOrDefault("maxDepth"))

# Stop SparkSession (last cell of the notebook)
spark.stop()


Root Mean Squared Error (RMSE) on test data: 62810.00160021876
R-squared (R2) on test data: 0.7013035942163848
Best model parameters:
NumTrees: 30
MaxDepth: 7
