<a href="https://colab.research.google.com/github/90485462/ISOM676/blob/main/Assignment_3(3).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [42]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.3/spark-3.2.3-bin-hadoop2.7.tgz
!tar xf spark-3.2.3-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop2.7"

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()



In [43]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Load the Dataset

In [44]:
file_path = '/content/drive/My Drive/spambase.data'  # Update this path

# If using pandas and want to convert to Spark DataFrame later
import pandas as pd
df_pd = pd.read_csv(file_path, header=None)

Initialize PySpark Session

In [45]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("Spam Detection").getOrCreate()

# Convert pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(df_pd)

# Alternatively, load directly into Spark DataFrame if schema is known
# Example schema definition if needed
from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType([
    *(StructField(f"feature_{i}", FloatType(), False) for i in range(57)),
    StructField("label", FloatType(), False)
])

df = spark.read.csv(file_path, schema=schema, header=False)

  for column, series in pdf.iteritems():


Preprocess the Data

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
df = assembler.transform(df)

# Split the data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

Train Models

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)

In [None]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

In [None]:
from pyspark.ml.classification import LinearSVC

svm = LinearSVC(labelCol="label", featuresCol="features", maxIter=10)
svm_model = svm.fit(train_df)
svm_predictions = svm_model.transform(test_df)

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
lr_model = lr.fit(train_df)
lr_predictions = lr_model.transform(test_df)

Calculating Cost

In [81]:
# Average Cost
def calculate_average_cost(predictions, cost_fp=1, cost_fn=10):
    # Calculate the costs based on the condition: FP and FN
    predictions = predictions.withColumn('cost',
                                         when((predictions['label'] == 0) & (predictions['prediction'] == 1), cost_fp)
                                         .when((predictions['label'] == 1) & (predictions['prediction'] == 0), cost_fn)
                                         .otherwise(0))
    # Sum the total cost
    total_cost = predictions.groupBy().sum('cost').collect()[0][0]

    # Count the number of instances evaluated
    num_instances = predictions.count()

    # Calculate the average cost per instance
    average_cost = total_cost / num_instances if num_instances else 0
    return average_cost

In [56]:
rf_average_cost = calculate_average_cost(rf_predictions)
gbt_average_cost = calculate_average_cost(gbt_predictions)
svm_average_cost = calculate_average_cost(svm_predictions)
dt_average_cost = calculate_average_cost(dt_predictions)
lr_average_cost = calculate_average_cost(lr_predictions)

print(f"Random Forest Average Cost: {rf_average_cost}")
print(f"Gradient Boosted Trees Average Cost: {gbt_average_cost}")
print(f"SVM Average Cost: {svm_average_cost}")
print(f"Decision Tree Average Cost: {dt_average_cost}")
print(f"Logistic Regression Average Cost: {lr_average_cost}")

Random Forest Average Cost: 0.610730593607306
Gradient Boosted Trees Average Cost: 0.4954337899543379
SVM Average Cost: 0.4246575342465753
Decision Tree Average Cost: 0.680365296803653
Logistic Regression Average Cost: 0.4885844748858447


In [46]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assuming df is your Spark DataFrame

# Feature Scaling
assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="assembled_features")
scaler = StandardScaler(inputCol="assembled_features", outputCol="features", withStd=True, withMean=False)

# Logistic Regression
lr = LogisticRegression(labelCol="label", maxIter=10)

# Support Vector Machine
svm = LinearSVC(labelCol="label", maxIter=10)

# Pipeline for LR and SVM (you can run them separately)
pipeline_lr = Pipeline(stages=[assembler, scaler, lr])
pipeline_svm = Pipeline(stages=[assembler, scaler, svm])

# ParamGrid for hyperparameter tuning
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

paramGrid_svm = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Evaluator
evaluator = BinaryClassificationEvaluator(labelCol="label")

# CrossValidator
cv_lr = CrossValidator(estimator=pipeline_lr,
                       estimatorParamMaps=paramGrid_lr,
                       evaluator=evaluator,
                       numFolds=5)

cv_svm = CrossValidator(estimator=pipeline_svm,
                        estimatorParamMaps=paramGrid_svm,
                        evaluator=evaluator,
                        numFolds=5)

# Fit models
cv_model_lr = cv_lr.fit(df)
cv_model_svm = cv_svm.fit(df)

# Best models
best_model_lr = cv_model_lr.bestModel
best_model_svm = cv_model_svm.bestModel

In [49]:
# Generate Prediction
best_predictions_lr = best_model_lr.stages[-1].transform(test_df)
best_predictions_svm = best_model_svm.stages[-1].transform(test_df)

In [55]:
b_lr_average_cost = calculate_average_cost(best_predictions_lr)
b_svm_average_cost = calculate_average_cost(best_predictions_svm)

print(f"Logistic Regression Average Cost: {b_lr_average_cost}")
print(f"SVM Cost: {b_svm_average_cost}")

Logistic Regression Average Cost: 0.5125570776255708
SVM Cost: 0.5057077625570776


Feature Selection

