In [14]:
pip install pyspark --quiet

In [15]:
import numpy as np
import pandas as pd
import pyspark

In [16]:
from sklearn.model_selection import train_test_split
from pyspark.sql import SparkSession
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer,StandardScaler,VectorAssembler,VectorIndexer
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import count,when,col
from pyspark.sql import functions as F

In [17]:
spark=(SparkSession.builder
       .appName('Apache Spark')
       .config('Spark.executor.memory','1G')
       .config('Spark.executor.cores','4')
       .getOrCreate())
spark

In [18]:
url=('/content/drive/MyDrive/survey lung cancer.csv')
data=spark.read.format('csv')\
      .option('header','true')\
      .option('inferSchema','true')\
      .load(url)
data.cache()

DataFrame[GENDER: string, AGE: int, SMOKING: int, YELLOW_FINGERS: int, ANXIETY: int, PEER_PRESSURE: int, CHRONIC DISEASE: int, FATIGUE : int, ALLERGY : int, WHEEZING: int, ALCOHOL CONSUMING: int, COUGHING: int, SHORTNESS OF BREATH: int, SWALLOWING DIFFICULTY: int, CHEST PAIN: int, LUNG_CANCER: string]

In [19]:
data.show(5)

+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+
|GENDER|AGE|SMOKING|YELLOW_FINGERS|ANXIETY|PEER_PRESSURE|CHRONIC DISEASE|FATIGUE |ALLERGY |WHEEZING|ALCOHOL CONSUMING|COUGHING|SHORTNESS OF BREATH|SWALLOWING DIFFICULTY|CHEST PAIN|LUNG_CANCER|
+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+
|     M| 69|      1|             2|      2|            1|              1|       2|       1|       2|                2|       2|                  2|                    2|         2|        YES|
|     M| 74|      2|             1|      1|            1|              2|       2|       2|       1|                1|       1|                  2|                    2|         2|        YES|
|     F| 59|      1|             1|

In [20]:
data.head(5)

