In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('logistic-regression')
    .getOrCreate()
)

In [3]:
# Load the sample dataset stored in LIBSVM format (a label plus a sparse
# feature vector per row).
data = (
    spark.read
    .format('libsvm')
    .load('../data/sample_libsvm_data.txt')
)

In [4]:
# 70/30 train/test split; the fixed seed makes the split repeatable.
splits = data.randomSplit([0.7, 0.3], seed=42)
train_data, test_data = splits

In [5]:
# Print summary statistics for each split, train first and then test.
# A plain loop avoids rebinding `_`, which IPython uses to hold the previous
# cell's output.
for split_df in (train_data, test_data):
    split_df.describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|                65|
|   mean|0.6307692307692307|
| stddev|0.4863521990681871|
|    min|               0.0|
|    max|               1.0|
+-------+------------------+

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                 35|
|   mean|0.45714285714285713|
| stddev|  0.505432670960188|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+



In [6]:
# Peek at the training split; show() prints only the top 20 rows here.
train_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[95,96,97,12...|
|  0.0|(692,[98,99,100,1...|
|  0.0|(692,[121,122,123...|
|  0.0|(692,[122,123,124...|
|  0.0|(692,[122,123,148...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[127,128,129...|
|  0.0|(692,[127,128,129...|
|  0.0|(692,[128,129,130...|
|  0.0|(692,[152,153,154...|
|  0.0|(692,[152,153,154...|
|  0.0|(692,[153,154,155...|
+-----+--------------------+
only showing top 20 rows



In [7]:
from pyspark.ml.classification import LogisticRegression

In [8]:
# Unfitted logistic regression estimator; the input/output column names are
# spelled out so they visibly match the columns of the loaded data.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    predictionCol='prediction',
)


In [9]:
# Fit the estimator on the training split only; the test split stays unseen.
lr_model = lr.fit(train_data)

In [10]:
# Report the size of the fitted coefficient vector and the intercept,
# one value per line.
print(
    f'Model coefficients size: {lr_model.coefficients.size}',
    f'Model intercept: {lr_model.intercept}',
    sep='\n',
)

Model coefficients size: 692
Model intercept: 0.6767017958001451


In [11]:
# Keep a handle on the fitted model's `summary` attribute; the next cell reads
# its iteration count and objective history.
training_summary = lr_model.summary

In [12]:
# Pull the optimiser diagnostics out of the summary, then report them.
iterations = training_summary.totalIterations
objective_history = training_summary.objectiveHistory
print(f'Training iterations: {iterations}')
print(f'Training objectiveHistory: {objective_history}')


Training iterations: 27
Training objectiveHistory: [0.6585450135382944, 0.07332303793193415, 0.017993288359572226, 0.007005263776563439, 0.003851287819358681, 0.0019875086916976165, 0.0011176497559172, 0.00069246206230374, 0.0004018780652102688, 0.0002082397759757608, 0.00011557063457239789, 6.388164723255147e-05, 3.470327122134214e-05, 1.7974164674298307e-05, 9.266691809870797e-06, 4.657551123781397e-06, 2.3276641288387567e-06, 1.135340273449097e-06, 5.352471947063032e-07, 2.6207641799911666e-07, 1.3276831040995945e-07, 6.627107746197472e-08, 3.3247558876828846e-08, 1.663831549237458e-08, 8.331114919371366e-09, 4.1693571129450015e-09, 2.0865047351824197e-09, 1.043990806691911e-09]


In [13]:
# Score the fitted model on the held-out split; the returned object exposes
# the metrics printed in the next cell.
test_results = lr_model.evaluate(test_data)

In [14]:
# Quality of the model on the held-out split. The *ByLabel metrics are
# sequences with one entry per class label.
print(f'Accuracy: {test_results.accuracy}')
print(f'Precision: {test_results.precisionByLabel}')
print(f'Recall: {test_results.recallByLabel}')
# fMeasureByLabel() returns its values instead of printing them, and it is not
# the last expression of the cell, so it has to be printed to be seen.
print(f'F-measure: {test_results.fMeasureByLabel()}')

Accuracy: 0.9714285714285714
Precision: [0.95, 1.0]
Recall: [1.0, 0.9375]


In [15]:
# Keep only the feature column of the test split, i.e. drop the label, to
# mimic data whose outcome is unknown.
unlabeled_data = test_data.select('features')
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|(692,[100,101,102...|
|(692,[123,124,125...|
|(692,[123,124,125...|
|(692,[124,125,126...|
|(692,[124,125,126...|
|(692,[124,125,126...|
|(692,[125,126,127...|
|(692,[126,127,128...|
|(692,[126,127,128...|
|(692,[126,127,128...|
|(692,[127,128,129...|
|(692,[129,130,131...|
|(692,[150,151,152...|
|(692,[151,152,153...|
|(692,[152,153,154...|
|(692,[153,154,155...|
|(692,[153,154,155...|
|(692,[154,155,156...|
|(692,[234,235,237...|
|(692,[97,98,99,12...|
+--------------------+
only showing top 20 rows



In [16]:
# Apply the fitted model to the unlabeled rows. As the schema printed below
# shows, transform() appends rawPrediction, probability and prediction columns
# next to the original features.
predictions = lr_model.transform(unlabeled_data)
predictions.printSchema()
predictions.show()

root
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(692,[100,101,102...|[12.2223514699748...|[0.99999508075940...|       0.0|
|(692,[123,124,125...|[29.7047074388391...|[0.99999999999987...|       0.0|
|(692,[123,124,125...|[20.9176718835119...|[0.99999999917667...|       0.0|
|(692,[124,125,126...|[45.0073919136100...|           [1.0,0.0]|       0.0|
|(692,[124,125,126...|[24.3368966091045...|[0.99999999997304...|       0.0|
|(692,[124,125,126...|[22.7841794949669...|[0.99999999987266...|       0.0|
|(692,[125,126,127...|[26.5337010444085...|[0.99999999999700...|       0.0|
|(692,[126,127,128...|[16.9920772001454...|[0.99999995827132...

In [17]:
# Read the Titanic CSV; the first row supplies the column names and the
# column types are inferred from the data.
titanic = spark.read.csv(
    '../data/titanic.csv',
    header=True,
    inferSchema=True,
)

In [18]:
# Check which column types the schema inference settled on.
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [19]:
# Per-column summary statistics. The count row reveals missing values:
# Age, Cabin and Embarked all have fewer than the 891 total rows.
titanic.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [20]:
# Eyeball the first five raw rows.
titanic.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [21]:
# List the available column names before choosing the label and features.
titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [22]:
# Name of the target column. This must be a plain string: a trailing comma
# after the literal would turn it into the 1-tuple ('Survived',).
label = 'Survived'

# Raw input columns used to predict the label. PassengerId, Name, Ticket and
# Cabin are deliberately left out.
features = [
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'Embarked',
]

In [23]:
# Replace missing values with sentinels so later stages never see nulls:
# -1 for the numeric columns, the string 'missing' for the categorical ones.
titanic = (
    titanic
    .na.fill(-1, subset=['Pclass', 'Age', 'SibSp', 'Parch', 'Fare'])
    .na.fill('missing', subset=['Sex', 'Embarked'])
)

In [24]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer

In [25]:
# Map each categorical string column to a numeric index column.
# NOTE(review): handleInvalid is left at its default here; confirm how a
# category that appears only in the test split should be handled.
sex_indexer = StringIndexer(
    inputCol='Sex',
    outputCol='SexIndexed',
)
embarked_indexed = StringIndexer(
    inputCol='Embarked',
    outputCol='EmbarkedIndexed',
)

In [26]:
# One-hot encode the indexed categorical columns produced by the indexers.
si_one_hot = OneHotEncoder(
    inputCol='SexIndexed',
    outputCol='SexOneHot',
)
ei_one_hot = OneHotEncoder(
    inputCol='EmbarkedIndexed',
    outputCol='EmbarkedOneHot',
)

In [27]:
# Combine the numeric columns and the one-hot encoded categoricals into the
# single 'features' vector column that the classifier reads.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'Pclass', 'SexOneHot', 'Age', 'SibSp',
        'Parch', 'Fare', 'EmbarkedOneHot',
    ],
)

In [28]:
from pyspark.ml.pipeline import Pipeline

In [29]:
# Classifier for the Titanic task: reads the assembled 'features' vector and
# learns to predict the 'Survived' column.
titanic_lr = LogisticRegression(
    labelCol='Survived',
    featuresCol='features',
    predictionCol='prediction',
)

# Full preprocessing + model pipeline. The stage order matters: index the
# strings, one-hot encode the indices, assemble the vector, then classify.
pp = Pipeline(
    stages=[
        sex_indexer,
        embarked_indexed,
        si_one_hot,
        ei_one_hot,
        assembler,
        titanic_lr,
    ]
)

In [30]:
# 70/30 train/test split of the cleaned Titanic frame, seeded for repeatability.
titanic_splits = titanic.randomSplit([0.7, 0.3], seed=42)
titanic_train, titanic_test = titanic_splits

In [31]:
# Fit every pipeline stage (indexers, encoders, assembler, classifier) on the
# training split only.
titanic_model = pp.fit(titanic_train)

In [32]:
# Run the fitted pipeline over the held-out Titanic rows.
# NOTE: this rebinds `test_results`, which earlier held the result of
# lr_model.evaluate(test_data); from here on it is a transformed DataFrame.
test_results = titanic_model.transform(titanic_test)

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [34]:
# Compare the true label with the predicted class for the first ten test rows.
test_results.select('Survived', 'prediction').show(10)

+--------+----------+
|Survived|prediction|
+--------+----------+
|       1|       1.0|
|       0|       0.0|
|       1|       1.0|
|       1|       1.0|
|       0|       0.0|
|       0|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       0.0|
|       1|       0.0|
+--------+----------+
only showing top 10 rows



In [35]:
# Area-under-ROC evaluator for the Titanic model.
# rawPredictionCol must point at the model's continuous scores
# ('rawPrediction'), not at the thresholded 0/1 'prediction' column. With hard
# labels the ROC curve collapses to a single operating point and the reported
# AUC no longer reflects how well the model ranks passengers.
evaluator = BinaryClassificationEvaluator(
    labelCol='Survived',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)

In [36]:
# Evaluate the transformed test rows and report the resulting metric.
auc = evaluator.evaluate(test_results)
print(f'AUC: {auc}')