In [57]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Assemble features into a vector
assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
vector_df = assembler.transform(df)

# Compute Pearson correlation matrix
correlation_matrix = Correlation.corr(vector_df, "features").head()

# Print the correlation matrix
print(str(correlation_matrix[0]))

DenseMatrix([[ 1.00000000e+00, -1.67594660e-02,  6.56267675e-02, ...,
               4.44908620e-02,  6.13824002e-02,  8.91647777e-02],
             [-1.67594660e-02,  1.00000000e+00, -3.35256801e-02, ...,
               2.08284235e-03,  2.71204540e-04, -2.26796971e-02],
             [ 6.56267675e-02, -3.35256801e-02,  1.00000000e+00, ...,
               9.73980424e-02,  1.07462734e-01,  7.01135859e-02],
             ...,
             [ 4.44908620e-02,  2.08284235e-03,  9.73980424e-02, ...,
               1.00000000e+00,  4.92638297e-01,  1.62313609e-01],
             [ 6.13824002e-02,  2.71204540e-04,  1.07462734e-01, ...,
               4.92638297e-01,  1.00000000e+00,  4.75485956e-01],
             [ 8.91647777e-02, -2.26796971e-02,  7.01135859e-02, ...,
               1.62313609e-01,  4.75485956e-01,  1.00000000e+00]])




In [71]:
correlation_matrix

Row(pearson(features)=DenseMatrix(57, 57, [1.0, -0.0168, 0.0656, 0.0133, 0.0231, 0.0597, 0.0077, -0.0039, ..., 0.1122, 0.006, 0.0363, 0.2019, 0.0426, 0.1623, 0.4755, 1.0], False))

In [92]:
from pyspark.ml.stat import Correlation
import numpy as np

# Assuming `correlation_matrix` is your DenseMatrix object inside a Row as shown
dense_matrix = correlation_matrix['pearson(features)']  # Extracting the DenseMatrix

# Convert DenseMatrix to a NumPy array
corr_matrix = np.array(dense_matrix.toArray())

In [93]:
# Define a threshold for high correlation
threshold = 0.9

# Assuming you have a list of feature names corresponding to the columns of the correlation matrix
feature_names = ["word_freq_make", "word_freq_address", "word_freq_all", "word_freq_3d", "word_freq_our",
    "word_freq_over", "word_freq_remove", "word_freq_internet", "word_freq_order", "word_freq_mail",
    "word_freq_receive", "word_freq_will", "word_freq_people", "word_freq_report", "word_freq_addresses",
    "word_freq_free", "word_freq_business", "word_freq_email", "word_freq_you", "word_freq_credit",
    "word_freq_your", "word_freq_font", "word_freq_000", "word_freq_money", "word_freq_hp",
    "word_freq_hpl", "word_freq_george", "word_freq_650", "word_freq_lab", "word_freq_labs",
    "word_freq_telnet", "word_freq_857", "word_freq_data", "word_freq_415", "word_freq_85",
    "word_freq_technology", "word_freq_1999", "word_freq_parts", "word_freq_pm", "word_freq_direct",
    "word_freq_cs", "word_freq_meeting", "word_freq_original", "word_freq_project", "word_freq_re",
    "word_freq_edu", "word_freq_table", "word_freq_conference", "char_freq_;", "char_freq_(",
    "char_freq_[", "char_freq_!", "char_freq_$", "char_freq_#", "capital_run_length_average",
    "capital_run_length_longest", "capital_run_length_total", "class_label"]  # Adjust this list to match your actual features

# Identify pairs of highly correlated features
highly_correlated_pairs = []

for i in range(len(corr_matrix)):
    for j in range(i+1, len(corr_matrix)):
        if abs(corr_matrix[i, j]) > threshold:
            highly_correlated_pairs.append((feature_names[i], feature_names[j]))

# Print the highly correlated pairs
for pair in highly_correlated_pairs:
    print(f"Highly correlated pair: {pair}")


Highly correlated pair: ('word_freq_857', 'word_freq_415')


In [100]:
# Assuming you decide to drop 'word_freq_415' and keep 'word_freq_857'
# Remove 'word_freq_415' from the feature list
selected_features = [feature for feature in df.columns if feature != 'word_freq_415']
df_transformed = assembler.transform(df)

In [101]:
# Define the input columns for the features
feature_columns = df_transformed.columns[:-1]

# Assemble the feature columns into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Optionally, split the data into training and test sets
(train_data, test_data) = df_transformed.randomSplit([0.8, 0.2], seed=41)

In [102]:
# Define the model
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=20)

# Train the model
model = gbt.fit(df_transformed)

In [103]:
predictions = model.transform(test_data)

In [104]:
average_cost = calculate_average_cost(predictions)
print(f"Average Cost: {average_cost}")

Average Cost: 0.0


In [105]:
evaluator = BinaryClassificationEvaluator()

# You can specify the metric name in the evaluator (e.g., areaUnderROC or areaUnderPR)
roc_auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

# Print the evaluation result
print(f"ROC AUC: {roc_auc}")

ROC AUC: 1.0
