In [32]:
import os  
os.environ["SPARK_HOME"] = "/home/iiitdmk-sic40/spark-3.5.5"  # Update if different
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"  # Update if needed
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"

import findspark  
findspark.init()

from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder \
    .master("spark://172.16.72.48:7077") \
    .appName("Lung_Cancer_prediction") \
    .getOrCreate()

print("Lung_Cancer")

Lung_Cancer


In [33]:
df = spark.read.csv("hdfs://localhost:9000/user/iiitdmk-sic40/survey_lung_cancer.csv", header=True, inferSchema=True)
df.show(5)  # Display the first 5 rows


+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+
|GENDER|AGE|SMOKING|YELLOW_FINGERS|ANXIETY|PEER_PRESSURE|CHRONIC DISEASE|FATIGUE |ALLERGY |WHEEZING|ALCOHOL CONSUMING|COUGHING|SHORTNESS OF BREATH|SWALLOWING DIFFICULTY|CHEST PAIN|LUNG_CANCER|
+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+
|     M| 69|      1|             2|      2|            1|              1|       2|       1|       2|                2|       2|                  2|                    2|         2|        YES|
|     M| 74|      2|             1|      1|            1|              2|       2|       2|       1|                1|       1|                  2|                    2|         2|        YES|
|     F| 59|      1|             1|

In [34]:
df.groupBy("LUNG_CANCER").count().show()


+-----------+-----+
|LUNG_CANCER|count|
+-----------+-----+
|        YES|  270|
|         NO|   39|
+-----------+-----+



In [35]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Convert categorical columns
indexer = StringIndexer(inputCol="GENDER", outputCol="GENDER_INDEXED", handleInvalid="keep")
label_indexer = StringIndexer(inputCol="LUNG_CANCER", outputCol="label", handleInvalid="keep")

# Assemble feature columns
feature_columns = [c for c in df.columns if c not in ["GENDER", "LUNG_CANCER"]]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Create Pipeline
pipeline = Pipeline(stages=[indexer, label_indexer, assembler])
df_prepared = pipeline.fit(df).transform(df)
df_prepared.show(5)


+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+--------------+-----+--------------------+
|GENDER|AGE|SMOKING|YELLOW_FINGERS|ANXIETY|PEER_PRESSURE|CHRONIC DISEASE|FATIGUE |ALLERGY |WHEEZING|ALCOHOL CONSUMING|COUGHING|SHORTNESS OF BREATH|SWALLOWING DIFFICULTY|CHEST PAIN|LUNG_CANCER|GENDER_INDEXED|label|            features|
+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+--------------+-----+--------------------+
|     M| 69|      1|             2|      2|            1|              1|       2|       1|       2|                2|       2|                  2|                    2|         2|        YES|           0.0|  0.0|[69.0,1.0,2.0,2.0...|
|     M| 74|      2|             1|      1|            1|   

In [36]:
from imblearn.over_sampling import SMOTE
import pandas as pd
import numpy as np

# Convert to Pandas
pdf = df_prepared.select("features", "label").toPandas()
X = np.array(pdf['features'].tolist())
y = pdf['label'].values

# Apply SMOTE
smote = SMOTE(random_state=42)
X_res, y_res = smote.fit_resample(X, y)

# Convert back to Spark DataFrame
resampled_pdf = pd.DataFrame(X_res, columns=[f"feature_{i}" for i in range(X_res.shape[1])])
resampled_pdf['label'] = y_res
resampled_df = spark.createDataFrame(resampled_pdf)

# Assemble Features Again
feature_columns = [f"feature_{i}" for i in range(X_res.shape[1])]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
resampled_df = assembler.transform(resampled_df).select("features", "label")


In [37]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
resampled_df = scaler.fit(resampled_df).transform(resampled_df).select("scaled_features", "label")
resampled_df = resampled_df.withColumnRenamed("scaled_features", "features")


In [38]:
train_data, test_data = resampled_df.randomSplit([0.8, 0.2], seed=42)


In [39]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Logistic Regression Accuracy: {accuracy:.4f}")


Logistic Regression Accuracy: 0.9400


In [40]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       0.92      0.96      0.94        50
         1.0       0.96      0.92      0.94        50

    accuracy                           0.94       100
   macro avg       0.94      0.94      0.94       100
weighted avg       0.94      0.94      0.94       100



In [41]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dt_model = dt.fit(train_data)

predictions = dt_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Decision Tree Accuracy: {accuracy:.4f}")


Decision Tree Accuracy: 0.9400


In [42]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       0.92      0.96      0.94        50
         1.0       0.96      0.92      0.94        50

    accuracy                           0.94       100
   macro avg       0.94      0.94      0.94       100
weighted avg       0.94      0.94      0.94       100



In [43]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
rf_model = rf.fit(train_data)

predictions = rf_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Random Forest Accuracy: {accuracy:.4f}")


Random Forest Accuracy: 0.9600


In [44]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       0.94      0.98      0.96        50
         1.0       0.98      0.94      0.96        50

    accuracy                           0.96       100
   macro avg       0.96      0.96      0.96       100
weighted avg       0.96      0.96      0.96       100



In [45]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
gbt_model = gbt.fit(train_data)

predictions = gbt_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"GBT Accuracy: {accuracy:.4f}")


