**Import Stuff**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

**Initialize Spark Session**

In [2]:
spark = SparkSession.builder \
        .appName("Logistic Regression Example") \
        .getOrCreate()

24/06/13 09:37:26 WARN Utils: Your hostname, MacBook-Pro-di-Alessio.local resolves to a loopback address: 127.0.0.1; using 192.168.68.110 instead (on interface en0)
24/06/13 09:37:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/13 09:37:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**read the input data**

The input data has this format:
| Light | Temperature | Humidity | Comfortable |
|---------|---------|---------|------------|
| 0 -> 4096 | Degrees C| Percentage | Boolean |

In [3]:
data = spark.read.csv('python/training.csv', header=True, inferSchema=True)
data.show(5)

+-----+-----------+--------+-----------+
|light|temperature|humidity|comfortable|
+-----+-----------+--------+-----------+
|   49|      29.55|    48.3|          1|
|   48|      29.54|    48.3|          1|
|   48|      29.55|    48.3|          1|
|   47|      29.55|    48.3|          1|
|   47|      29.54|    48.3|          1|
+-----+-----------+--------+-----------+
only showing top 5 rows



In [4]:
#rows with comfortable set to 0
data.filter(col('comfortable') == 0).show(5)

+-----+-----------+--------+-----------+
|light|temperature|humidity|comfortable|
+-----+-----------+--------+-----------+
|    4|      29.51|   49.52|          0|
|    4|      29.52|   49.74|          0|
|    4|      29.52|   49.94|          0|
|    4|      29.54|   50.17|          0|
|    1|      29.54|   50.41|          0|
+-----+-----------+--------+-----------+
only showing top 5 rows



**Let's assemble features into a single vector**

In [5]:
assembler = VectorAssembler(inputCols=['light', 'temperature', 'humidity'],outputCol='features')
assembled_data = assembler.transform(data).select(col('features'), col('comfortable').alias('label'))
assembled_data.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[49.0,29.55,48.3]|    1|
|[48.0,29.54,48.3]|    1|
|[48.0,29.55,48.3]|    1|
|[47.0,29.55,48.3]|    1|
|[47.0,29.54,48.3]|    1|
+-----------------+-----+
only showing top 5 rows



**Split the data into training and test data**

In [7]:
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=42)
train_data.show(5)
test_data.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[0.0,29.54,50.06]|    0|
|[0.0,29.54,50.06]|    0|
|[0.0,29.55,50.42]|    0|
|[0.0,29.55,50.82]|    0|
|[0.0,29.55,50.82]|    0|
+-----------------+-----+
only showing top 5 rows

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[0.0,29.55,50.42]|    0|
|[0.0,29.55,51.27]|    0|
|[0.0,29.55,51.76]|    0|
|[0.0,29.55,51.76]|    0|
|[0.0,29.56,52.77]|    0|
+-----------------+-----+
only showing top 5 rows



Create a Logistic Regression model

In [8]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(train_data)
test_results = lr_model.transform(test_data)
test_results.show(5)

24/06/13 09:41:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/13 09:41:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[0.0,29.55,50.42]|    0|[1.17169673564049...|[0.76345157172274...|       0.0|
|[0.0,29.55,51.27]|    0|[1.36789000277732...|[0.79703903786043...|       0.0|
|[0.0,29.55,51.76]|    0|[1.48098965089147...|[0.81472201509312...|       0.0|
|[0.0,29.55,51.76]|    0|[1.48098965089147...|[0.81472201509312...|       0.0|
|[0.0,29.56,52.77]|    0|[1.75545278502775...|[0.85263923846211...|       0.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



**Evaluate the model using the test data**

In [9]:
test_results = lr_model.transform(test_data)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label')
accuracy = evaluator.evaluate(test_results)
print('Accuracy:', accuracy)

Accuracy: 0.9761326296923228


**True positive, true negative, false positive, false negative**

In [10]:
test_results.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   28|
|    0|       0.0|  572|
|    1|       1.0|  304|
|    0|       1.0|   33|
+-----+----------+-----+



**if accuracy is higer than 0.5, the model is good so we can save it**

In [11]:
if accuracy > 0.5:
    #lr_model.save('python/lr_model')
    print('Model saved')

spark.stop()

Model saved
