<a href="https://colab.research.google.com/github/AbinathAAA/Machine-Learning-on-Big-Data-Labs/blob/main/titanic_synthetic_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
#raw_data = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r".\\diabetes.csv")
dataset = spark.read.csv('drive/My Drive/Colab Notebooks/titanic_synthetic_data.csv',inferSchema=True, header =True)

In [5]:
dataset.columns

['Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Sex',
 'Survived',
 'Embarked_Q',
 'Embarked_S']

In [6]:
dataset.describe().select("Summary","Pclass","Age","SibSp","Parch").show()

+-------+------------------+------------------+------------------+------------------+
|Summary|            Pclass|               Age|             SibSp|             Parch|
+-------+------------------+------------------+------------------+------------------+
|  count|           1000000|           1000000|           1000000|           1000000|
|   mean|          1.999411|         39.517847|          4.500906|          4.497493|
| stddev|0.8168263709530482|23.092708389398037|2.8714427426552285|2.8728101169393594|
|    min|                 1|                 0|                 0|                 0|
|    max|                 3|                79|                 9|                 9|
+-------+------------------+------------------+------------------+------------------+



In [7]:
dataset.describe().select("Summary","Fare","Sex","Survived","Embarked_Q","Embarked_S").show()

+-------+------------------+------------------+------------------+-------------------+------------------+
|Summary|              Fare|               Sex|          Survived|         Embarked_Q|        Embarked_S|
+-------+------------------+------------------+------------------+-------------------+------------------+
|  count|           1000000|           1000000|           1000000|            1000000|           1000000|
|   mean| 254.9829083944402|           0.50082|           0.49987|           0.332717|          0.333661|
| stddev|141.53963219142216|0.4999995775993916|0.5000002331001768|0.47118639616145436|0.4715204761307361|
|    min|10.000191449724019|                 0|                 0|                  0|                 0|
|    max| 499.9999338139367|                 1|                 1|                  1|                 1|
+-------+------------------+------------------+------------------+-------------------+------------------+



In [8]:
from pyspark.sql.functions import col,isnan, when, count
dataset.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataset.columns]
   ).show()

+------+---+-----+-----+----+---+--------+----------+----------+
|Pclass|Age|SibSp|Parch|Fare|Sex|Survived|Embarked_Q|Embarked_S|
+------+---+-----+-----+----+---+--------+----------+----------+
|     0|  0|    0|    0|   0|  0|       0|         0|         0|
+------+---+-----+-----+----+---+--------+----------+----------+



In [9]:
# replace min value of zeros with Nan as data cleaning processs
import numpy as np
from pyspark.sql.functions import when

dataset = dataset.withColumn("Pclass", when(dataset.Pclass == 0, np.nan).otherwise(dataset.Pclass))
dataset = dataset.withColumn("Age", when(dataset.Age == 0, np.nan).otherwise(dataset.Age))
dataset = dataset.withColumn("Fare", when(dataset.Fare == 0, np.nan).otherwise(dataset.Fare))

dataset.select("Pclass","Age","Fare","Sex").show(5)


+------+----+------------------+---+
|Pclass| Age|              Fare|Sex|
+------+----+------------------+---+
|   3.0|42.0|397.00178367782024|  0|
|   1.0|52.0|302.12770032784186|  0|
|   3.0|25.0| 427.1058765283985|  1|
|   3.0|32.0| 326.8049924602327|  1|
|   1.0|40.0|17.718838497771657|  0|
+------+----+------------------+---+
only showing top 5 rows



In [10]:
# check again NAN values
from pyspark.sql.functions import col,isnan, when, count
dataset.select([count(when(isnan(c) , c)).alias(c) for c in dataset.columns]
   ).show()

+------+-----+-----+-----+----+---+--------+----------+----------+
|Pclass|  Age|SibSp|Parch|Fare|Sex|Survived|Embarked_Q|Embarked_S|
+------+-----+-----+-----+----+---+--------+----------+----------+
|     0|12475|    0|    0|   0|  0|       0|         0|         0|
+------+-----+-----+-----+----+---+--------+----------+----------+



In [11]:
from pyspark.ml.feature import Imputer
# Impute missing values in Pclass, Age, and Fare
imputer = Imputer(
    inputCols=["Pclass", "Age", "Fare"],
    outputCols=["Pclass", "Age", "Fare"]
)

model = imputer.fit(dataset)
dataset = model.transform(dataset)

dataset.show(5)


