In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Logalong').getOrCreate()

24/11/01 02:31:37 WARN Utils: Your hostname, AlienEE resolves to a loopback address: 127.0.1.1; using 10.0.0.176 instead (on interface wlp4s0)
24/11/01 02:31:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/01 02:31:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/01 02:31:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
df = spark.read.csv('datasets/titanic.csv', inferSchema=True, header=True)

In [5]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
df.head(5)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]

In [7]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [8]:
my_cols = df.select([
 'Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

In [10]:
# Deal with missing data
my_final_data = my_cols.na.drop()

In [11]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                                OneHotEncoder, StringIndexer)

In [12]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

In [13]:
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [14]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')

In [15]:
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'EmbarkedVec', 'Age', 'SibSp', 'Parch', 'Fare'], outputCol='features')


In [16]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [17]:
log_reg_titanic = LogisticRegression(featuresCol='features', labelCol='Survived')

In [18]:
pipeline = Pipeline(stages=[gender_indexer, embark_indexer,
                            gender_encoder, embark_encoder, 
                            assembler, log_reg_titanic])


In [19]:
train_data, test_data = my_final_data.randomSplit([0.7, 0.3])

In [20]:
train_data.describe().show()

24/11/01 02:53:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 3:>                                                          (0 + 1) / 1]

+-------+------------------+------------------+------+------------------+------------------+-------------------+-----------------+--------+
|summary|          Survived|            Pclass|   Sex|               Age|             SibSp|              Parch|             Fare|Embarked|
+-------+------------------+------------------+------+------------------+------------------+-------------------+-----------------+--------+
|  count|               486|               486|   486|               486|               486|                486|              486|     486|
|   mean|0.4218106995884774| 2.236625514403292|  NULL|29.301975308641975|0.4897119341563786|0.42386831275720166| 35.5114541152263|    NULL|
| stddev|0.4943574535966055|0.8443725979765409|  NULL|14.316462753796145|0.8983932503702878|  0.867135499432062|54.51830783959455|    NULL|
|    min|                 0|                 1|female|              0.42|                 0|                  0|              0.0|       C|
|    max|           

                                                                                

In [21]:
fit_model = pipeline.fit(train_data)

In [22]:
results = fit_model.transform(test_data)

In [27]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [26]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Survived')

In [28]:
AUC = my_eval.evaluate(results)

In [29]:
AUC

0.814643188137164

24/11/01 06:27:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 6858896 ms exceeds timeout 120000 ms
24/11/01 06:27:13 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/01 06:27:15 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$