In [2]:
# Point findspark at the local Spark installation before any pyspark import.
import findspark

SPARK_HOME = '/opt/spark/spark-3.2.1-bin-hadoop3.2'
findspark.init(SPARK_HOME)

In [3]:
import warnings
warnings.filterwarnings('ignore')

In [4]:
## Import SparkSession
from pyspark.sql import SparkSession

## Create SparkSession
# Local master; the MongoDB connector's input and output URIs both point at
# the instance on 127.0.0.1.
MONGO_URI = "mongodb://127.0.0.1"

session_builder = SparkSession.builder.master("local").appName("Spark-MLlib-Titanic")
for mongo_option in ("spark.mongodb.input.uri", "spark.mongodb.output.uri"):
    session_builder = session_builder.config(mongo_option, MONGO_URI)

spark = session_builder.getOrCreate()

22/06/22 14:33:38 WARN Utils: Your hostname, ubuntu-machine resolves to a loopback address: 127.0.1.1; using 192.168.128.161 instead (on interface wlp0s20f3)
22/06/22 14:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/22 14:33:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/22 14:33:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/22 14:33:39 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Batch-read (not stream) the "Titanic" Kafka topic, starting from the
# earliest available offset.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "Titanic",
    "startingOffsets": "earliest",
}

df = spark.read.format("kafka").options(**kafka_options).load()

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
from pyspark.sql.types import StructField, IntegerType, StringType, StructType, FloatType
from pyspark.sql.functions import from_json, col

# Layout used to parse each JSON record; every field is nullable (the
# StructField default).
schema = StructType([
    StructField("PassengerId", IntegerType()),
    StructField("Survived", IntegerType()),
    StructField("Pclass", IntegerType()),
    StructField("Name", StringType()),
    StructField("Sex", StringType()),
    StructField("Age", FloatType()),
    StructField("SibSp", IntegerType()),
    StructField("Parch", IntegerType()),
    StructField("Ticket", StringType()),
    StructField("Fare", FloatType()),
    StructField("Cabin", StringType()),
    StructField("Embarked", StringType()),
])

In [7]:
# Keep only the Kafka payload, converted from binary to a string column
# that is still named "value".
df = df.select(col("value").cast("string").alias("value"))

In [8]:
# Parse each JSON string against `schema`, then promote the parsed struct's
# fields to top-level columns.
parsed_record = from_json(col('value'), schema).alias('data')
df = df.select(parsed_record).select('data.*')

In [9]:
# Print the schema of the flattened DataFrame produced by the JSON parsing step.
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [10]:
## Fill missing values (Age)

from pyspark.sql.functions import avg, col, isnan, when

# A missing age can arrive either as null (absent or `null` JSON field) or
# as a float NaN. A comparison against the string 'NaN' never matches null,
# so both cases are tested explicitly.
age_missing = col('Age').isNull() | isnan(col('Age'))

# Mean of the known ages per sex, computed in a single aggregation and keyed
# by the Sex value itself.
age_means = {
    row['Sex']: row['mean_age']
    for row in (
        df.filter(~age_missing)
          .groupBy('Sex')
          .agg(avg('Age').alias('mean_age'))
          .collect()
    )
}
male_mean = age_means.get('male')
female_mean = age_means.get('female')

# Only missing ages are replaced; rows whose Sex is neither 'male' nor
# 'female' keep their original Age value.
df = df.withColumn(
    'Age',
    when(age_missing & (col('Sex') == 'male'), male_mean)
    .when(age_missing & (col('Sex') == 'female'), female_mean)
    .otherwise(col('Age')),
)

                                                                                

In [11]:
## Surname

# `udf` and `col` stay imported here because later cells use them.
from pyspark.sql.functions import udf, col, split

# Surname is the text before the first comma of Name. Built-in column
# functions are used instead of a Python UDF so that a null Name produces a
# null Surname rather than raising inside the UDF.
df = df.withColumn('Surname', split(col('Name'), ',').getItem(0))

In [12]:
## Title

from pyspark.sql.functions import col, regexp_replace, split

# Title is the text between the first comma of Name and the following
# period, with spaces removed. Built-in column functions are used instead of
# a Python UDF so that a null Name, or a Name without a comma, produces a
# null Title rather than raising inside the UDF.
text_after_comma = split(col('Name'), ',').getItem(1)
text_before_period = split(text_after_comma, r'\.').getItem(0)
df = df.withColumn('Title', regexp_replace(text_before_period, ' ', ''))

In [13]:
## Gender-Rate

from pyspark.sql.functions import avg, col, lit, when

# Mean of Survived per Sex, aggregated once and keyed by the Sex value.
# The row order of collect() after groupBy is not guaranteed, so picking
# rows by position could swap the male and female rates.
gender_rates = {
    row['Sex']: row['rate']
    for row in df.groupBy('Sex').agg(avg('Survived').alias('rate')).collect()
}
male_rate = gender_rates.get('male')
female_rate = gender_rates.get('female')

# 'male' rows get the male rate; every other row (including a null Sex)
# gets the female rate. The column is numeric rather than a string.
df = df.withColumn(
    'Gender-Rate',
    when(col('Sex') == 'male', lit(male_rate)).otherwise(lit(female_rate)),
)

In [14]:
## Fill NaN values (Embarked)

# Null Embarked values are replaced with 'S'; no other column is touched.
df = df.fillna({'Embarked': 'S'})

