In [1]:
import findspark
findspark.init('/usr/local/spark')
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('mylogreg').getOrCreate()

In [3]:
from pyspark.ml.classification import LogisticRegression

In [7]:
fname = 'Data/Spark_for_Machine_Learning/Logistic_Regression/sample_libsvm_data.txt'
my_data = spark.read.format('libsvm').load(fname)

In [8]:
my_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [10]:
logreg = LogisticRegression()

In [11]:
logreg = logreg.fit(my_data)

In [12]:
log_summary = logreg.summary

In [13]:
log_summary.predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [14]:
log_summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[19.8534775947478...|[0.99999999761359...|       0.0|
|  1.0|(692,[158,159,160...|[-20.377398194908...|[1.41321555111056...|       1.0|
|  1.0|(692,[124,125,126...|[-27.401459284891...|[1.25804865126979...|       1.0|
|  1.0|(692,[152,153,154...|[-18.862741612668...|[6.42710509170303...|       1.0|
|  1.0|(692,[151,152,153...|[-20.483011833009...|[1.27157209200604...|       1.0|
|  0.0|(692,[129,130,131...|[19.8506078990277...|[0.99999999760673...|       0.0|
|  1.0|(692,[158,159,160...|[-20.337256674833...|[1.47109814695581...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.595579753418...|[3.08850168102631...|       1.0|
|  0.0|(692,[154,155,156...|[19.2708803215613...|[0.99999999572670...|       0.0|
|  0.0|(692,[127

In [15]:
lr_train, lr_test = my_data.randomSplit([0.7, 0.3])

In [16]:
final_model = LogisticRegression().fit(lr_train)

In [18]:
prediction_and_labels = final_model.evaluate(lr_test)

In [19]:
prediction_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[95,96,97,12...|[21.9204846156774...|[0.99999999969796...|       0.0|
|  0.0|(692,[122,123,148...|[25.0828733094427...|[0.99999999998721...|       0.0|
|  0.0|(692,[124,125,126...|[32.2284192528478...|[0.99999999999999...|       0.0|
|  0.0|(692,[126,127,128...|[25.4948044954539...|[0.99999999999153...|       0.0|
|  0.0|(692,[128,129,130...|[25.0738004561041...|[0.99999999998710...|       0.0|
|  0.0|(692,[150,151,152...|[20.9995561531151...|[0.99999999924140...|       0.0|
|  0.0|(692,[152,153,154...|[13.7759952818010...|[0.99999895969469...|       0.0|
|  0.0|(692,[154,155,156...|[9.20281425523303...|[0.99989925469972...|       0.0|
|  0.0|(692,[234,235,237...|[6.50152016046053...|[0.99850109462956...|       0.0|
|  1.0|(692,[100

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [21]:
my_eval = BinaryClassificationEvaluator()

In [22]:
my_final_roc = my_eval.evaluate(prediction_and_labels.predictions)

In [24]:
my_final_roc

0.9925925925925926

# Titantic Exercise

In [7]:
fname = 'Data/Spark_for_Machine_Learning/Logistic_Regression/titanic.csv'
df = spark.read.csv(fname, inferSchema=True, header=True)

In [11]:
my_cols = df.select(['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked'])

In [12]:
# get rid of missing data
my_final_data = my_cols.na.drop()

In [15]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                                OneHotEncoder, StringIndexer)

In [41]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
# A B C
# 0 1 2
# ONE HOT ENCODE --> Creates three columns
# KEY A B C
# Example A
# [1, 0, 0]
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [42]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex', outputCol='EmbarkVec')

In [43]:
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'EmbarkVec', 'Age', 'SibSp', 'Parch', 'Fare'],
                            outputCol='features')

In [35]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [44]:
log_reg = LogisticRegression(featuresCol='features', labelCol='Survived')

In [45]:
pipeline = Pipeline(stages=[gender_indexer, embark_indexer,
                            gender_encoder, embark_encoder,
                            assembler, log_reg])

In [38]:
train_data, test_data = my_final_data.randomSplit([0.7, 0.3])

In [58]:
%time
fit_model = pipeline.fit(train_data)

CPU times: user 8 µs, sys: 0 ns, total: 8 µs
Wall time: 15.7 µs


In [59]:
results = fit_model.transform(test_data)

In [49]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [62]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [63]:
my_eval.evaluate(results)

0.7601809954751131