In [26]:
import boto3
import pyspark
from pyspark.sql import SparkSession

In [27]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('demoETL_Processing')
    .getOrCreate()
)

In [28]:
# Bare expression: echoes the active SparkSession object as the cell result.
spark

In [29]:
# Load the raw CSV, using its first row as the column names.
# No schema is supplied or inferred here, so every column is read as a string.
data = spark.read.csv('../raw_dataset/lung cancer.csv', header=True)

In [30]:
# Print the column names and types Spark assigned when loading the file.
data.printSchema()

root
 |-- GENDER: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- SMOKING: string (nullable = true)
 |-- YELLOW_FINGERS: string (nullable = true)
 |-- ANXIETY: string (nullable = true)
 |-- PEER_PRESSURE: string (nullable = true)
 |-- CHRONIC DISEASE: string (nullable = true)
 |-- FATIGUE : string (nullable = true)
 |-- ALLERGY : string (nullable = true)
 |-- WHEEZING: string (nullable = true)
 |-- ALCOHOL CONSUMING: string (nullable = true)
 |-- COUGHING: string (nullable = true)
 |-- SHORTNESS OF BREATH: string (nullable = true)
 |-- SWALLOWING DIFFICULTY: string (nullable = true)
 |-- CHEST PAIN: string (nullable = true)
 |-- LUNG_CANCER: string (nullable = true)



In [31]:
# Render the raw table as a pandas DataFrame for inspection.
# NOTE(review): toPandas() collects every row to the driver; acceptable for a
# small file, but prefer data.limit(n).toPandas() if the input grows.
data.toPandas()

Unnamed: 0,GENDER,AGE,SMOKING,YELLOW_FINGERS,ANXIETY,PEER_PRESSURE,CHRONIC DISEASE,FATIGUE,ALLERGY,WHEEZING,ALCOHOL CONSUMING,COUGHING,SHORTNESS OF BREATH,SWALLOWING DIFFICULTY,CHEST PAIN,LUNG_CANCER
0,M,69,1,2,2,1,1,2,1,2,2,2,2,2,2,YES
1,M,74,2,1,1,1,2,2,2,1,1,1,2,2,2,YES
2,F,59,1,1,1,2,1,2,1,2,1,2,2,1,2,NO
3,M,63,2,2,2,1,1,1,1,1,2,1,1,2,2,NO
4,F,63,1,2,1,1,1,1,1,2,1,2,2,1,1,NO
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
304,F,56,1,1,1,2,2,2,1,1,2,2,2,2,1,YES
305,M,70,2,1,1,1,1,2,2,2,2,2,2,1,2,YES
306,M,58,2,1,1,1,1,1,2,2,2,2,1,1,2,YES
307,M,67,2,1,2,1,1,2,2,1,2,2,2,1,2,YES


**Data Overview**

• GENDER: Gender of patients

• AGE: Age of patients

• SMOKING through CHEST PAIN: 1 = no, 2 = yes

• LUNG_CANCER: Whether lung cancer was confirmed (YES / NO)

**Rename the columns**

In [32]:
from pyspark.sql.functions import col  # not used in this cell; later cells rely on it


def _normalize_column_name(name):
    """Return a snake_case version of a raw CSV header.

    Names that contain no upper-case letters and no whitespace are returned
    unchanged. Otherwise surrounding whitespace is trimmed, spaces become
    underscores, letters are lower-cased and trailing underscores are dropped,
    e.g. 'FATIGUE ' -> 'fatigue' and 'CHEST PAIN' -> 'chest_pain'.
    """
    if not any(ch.isupper() or ch.isspace() for ch in name):
        return name
    # strip() first so leading whitespace does not turn into a leading underscore.
    return name.strip().replace(' ', '_').lower().rstrip('_')


# Rename all columns positionally in one projection instead of chaining one
# withColumnRenamed call per column.
data = data.toDF(*[_normalize_column_name(name) for name in data.columns])
data.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- smoking: string (nullable = true)
 |-- yellow_fingers: string (nullable = true)
 |-- anxiety: string (nullable = true)
 |-- peer_pressure: string (nullable = true)
 |-- chronic_disease: string (nullable = true)
 |-- fatigue: string (nullable = true)
 |-- allergy: string (nullable = true)
 |-- wheezing: string (nullable = true)
 |-- alcohol_consuming: string (nullable = true)
 |-- coughing: string (nullable = true)
 |-- shortness_of_breath: string (nullable = true)
 |-- swallowing_difficulty: string (nullable = true)
 |-- chest_pain: string (nullable = true)
 |-- lung_cancer: string (nullable = true)



