Установим JDK (Java development kit) версии 8, Apache Spark версии 2.4.7 и findspark. Затем укажем пути JAVA_HOME и SPARK_HOME на скачанные программы.

In [1]:
%%bash
apt-get install openjdk-8-jdk-headless -qq > /dev/null

wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
tar xf spark-2.4.7-bin-hadoop2.7.tgz

pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


tar: spark-2.4.7-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

Инициализируем PySpark и создадим точку входа кластера.

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Считаем датасет для дальнейшей работы с ним. После этого выберем несколько количественных признаков для предсказывания категорий ‘Climate_Region_Pub’ и запишем их в новый датафрейм.

In [None]:
data = spark.read.csv('DS_2019_public.csv',inferSchema=True,sep=",", header=True,)
data.show(10)

to_keep = ['DIVISION', 'REPORTABLE_DOMAIN', 
           'TOTALDOLCOL', 'TOTALBTUCOL', 'CELLAR',
           'NWEIGHT', 'DOLLAREL', 'TOTUCSQFT',
           'DOLNGSPH', 'BTUNG', 'YEARMADE',
           'HEATROOM', 'Climate_Region_Pub']

df = data.select(to_keep)
df = df.dropna()

df.printSchema()


+------------------+--------+-----------------+--------+-----------+--------+---------+-----------+-----------+-----------+------+-----------+--------+--------+-----+--------+--------+-----------+---------+--------+--------+--------+--------+-------+------+-----------+--------+-----------+-----+--------+--------+--------+--------+---------+------+-----+--------+--------+--------+--------+--------+--------+------+-----------+--------+--------+-------+-----------+--------+--------+------------+---------+---------+---------+-------+----------+--------+--------+-------+--------+--------+--------+---------+------+-------+-------+--------+--------+--------+---------+--------+--------+--------+---------+--------+-------+------------+---------+-----------+-----------+--------+--------+----+------+-------+--------+-----+----+-------+-------+-----------+--------+-------+--------+-----------+---------+--------+---------+-----------+--------+-----+--------+---------+-----+-----+--------+----------

Подключим необходимые библиотеки и создадим функцию для преобразования столбца ‘Climate_Region_Pub’ в необходимый нам формат. После этого добавим преобразованную колонку с именем ‘label’ в датафрейм

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

def labelForResults(s):
    if s == 1:
        return 1
    else:
        return 0

label = UserDefinedFunction(labelForResults, IntegerType())

df= df.withColumn('label',label(df.Climate_Region_Pub))

df.show(5)

+--------+-----------------+-----------+-----------+------+-----------+--------+---------+--------+-----+--------+--------+------------------+-----+
|DIVISION|REPORTABLE_DOMAIN|TOTALDOLCOL|TOTALBTUCOL|CELLAR|    NWEIGHT|DOLLAREL|TOTUCSQFT|DOLNGSPH|BTUNG|YEARMADE|HEATROOM|Climate_Region_Pub|label|
+--------+-----------------+-----------+-----------+------+-----------+--------+---------+--------+-----+--------+--------+------------------+-----+
|      10|               26|         17|        621|     0| 8599.17201|     475|      400| 367.654|85895|    1998|       4|                 5|    0|
|       1|                1|         49|        629|    -2|8969.915921|     588|      264| 581.517|51148|    1965|       2|                 1|    1|
|       3|                7|        101|       3627|     0| 18003.6396|     952|      400| 424.514|42230|    1985|       7|                 1|    1|
|       1|                1|          0|          0|     1|5999.605242|     705|     1912| 616.238|50020| 

Создадим наборы для обучения и тестирования

In [None]:
(trainingData, testData) = df.randomSplit([0.7, 0.3], 5)


Объединим все выбранные признаки в один вектор ‘features’

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['DIVISION', 'REPORTABLE_DOMAIN', 
                                       'TOTALDOLCOL', 'TOTALBTUCOL', 'CELLAR',
                                       'NWEIGHT', 'DOLLAREL', 'TOTUCSQFT',
                                       'DOLNGSPH', 'BTUNG', 'YEARMADE',
                                       'HEATROOM'], outputCol='features')

Инициализируем объект evaluator для оценки параметров модели

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()

Random forest classifier

Создадим экземпляр модели классификатора случайных лесов и присоединим его к концу конвейера pipeline. Затем создадим объект ParamGridBuilder для оптимизации гиперпараметров поиском по решётке и объект CrossValidator для оценки аналитической модели методом перекрестной проверки. После этого запустим тренировку модели функцией fit

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

rf = RandomForestClassifier(labelCol='label', featuresCol='features', 
                            numTrees=30, maxDepth=30)