[Row(GENDER='M', AGE=69, SMOKING=1, YELLOW_FINGERS=2, ANXIETY=2, PEER_PRESSURE=1, CHRONIC DISEASE=1, FATIGUE =2, ALLERGY =1, WHEEZING=2, ALCOHOL CONSUMING=2, COUGHING=2, SHORTNESS OF BREATH=2, SWALLOWING DIFFICULTY=2, CHEST PAIN=2, LUNG_CANCER='YES'),
 Row(GENDER='M', AGE=74, SMOKING=2, YELLOW_FINGERS=1, ANXIETY=1, PEER_PRESSURE=1, CHRONIC DISEASE=2, FATIGUE =2, ALLERGY =2, WHEEZING=1, ALCOHOL CONSUMING=1, COUGHING=1, SHORTNESS OF BREATH=2, SWALLOWING DIFFICULTY=2, CHEST PAIN=2, LUNG_CANCER='YES'),
 Row(GENDER='F', AGE=59, SMOKING=1, YELLOW_FINGERS=1, ANXIETY=1, PEER_PRESSURE=2, CHRONIC DISEASE=1, FATIGUE =2, ALLERGY =1, WHEEZING=2, ALCOHOL CONSUMING=1, COUGHING=2, SHORTNESS OF BREATH=2, SWALLOWING DIFFICULTY=1, CHEST PAIN=2, LUNG_CANCER='NO'),
 Row(GENDER='M', AGE=63, SMOKING=2, YELLOW_FINGERS=2, ANXIETY=2, PEER_PRESSURE=1, CHRONIC DISEASE=1, FATIGUE =1, ALLERGY =1, WHEEZING=1, ALCOHOL CONSUMING=2, COUGHING=1, SHORTNESS OF BREATH=1, SWALLOWING DIFFICULTY=2, CHEST PAIN=2, LUNG_CANCER='

In [21]:
data.printSchema()

root
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- SMOKING: integer (nullable = true)
 |-- YELLOW_FINGERS: integer (nullable = true)
 |-- ANXIETY: integer (nullable = true)
 |-- PEER_PRESSURE: integer (nullable = true)
 |-- CHRONIC DISEASE: integer (nullable = true)
 |-- FATIGUE : integer (nullable = true)
 |-- ALLERGY : integer (nullable = true)
 |-- WHEEZING: integer (nullable = true)
 |-- ALCOHOL CONSUMING: integer (nullable = true)
 |-- COUGHING: integer (nullable = true)
 |-- SHORTNESS OF BREATH: integer (nullable = true)
 |-- SWALLOWING DIFFICULTY: integer (nullable = true)
 |-- CHEST PAIN: integer (nullable = true)
 |-- LUNG_CANCER: string (nullable = true)



In [22]:
data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+
|GENDER|AGE|SMOKING|YELLOW_FINGERS|ANXIETY|PEER_PRESSURE|CHRONIC DISEASE|FATIGUE |ALLERGY |WHEEZING|ALCOHOL CONSUMING|COUGHING|SHORTNESS OF BREATH|SWALLOWING DIFFICULTY|CHEST PAIN|LUNG_CANCER|
+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+
|     0|  0|      0|             0|      0|            0|              0|       0|       0|       0|                0|       0|                  0|                    0|         0|          0|
+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+



In [23]:
sindex=StringIndexer(inputCol='LUNG_CANCER',outputCol='LUNG_CANCER_STAGE')
data=sindex.fit(data).transform(data)
data.show(5)

+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+-----------------+
|GENDER|AGE|SMOKING|YELLOW_FINGERS|ANXIETY|PEER_PRESSURE|CHRONIC DISEASE|FATIGUE |ALLERGY |WHEEZING|ALCOHOL CONSUMING|COUGHING|SHORTNESS OF BREATH|SWALLOWING DIFFICULTY|CHEST PAIN|LUNG_CANCER|LUNG_CANCER_STAGE|
+------+---+-------+--------------+-------+-------------+---------------+--------+--------+--------+-----------------+--------+-------------------+---------------------+----------+-----------+-----------------+
|     M| 69|      1|             2|      2|            1|              1|       2|       1|       2|                2|       2|                  2|                    2|         2|        YES|              0.0|
|     M| 74|      2|             1|      1|            1|              2|       2|       2|       1|                1|       1|                  2|         

In [24]:
data1=data.select('FATIGUE ','ALLERGY ','AGE','SMOKING','YELLOW_FINGERS','ANXIETY','PEER_PRESSURE','CHRONIC DISEASE','WHEEZING','ALCOHOL CONSUMING','COUGHING','SHORTNESS OF BREATH','SWALLOWING DIFFICULTY','CHEST PAIN')
data1.show(5)

+--------+--------+---+-------+--------------+-------+-------------+---------------+--------+-----------------+--------+-------------------+---------------------+----------+
|FATIGUE |ALLERGY |AGE|SMOKING|YELLOW_FINGERS|ANXIETY|PEER_PRESSURE|CHRONIC DISEASE|WHEEZING|ALCOHOL CONSUMING|COUGHING|SHORTNESS OF BREATH|SWALLOWING DIFFICULTY|CHEST PAIN|
+--------+--------+---+-------+--------------+-------+-------------+---------------+--------+-----------------+--------+-------------------+---------------------+----------+
|       2|       1| 69|      1|             2|      2|            1|              1|       2|                2|       2|                  2|                    2|         2|
|       2|       2| 74|      2|             1|      1|            1|              2|       1|                1|       1|                  2|                    2|         2|
|       2|       1| 59|      1|             1|      1|            2|              1|       2|                1|       2|          

In [25]:
input_data=data1.rdd.map(lambda x:(x[0],DenseVector(x[1:])))
input_data

PythonRDD[74] at RDD at PythonRDD.scala:53

In [26]:
data1_index=spark.createDataFrame(input_data,['label','features'])
data1_index.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    2|[1.0,69.0,1.0,2.0...|
|    2|[2.0,74.0,2.0,1.0...|
|    2|[1.0,59.0,1.0,1.0...|
|    1|[1.0,63.0,2.0,2.0...|
|    1|[1.0,63.0,1.0,2.0...|
+-----+--------------------+
only showing top 5 rows



In [27]:
stdscale=StandardScaler(inputCol='features',outputCol='featured_scaled')
scaler=stdscale.fit(data1_index)
data1_scale=scaler.transform(data1_index)
data1_scale

DataFrame[label: bigint, features: vector, featured_scaled: vector]

In [28]:
data1_scale=data1_scale.drop('features')
data1_scale

DataFrame[label: bigint, featured_scaled: vector]

In [29]:
train_data,test_data=data1_scale.randomSplit([0.9,0.1],seed=12345)

In [30]:
train_data.show(5)

+-----+--------------------+
|label|     featured_scaled|
+-----+--------------------+
|    1|[2.00969471733901...|
|    1|[2.00969471733901...|
|    1|[2.00969471733901...|
|    1|[2.00969471733901...|
|    1|[2.00969471733901...|
+-----+--------------------+
only showing top 5 rows



In [31]:
test_data.show(5)

+-----+--------------------+
|label|     featured_scaled|
+-----+--------------------+
|    1|[2.00969471733901...|
|    1|[2.00969471733901...|
|    1|[2.00969471733901...|
|    1|[4.01938943467803...|
|    1|[4.01938943467803...|
+-----+--------------------+
only showing top 5 rows



In [32]:
model=['DecisionTree','RandomForest','Linear SVM']
model_results=[]

In [33]:
dtc=DecisionTreeClassifier(labelCol='label',featuresCol='featured_scaled')
dtcmodel=dtc.fit(train_data)
dtcpred=dtcmodel.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
dtcacc=evaluator.evaluate(dtcpred)
model_results.extend([[model[0],'{:.2%}'.format(dtcacc)]])
model_results

[['DecisionTree', '73.68%']]

In [34]:
rf=RandomForestClassifier(labelCol='label',featuresCol='featured_scaled',numTrees=10)
rfcmodel=rf.fit(train_data)
rfcpred=rfcmodel.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
rfcacc=evaluator.evaluate(rfcpred)
model_results.extend([[model[1],'{:.2%}'.format(rfcacc)]])
model_results

[['DecisionTree', '73.68%'], ['RandomForest', '78.95%']]

In [35]:
naive=NaiveBayes(labelCol='label',featuresCol='featured_scaled',smoothing=1.0,modelType='multinomial')
nbmodel=naive.fit(train_data)
nbpred=nbmodel.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
nbacc=evaluator.evaluate(nbpred)
model_results.extend([[model[2],'{:2%}'.format(nbacc)]])
model_results

[['DecisionTree', '73.68%'],
 ['RandomForest', '78.95%'],
 ['Linear SVM', '31.578947%']]

In [42]:
from pyspark.ml.linalg import Vectors

# Create a SparkSession
spark=SparkSession.builder\
    .appName("NaiveBayesExample")\
    .getOrCreate()

# Load training data
data=spark.createDataFrame([
    (0.0, Vectors.dense([2,1,69,1,2,2,1,1,2,2,2,2,2,2])),
],["label","featured_scaled"])

# Train a Randomforest model
model=rf.fit(data)

# Make predictions on new data
new_data=spark.createDataFrame([
    (Vectors.dense([2,1,69,1,2,2,1,1,2,2,2,2,2,2]),),
], ["featured_scaled"])

prediction=model.transform(new_data)

# Show the prediction result
prediction.show()



+--------------------+-------------+-----------+----------+
|     featured_scaled|rawPrediction|probability|prediction|
+--------------------+-------------+-----------+----------+
|[2.0,1.0,69.0,1.0...|        [7.0]|      [1.0]|       0.0|
+--------------------+-------------+-----------+----------+