+------+----+-----+-----+------------------+---+--------+----------+----------+
|Pclass| Age|SibSp|Parch|              Fare|Sex|Survived|Embarked_Q|Embarked_S|
+------+----+-----+-----+------------------+---+--------+----------+----------+
|   3.0|42.0|    6|    7|397.00178367782024|  0|       0|         0|         1|
|   1.0|52.0|    6|    7|302.12770032784186|  0|       0|         1|         0|
|   3.0|25.0|    3|    8| 427.1058765283985|  1|       1|         1|         0|
|   3.0|32.0|    9|    7| 326.8049924602327|  1|       0|         0|         0|
|   1.0|40.0|    0|    5|17.718838497771657|  0|       0|         0|         0|
+------+----+-----+-----+------------------+---+--------+----------+----------+
only showing top 5 rows



In [12]:
#us combine all the features in one single feature vector.
cols=dataset.columns

# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
dataset=assembler.transform(dataset)
dataset.select("features").show(truncate=False)

+-----------------------------------------------------------------+
|features                                                         |
+-----------------------------------------------------------------+
|[3.0,42.0,6.0,7.0,397.00178367782024,0.0,0.0,0.0,1.0]            |
|[1.0,52.0,6.0,7.0,302.12770032784186,0.0,0.0,1.0,0.0]            |
|[3.0,25.0,3.0,8.0,427.1058765283985,1.0,1.0,1.0,0.0]             |
|[3.0,32.0,9.0,7.0,326.8049924602327,1.0,0.0,0.0,0.0]             |
|(9,[0,1,3,4],[1.0,40.0,5.0,17.718838497771657])                  |
|[1.0,20.0,7.0,2.0,402.54904361394784,0.0,0.0,0.0,0.0]            |
|[3.0,76.0,4.0,9.0,52.61038014285054,0.0,0.0,1.0,0.0]             |
|[2.0,22.0,4.0,7.0,196.48213366632623,1.0,0.0,0.0,0.0]            |
|[3.0,78.0,4.0,8.0,388.6259182464271,1.0,0.0,0.0,0.0]             |
|[3.0,15.0,1.0,8.0,299.45900279822325,1.0,0.0,1.0,0.0]            |
|[3.0,68.0,8.0,7.0,479.98134512112455,1.0,0.0,0.0,1.0]            |
|[3.0,3.0,7.0,1.0,93.03405698775256,0.0,1.0,1.0,

In [13]:
#Standard Sclarizer
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
dataset=standardscaler.fit(dataset).transform(dataset)
dataset.select("features","Scaled_features").show(5)

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|[3.0,42.0,6.0,7.0...|[3.67275115824148...|
|[1.0,52.0,6.0,7.0...|[1.22425038608049...|
|[3.0,25.0,3.0,8.0...|[3.67275115824148...|
|[3.0,32.0,9.0,7.0...|[3.67275115824148...|
|(9,[0,1,3,4],[1.0...|(9,[0,1,3,4],[1.2...|
+--------------------+--------------------+
only showing top 5 rows



In [14]:
#Train, test split
train, test = dataset.randomSplit([0.8, 0.2], seed=12345)

In [18]:
 #imbalance in the dataset, observe the use of Where
dataset_size=float(train.select("Survived").count())
numPositives=train.select("Survived").where('Survived == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 399854
Percentage of ones are 49.96651021438461


In [19]:
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.5003348978561539


In [21]:
# balance
train=train.withColumn("classWeights", when(train.Survived == 1,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)

+------------------+
|      classWeights|
+------------------+
|0.5003348978561539|
|0.4996651021438461|
|0.5003348978561539|
|0.5003348978561539|
|0.4996651021438461|
+------------------+
only showing top 5 rows



In [34]:
 #Feature selection
# Feature selection using chisquareSelector
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='Scaled_features',outputCol='Aspect',labelCol='Outcome',fpr=0.05)
train=css.fit(train).transform(train)
test=css.fit(test).transform(test)
test.select("Aspect").show(5,truncate=False)


IllegalArgumentException: Outcome does not exist. Available: Pclass, Age, SibSp, Parch, Fare, Sex, Survived, Embarked_Q, Embarked_S, features, Scaled_features, classWeights

In [35]:
from pyspark.ml.classification import LogisticRegression

# Set the correct label column
lr = LogisticRegression(
    labelCol="Survived",
    featuresCol="Aspect",
    weightCol="classWeights",  # Optional, only if you're using class weighting
    maxIter=10
)

# Train model
model = lr.fit(train)

# Predictions
predict_train = model.transform(train)
predict_test = model.transform(test)

# Display predictions
predict_test.select("Survived", "prediction").show(10)

IllegalArgumentException: Aspect does not exist. Available: Pclass, Age, SibSp, Parch, Fare, Sex, Survived, Embarked_Q, Embarked_S, features, Scaled_features, classWeights