In [15]:
## Embarked-Rate

from pyspark.sql.types import FloatType
from pyspark.sql.functions import avg, udf

# Mean of Survived per (Sex, Embarked), aggregated once and keyed by the
# group values themselves. The row order of collect() after groupBy is not
# guaranteed, so indexing the result by position could attach a rate to the
# wrong group.
embarked_rates = {
    (row['Sex'], row['Embarked']): row['rate']
    for row in (
        df.groupBy('Sex', 'Embarked')
          .agg(avg('Survived').alias('rate'))
          .collect()
    )
}

def func(Sex, Embarked):
    """Return the mean survival rate of the (Sex, Embarked) group.

    Falls back to 0.0 when the pair has no entry in `embarked_rates`.
    """
    # The fallback is a float on purpose: a Python int returned from a
    # FloatType UDF does not match the declared type and comes back as null.
    return embarked_rates.get((Sex, Embarked), 0.0)

func_udf = udf(func, FloatType())
df = df.withColumn('Embarked-Rate', func_udf(df['Sex'], df['Embarked']))

In [16]:
## Pclass-Rate

from pyspark.sql.types import FloatType
from pyspark.sql.functions import avg, udf

# Mean of Survived per (Sex, Pclass), aggregated once and keyed by the group
# values themselves. The row order of collect() after groupBy is not
# guaranteed, so indexing the result by position could attach a rate to the
# wrong group.
pclass_rates = {
    (row['Sex'], row['Pclass']): row['rate']
    for row in (
        df.groupBy('Sex', 'Pclass')
          .agg(avg('Survived').alias('rate'))
          .collect()
    )
}

def func(Sex, Pclass):
    """Return the mean survival rate of the (Sex, Pclass) group.

    Falls back to 0.0 when the pair has no entry in `pclass_rates`.
    """
    # The fallback is a float on purpose: a Python int returned from a
    # FloatType UDF does not match the declared type and comes back as null.
    return pclass_rates.get((Sex, Pclass), 0.0)

func_udf = udf(func, FloatType())
df = df.withColumn('Pclass-Rate', func_udf(df['Sex'], df['Pclass']))

In [17]:
## FamilySize

from pyspark.sql.functions import col, when

# is_Family is a 0/1 flag, not a count: 0 when Parch + SibSp equals 0, and
# 1 otherwise. A null sum falls through to 1, as the previous UDF did.
# A native column expression is used so that no Python UDF has to run for
# each row.
relatives_aboard = col('Parch') + col('SibSp')
df = df.withColumn('is_Family', when(relatives_aboard == 0, 0).otherwise(1))

df = df.drop('Parch', 'SibSp')

In [18]:
# Columns removed before the categorical encoding steps.
columns_to_drop = ['PassengerId', 'Name', 'Ticket', 'Cabin', 'Surname']
df = df.drop(*columns_to_drop)

In [19]:
## Categorical Sex

from pyspark.ml.feature import StringIndexer

# Encode Sex into the numeric column sexIndex, then discard the raw string
# column.
gender_indexer = StringIndexer(inputCol="Sex", outputCol="sexIndex")
gender_model = gender_indexer.fit(df)
df = gender_model.transform(df).drop('Sex')

In [20]:
## Categorical Embarked

from pyspark.ml.feature import StringIndexer

# Encode Embarked into the numeric column embarkedIndex, then discard the
# raw Embarked column together with Fare.
embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="embarkedIndex")
embarked_model = embarked_indexer.fit(df)
df = embarked_model.transform(df).drop('Embarked', 'Fare')

In [21]:
# Import kept so the name remains available to any other cell that uses it.
from pyspark.sql.functions import regexp_replace

# Fold the less common titles into 'Mr' / 'Mrs'. DataFrame.replace matches
# whole values only, unlike an unanchored regexp_replace, which also rewrote
# substrings of longer titles (for example 'Dona' became 'Mra').
TITLE_ALIASES = {
    'theCountess': 'Mrs',
    'Don': 'Mr',
    'Col': 'Mr',
    'Rev': 'Mr',
    'Mme': 'Mrs',
    'Capt': 'Mr',
    'Dr': 'Mr',
    'Sir': 'Mr',
    'Jonkheer': 'Mr',
    'Mlle': 'Mrs',
    'Major': 'Mr',
    'Ms': 'Mrs',
}

df = df.replace(TITLE_ALIASES, subset=['Title'])

In [22]:
## Categorical Title

from pyspark.ml.feature import StringIndexer

# Encode Title into the numeric column titleIndex, then discard the raw
# string column.
title_indexer = StringIndexer(inputCol="Title", outputCol="titleIndex")
title_model = title_indexer.fit(df)
df = title_model.transform(df).drop('Title')

22/06/22 14:33:57 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:33:58 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:33:58 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:33:58 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:33:58 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:33:58 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

In [23]:
# Gender-Rate holds a mean of Survived, i.e. a fractional value. Casting it
# to int truncated the fraction away, so it is cast to float instead.
df = df.withColumn('Gender-Rate', col('Gender-Rate').cast('float'))

In [25]:
# Write the DataFrame to MongoDB (database "company", collection "Titanic")
# in overwrite mode.
mongo_writer = df.write.format("mongo").mode("overwrite")
mongo_writer.options(database="company", collection="Titanic").save()

22/06/22 14:34:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:34:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:34:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:34:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:34:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/06/22 14:34:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un