In [2]:
from pyspark import SparkContext
sc = SparkContext()
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Titanic Final") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
df = spark.read.csv('data/Titanic.csv',header=True, inferSchema = True)

In [4]:
df = df.drop('_c0')

In [5]:
# Drop Columns with Null Values
df = df.na.drop()

In [6]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1| PP 9549|   16.7|   G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55| C103|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



In [7]:
df.describe

<bound method DataFrame.describe of DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]>

In [8]:
df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [9]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [10]:
df = df.select('Survived', 'Sex', 'Embarked', 'Pclass', 'Age', 'SibSp', 'Parch', 'Fare')
cols = df.columns

In [11]:
# Transform datatypes for training
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ['Sex', 'Embarked']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'Survived', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [12]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)



In [13]:
df_RDD = df.rdd.map(list)

In [14]:
df_RDD.take(10)

[[0.0,
  DenseVector([0.0, 0.0, 1.0, 1.0, 38.0, 1.0, 0.0, 71.2833]),
  1,
  'female',
  'C',
  1,
  38.0,
  1,
  0,
  71.2833],
 [0.0,
  DenseVector([0.0, 1.0, 0.0, 1.0, 35.0, 1.0, 0.0, 53.1]),
  1,
  'female',
  'S',
  1,
  35.0,
  1,
  0,
  53.1],
 [1.0,
  DenseVector([1.0, 1.0, 0.0, 1.0, 54.0, 0.0, 0.0, 51.8625]),
  0,
  'male',
  'S',
  1,
  54.0,
  0,
  0,
  51.8625],
 [0.0,
  DenseVector([0.0, 1.0, 0.0, 3.0, 4.0, 1.0, 1.0, 16.7]),
  1,
  'female',
  'S',
  3,
  4.0,
  1,
  1,
  16.7],
 [0.0,
  SparseVector(8, {1: 1.0, 3: 1.0, 4: 58.0, 7: 26.55}),
  1,
  'female',
  'S',
  1,
  58.0,
  0,
  0,
  26.55],
 [0.0,
  DenseVector([1.0, 1.0, 0.0, 2.0, 34.0, 0.0, 0.0, 13.0]),
  1,
  'male',
  'S',
  2,
  34.0,
  0,
  0,
  13.0],
 [0.0,
  DenseVector([1.0, 1.0, 0.0, 1.0, 28.0, 0.0, 0.0, 35.5]),
  1,
  'male',
  'S',
  1,
  28.0,
  0,
  0,
  35.5],
 [1.0,
  DenseVector([1.0, 1.0, 0.0, 1.0, 19.0, 3.0, 2.0, 263.0]),
  0,
  'male',
  'S',
  1,
  19.0,
  3,
  2,
  263.0],
 [0.0,
  DenseVector([

In [15]:
train, test = df.randomSplit([0.7, 0.3])

In [16]:
# Trains model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [17]:
lrModel.numFeatures

8

In [18]:
lrModel.coefficients

DenseVector([3.0026, -2.4742, -2.8621, 0.6101, 0.0286, -0.1504, 0.5134, -0.0027])

In [19]:
lrModel.intercept

-1.6919301912592406

In [20]:
# Get accuracy of trained model
trainingSummary = lrModel.summary
print(trainingSummary.accuracy)

0.7518248175182481