pipeline = Pipeline(stages=[assembler, rf])

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
params = ParamGridBuilder().build()
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=5)

In [None]:
cv = cv.fit(trainingData)


После этого проверим модель на тестовых данных и оценим нашу модель

In [None]:
# проверим модель на тестовых данных
prediction = cv.transform(testData)

# рассчитаем площадь под ROC-кривой (AUC)
auc = evaluator.evaluate(prediction, {evaluator.metricName: 'areaUnderROC'})
print('AUC: %0.3f' % auc)

# рассчитаем элементы матрицы ошибок
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label <> prediction').count()
FP = prediction.filter('prediction = 1 AND label <> prediction').count()

prediction.groupBy('label', 'prediction').count().show()

# рассчитаем долю правильных ответов, точность модели, полноту
# и F-меру — среднее гармоническое precision и recall
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)


AUC: 0.992
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   75|
|    0|       0.0| 2163|
|    1|       1.0|  996|
|    0|       1.0|   74|
+-----+----------+-----+

n precision: 0.931
n recall: 0.930
n accuracy: 0.955
n F1 score: 0.930


In [None]:
prediction.select('Climate_Region_Pub', 'label', 'rawPrediction', 'prediction', 'probability').show(5)

+------------------+-----+-------------+----------+--------------------+
|Climate_Region_Pub|label|rawPrediction|prediction|         probability|
+------------------+-----+-------------+----------+--------------------+
|                 1|    1|   [0.0,30.0]|       1.0|           [0.0,1.0]|
|                 1|    1|   [0.0,30.0]|       1.0|           [0.0,1.0]|
|                 1|    1|   [1.0,29.0]|       1.0|[0.03333333333333...|
|                 1|    1|   [1.0,29.0]|       1.0|[0.03333333333333...|
|                 1|    1|   [0.0,30.0]|       1.0|           [0.0,1.0]|
+------------------+-----+-------------+----------+--------------------+
only showing top 5 rows



Logistic Regresiion

Создадим экземпляр модели классификатора логистической регрессии и присоединим его к концу конвейера pipeline. Затем создадим объект ParamGridBuilder для оптимизации гиперпараметров поиском по решётке и объект CrossValidator для оценки аналитической модели методом перекрестной проверки. 

Для метода поиска по решётке было использовано 2 параметра: lr.regParam –коэффициент регуляризации, который определяет, насколько модель должна быть регуляризована в диапазоне от 0.01 до 10. Чем больше число, тем более упорядочена модель; lr.elasticNetParam указывает способ регуляризации модели.

После этого запустим тренировку модели функцией fit и оценим нашу модель с помощью предыдущего кода


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
lr = LogisticRegression(maxIter=30)
pipeline = Pipeline(stages=[assembler, lr])

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
params = ParamGridBuilder()
params = params.addGrid(lr.regParam, [.01, .1, 1, 10]).addGrid(lr.elasticNetParam, [0, .5, 1])
params = params.build()
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=5)

In [None]:
cv = cv.fit(trainingData)

In [None]:
# проверим модель на тестовых данных
prediction = cv.transform(testData)

# рассчитаем площадь под ROC-кривой (AUC)
auc = evaluator.evaluate(prediction, {evaluator.metricName: 'areaUnderROC'})
print('AUC: %0.3f' % auc)

# рассчитаем элементы матрицы ошибок
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label <> prediction').count()
FP = prediction.filter('prediction = 1 AND label <> prediction').count()

prediction.groupBy('label', 'prediction').count().show()

# рассчитаем долю правильных ответов, точность модели, полноту
# и F-меру — среднее гармоническое precision и recall
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)


AUC: 0.925
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|  220|
|    0|       0.0| 2017|
|    1|       1.0|  851|
|    0|       1.0|  220|
+-----+----------+-----+

n precision: 0.795
n recall: 0.795
n accuracy: 0.867
n F1 score: 0.795


In [None]:
prediction.select('Climate_Region_Pub', 'label', 'rawPrediction', 'prediction', 'probability').show(5)

+------------------+-----+--------------------+----------+--------------------+
|Climate_Region_Pub|label|       rawPrediction|prediction|         probability|
+------------------+-----+--------------------+----------+--------------------+
|                 1|    1|[-1.1762760112169...|       1.0|[0.23572243985977...|
|                 1|    1|[-0.9908760180235...|       1.0|[0.27073908241727...|
|                 1|    1|[-0.8305255260769...|       1.0|[0.30353396191844...|
|                 1|    1|[-1.3920030500199...|       1.0|[0.19908817385183...|
|                 1|    1|[-1.4235923721636...|       1.0|[0.19409903063365...|
+------------------+-----+--------------------+----------+--------------------+
only showing top 5 rows

