In [39]:
import findspark
findspark.init('/home/danielf/spark-3.3.1-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
spark = SparkSession.builder.appName('lrcode').getOrCreate()

23/02/21 17:32:08 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/02/21 17:32:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/21 17:32:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/21 17:32:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/21 17:32:12 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [10]:
path = '/home/danielf/Desktop/pyspark-course-udemy/Spark_for_Machine_Learning/Logistic_Regression/titanic.csv'
df = spark.read.csv(path, header=True, inferSchema=True)

In [11]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [27]:
cols = df.select([
 'Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

In [28]:
final_df = cols.na.drop()
final_df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



In [31]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')

In [32]:
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'EmbarkedVec', 'Age', 
                                       'SibSp', 'Parch', 'Fare'], outputCol='features')

In [33]:
lr = LogisticRegression(featuresCol='features', labelCol='Survived')

In [34]:
pipeline = Pipeline(stages=[gender_indexer, embark_indexer, gender_encoder, embark_encoder, assembler, lr])

In [52]:
train, test = final_df.randomSplit([0.7, 0.3])
test.show()

+--------+------+------+----+-----+-----+--------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|    Fare|Embarked|
+--------+------+------+----+-----+-----+--------+--------+
|       0|     1|female| 2.0|    1|    2|  151.55|       S|
|       0|     1|  male|19.0|    1|    0|    53.1|       S|
|       0|     1|  male|22.0|    0|    0|135.6333|       C|
|       0|     1|  male|24.0|    0|    0|    79.2|       C|
|       0|     1|  male|27.0|    0|    2|   211.5|       C|
|       0|     1|  male|28.0|    0|    0|    47.1|       S|
|       0|     1|  male|29.0|    0|    0|    30.0|       S|
|       0|     1|  male|33.0|    0|    0|     5.0|       S|
|       0|     1|  male|36.0|    1|    0|   78.85|       S|
|       0|     1|  male|39.0|    0|    0|     0.0|       S|
|       0|     1|  male|45.0|    1|    0|  83.475|       S|
|       0|     1|  male|46.0|    0|    0|    79.2|       C|
|       0|     1|  male|47.0|    0|    0| 34.0208|       S|
|       0|     1|  male|49.0|    1|    1

In [50]:
fit_lr = pipeline.fit(train)

In [51]:
results = fit_lr.transform(test)
results.head()

Row(Survived=0, Pclass=1, Sex='female', Age=2.0, SibSp=1, Parch=2, Fare=151.55, Embarked='S', SexIndex=1.0, EmbarkedIndex=0.0, SexVec=SparseVector(1, {}), EmbarkedVec=SparseVector(2, {0: 1.0}), features=DenseVector([1.0, 0.0, 1.0, 0.0, 2.0, 1.0, 2.0, 151.55]), rawPrediction=DenseVector([-4.0631, 4.0631]), probability=DenseVector([0.0169, 0.9831]), prediction=1.0)

In [40]:
myeval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [45]:
auc = myeval.evaluate(results)

In [43]:
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [46]:
auc

0.7786970458286407