GBT Accuracy: 0.9500


In [46]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       0.94      0.96      0.95        50
         1.0       0.96      0.94      0.95        50

    accuracy                           0.95       100
   macro avg       0.95      0.95      0.95       100
weighted avg       0.95      0.95      0.95       100



In [47]:
from pyspark.ml.feature import Normalizer
from pyspark.sql.functions import col

# Drop "norm_features" if it already exists
if "norm_features" in train_data.columns:
    train_data = train_data.drop("norm_features")
if "norm_features" in test_data.columns:
    test_data = test_data.drop("norm_features")

# Normalize features
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
train_data = normalizer.transform(train_data)
test_data = normalizer.transform(test_data)

train_data.show(5)  # Check transformed data

+--------------------+-----+--------------------+
|            features|label|       norm_features|
+--------------------+-----+--------------------+
|[-5.0785065131700...|  1.0|[-0.8238146907153...|
|[-2.2219277715584...|  0.0|[-0.5047557142075...|
|[-1.3525342415027...|  0.0|[-0.3286034687401...|
|[-1.2283351657804...|  0.0|[-0.3201667131354...|
|[-1.1041360900582...|  0.0|[-0.2598713691607...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [48]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
import numpy as np
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Convert Spark DataFrame to Pandas for broadcasting
train_pdf = train_data.select("norm_features", "label").toPandas()

# Broadcast training data
train_broadcast = spark.sparkContext.broadcast(train_pdf)

# Define UDF for Euclidean distance calculation
def knn_predict(test_row, k=5):
    test_features = np.array(test_row)
    train_features = np.array(train_broadcast.value["norm_features"].tolist())
    train_labels = np.array(train_broadcast.value["label"].tolist())

    # Compute Euclidean distance
    distances = np.linalg.norm(train_features - test_features, axis=1)

    # Get indices of k nearest neighbors
    k_indices = distances.argsort()[:k]

    # Get majority label
    knn_labels = train_labels[k_indices]
    return float(np.bincount(knn_labels.astype(int)).argmax())

# Register UDF
from pyspark.sql.types import DoubleType
knn_udf = udf(lambda features: float(knn_predict(features, k=5)), DoubleType())

# Apply KNN prediction
test_data = test_data.withColumn("prediction", knn_udf(col("norm_features")))

# Evaluate KNN Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(test_data)
print(f"KNN Accuracy: {accuracy:.4f}")



KNN Accuracy: 0.9400


                                                                                

In [49]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       0.94      0.96      0.95        50
         1.0       0.96      0.94      0.95        50

    accuracy                           0.95       100
   macro avg       0.95      0.95      0.95       100
weighted avg       0.95      0.95      0.95       100



In [50]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import when, col
from pyspark.sql.types import DoubleType

# Drop existing prediction column if it exists
if "prediction" in test_data.columns:
    test_data = test_data.drop("prediction")

# Initialize Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")

# Train the model
lr_model = lr.fit(train_data)

# Predict on test data
predictions = lr_model.transform(test_data)

# Apply thresholding (Assume 0.5 as threshold)
threshold = 0.5
predictions = predictions.withColumn("prediction", when(col("prediction") >= threshold, 1.0).otherwise(0.0))

# Ensure prediction is of DoubleType for evaluation
predictions = predictions.withColumn("prediction", col("prediction").cast(DoubleType()))

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Linear Regression Classification Accuracy: {accuracy:.4f}")

25/03/29 12:33:03 WARN Instrumentation: [3f3cb810] regParam is zero, which might cause numerical instability and overfitting.


Linear Regression Classification Accuracy: 0.9300


In [51]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       0.94      0.92      0.93        50
         1.0       0.92      0.94      0.93        50

    accuracy                           0.93       100
   macro avg       0.93      0.93      0.93       100
weighted avg       0.93      0.93      0.93       100



In [52]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import numpy as np
import pandas as pd

# Convert Spark DataFrame to Pandas
train_pdf = train_data.select("features", "label").toPandas()
test_pdf = test_data.select("features", "label").toPandas()

# Extract feature vectors
X_train = np.array(train_pdf["features"].tolist())
y_train = train_pdf["label"].values
X_test = np.array(test_pdf["features"].tolist())
y_test = test_pdf["label"].values

# Apply LDA
lda = LinearDiscriminantAnalysis()
lda.fit(X_train, y_train)

# Predict
y_pred = lda.predict(X_test)

# Convert to Spark DataFrame
predictions_pdf = pd.DataFrame({"label": y_test, "prediction": y_pred})
predictions_df = spark.createDataFrame(predictions_pdf)

# Evaluate Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_df)
print(f"LDA Accuracy: {accuracy:.4f}")

LDA Accuracy: 0.9300


In [53]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)


              precision    recall  f1-score   support

         0.0       0.94      0.92      0.93        50
         1.0       0.92      0.94      0.93        50

    accuracy                           0.93       100
   macro avg       0.93      0.93      0.93       100
weighted avg       0.93      0.93      0.93       100



In [64]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Define the MLP structure
layers = [len(feature_columns), 10, 5, 2]  # Input, two hidden layers, output (binary classification)

# Initialize MLP model
mlp = MultilayerPerceptronClassifier(layers=layers, featuresCol="features", labelCol="label", blockSize=128, seed=42)

# Train MLP
mlp_model = mlp.fit(train_data)

# Make predictions
predictions = mlp_model.transform(test_data)

# Evaluate MLP Model
accuracy = evaluator.evaluate(predictions)
print(f"MLP Accuracy: {accuracy:.4f}")

MLP Accuracy: 0.9900


In [55]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       1.00      0.98      0.99        50
         1.0       0.98      1.00      0.99        50

    accuracy                           0.99       100
   macro avg       0.99      0.99      0.99       100
weighted avg       0.99      0.99      0.99       100



In [56]:
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier

# Convert Spark DataFrame to Pandas
train_pdf = train_data.select("features", "label").toPandas()
test_pdf = test_data.select("features", "label").toPandas()

# Extract feature vectors
X_train = np.array(train_pdf["features"].tolist())
y_train = train_pdf["label"].values
X_test = np.array(test_pdf["features"].tolist())
y_test = test_pdf["label"].values

# Train AdaBoost
base_estimator = DecisionTreeClassifier(max_depth=2)
ada = AdaBoostClassifier(base_estimator=base_estimator, n_estimators=50, random_state=42)
ada.fit(X_train, y_train)

# Predict
y_pred = ada.predict(X_test)

# Convert to Spark DataFrame
predictions_pdf = pd.DataFrame({"label": y_test, "prediction": y_pred})
predictions_df = spark.createDataFrame(predictions_pdf)

# Evaluate Accuracy
accuracy = evaluator.evaluate(predictions_df)
print(f"AdaBoost Accuracy: {accuracy:.4f}")

AdaBoost Accuracy: 0.9200




In [57]:
from sklearn.metrics import classification_report
import numpy as np

# Convert PySpark DataFrame to Pandas
predictions_pd = predictions.select("label", "prediction").toPandas()

# Extract true labels and predicted labels as NumPy arrays
y_true = predictions_pd["label"].to_numpy()
y_pred = predictions_pd["prediction"].to_numpy()

# Generate classification report
report = classification_report(y_true, y_pred, digits=2)

# Print the report
print(report)

              precision    recall  f1-score   support

         0.0       1.00      0.98      0.99        50
         1.0       0.98      1.00      0.99        50

    accuracy                           0.99       100
   macro avg       0.99      0.99      0.99       100
weighted avg       0.99      0.99      0.99       100



In [66]:
mlp_model.write().overwrite().save("hdfs://localhost:9000/user/iiitdmk-sic40/mlp_model")

In [67]:
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

# Load the trained MLP model
loaded_model = MultilayerPerceptronClassificationModel.load("hdfs://localhost:9000/user/iiitdmk-sic40/mlp_model")

In [68]:
# Assuming you have already loaded the test dataset
# Replace 'test_data' with your actual test DataFrame
predictions = loaded_model.transform(test_data)

# Show the predictions
predictions.select("features", "prediction", "probability").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------------------------+
|features                                                                                                                                                                                                                                                                               |prediction|probability                                |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------------

In [71]:
print("Expected Input Layer Size:", loaded_model.getLayers()[0])

Expected Input Layer Size: 14


In [72]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# Example input: Replace these 14 values with actual values from your dataset
single_input = [0.5, 1.2, 3.4, 2.1, 4.5, 5.6, 0.9, 3.3, 2.8, 1.5, 0.7, 6.1, 4.3, 2.2]  

# Convert input to a Spark DataFrame
single_df = spark.createDataFrame(
    [Row(features=Vectors.dense(single_input))]
)

# Make predictions
single_prediction = loaded_model.transform(single_df)

# Extract prediction result
predicted_value = single_prediction.select("prediction").collect()[0][0]

# Convert numeric prediction to Yes/No
output_label = "Yes" if predicted_value == 1.0 else "No"
print("Prediction:", output_label)

Prediction: No


In [73]:
spark.stop()