**Handle null values**

In [33]:
# Disabled draft of a null-imputation step, kept for reference only.
# NOTE(review): before re-enabling, confirm two things:
#   - first(col_index) yields the first value of the column, not its most
#     frequent value, so `most_common_value` is not actually a mode;
#   - isin(['nan', 'null', 'NaN']) matches only those literal strings and
#     would not match true nulls (isNull() would be needed for those).
# from pyspark.sql.functions import first, when
# for col_index in data.columns:
#       most_common_value = data.select(first(col_index)).collect()[0][0]
#       data = data.withColumn(col_index, when(col(col_index).isin(['nan', 'null', 'NaN']), most_common_value).otherwise(col(col_index)))

In [34]:
# List the distinct values present in the `smoking` column.
smoking_values = data.select('smoking').distinct()
smoking_values.show()

+-------+
|smoking|
+-------+
|   null|
|      1|
|      2|
+-------+



In [35]:
# Re-display the table to confirm the renamed, lower-case column headers.
# NOTE(review): toPandas() pulls the full DataFrame to the driver.
data.toPandas()

Unnamed: 0,gender,age,smoking,yellow_fingers,anxiety,peer_pressure,chronic_disease,fatigue,allergy,wheezing,alcohol_consuming,coughing,shortness_of_breath,swallowing_difficulty,chest_pain,lung_cancer
0,M,69,1,2,2,1,1,2,1,2,2,2,2,2,2,YES
1,M,74,2,1,1,1,2,2,2,1,1,1,2,2,2,YES
2,F,59,1,1,1,2,1,2,1,2,1,2,2,1,2,NO
3,M,63,2,2,2,1,1,1,1,1,2,1,1,2,2,NO
4,F,63,1,2,1,1,1,1,1,2,1,2,2,1,1,NO
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
304,F,56,1,1,1,2,2,2,1,1,2,2,2,2,1,YES
305,M,70,2,1,1,1,1,2,2,2,2,2,2,1,2,YES
306,M,58,2,1,1,1,1,1,2,2,2,2,1,1,2,YES
307,M,67,2,1,2,1,1,2,2,1,2,2,2,1,2,YES


**Check invalid ages**

In [36]:
# `age` was loaded as a string column, so these comparisons rely on Spark's
# implicit cast to a numeric type.
# NOTE(review): rows whose age is null or non-numeric presumably satisfy
# neither bound and are therefore not counted here — confirm.
age_col = col('age')
is_out_of_range = (age_col > 100) | (age_col < 0)
invalid_age_count = data.select('age').filter(is_out_of_range).count()
print('Number of invalid ages: ', invalid_age_count)

Number of invalid ages:  0


In [37]:
# Columns whose distinct values are inspected: every column except `age`.
columns_to_check = [
    'gender',
    'smoking',
    'yellow_fingers',
    'anxiety',
    'peer_pressure',
    'chronic_disease',
    'fatigue',
    'allergy',
    'wheezing',
    'alcohol_consuming',
    'coughing',
    'shortness_of_breath',
    'swallowing_difficulty',
    'chest_pain',
    'lung_cancer',
]

# Show each column's distinct values to spot nulls or unexpected codes.
for column_name in columns_to_check:
    data.select(column_name).distinct().show()

+------+
|gender|
+------+
|     F|
|  null|
|     M|
+------+

+-------+
|smoking|
+-------+
|   null|
|      1|
|      2|
+-------+

+--------------+
|yellow_fingers|
+--------------+
|             1|
|             2|
+--------------+

+-------+
|anxiety|
+-------+
|   null|
|      1|
|      2|
+-------+

+-------------+
|peer_pressure|
+-------------+
|            1|
|            2|
+-------------+

+---------------+
|chronic_disease|
+---------------+
|           null|
|              1|
|              2|
+---------------+

+-------+
|fatigue|
+-------+
|      1|
|      2|
+-------+

+-------+
|allergy|
+-------+
|   null|
|      1|
|      2|
+-------+

+--------+
|wheezing|
+--------+
|    null|
|       1|
|       2|
+--------+

+-----------------+
|alcohol_consuming|
+-----------------+
|                1|
|                2|
+-----------------+

+--------+
|coughing|
+--------+
|       1|
|       2|
+--------+

+-------------------+
|shortness_of_breath|
+-------------------+
|  