**Установка pysparc**

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 44.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=77f08bfd16b81ac2963ebfa2b7db05fff11c531a13a8033adb714fb11c7fd51a
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


**Загрузка библиотек**

In [2]:
import pandas as pd
import numpy as np

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession 

sc = SparkContext('local') 
spark = SparkSession(sc)

**1-2. Загрузка данных при помощи spark.read.csv из файла iris.csv.**

In [3]:
#header=True - при чтении используем первую строку как имена столбцов. 
#inferSchema=True - автоматически просматривает CSV-файл и выводит схему каждого столбца (тип столбца).
#quote="" - задаем как пустую строку. 

iris_df = spark.read.csv('iris.csv', header=True, inferSchema=True, quote="")

In [4]:
#вывод 5 первых строк
iris_df.show(5)

+-------------+---------------+----------------+---------------+------------+
|"sepal.length|""sepal.width""|""petal.length""|""petal.width""|""variety"""|
+-------------+---------------+----------------+---------------+------------+
|         "5.1|            3.5|             1.4|            0.2| ""Setosa"""|
|         "4.9|            3.0|             1.4|            0.2| ""Setosa"""|
|         "4.7|            3.2|             1.3|            0.2| ""Setosa"""|
|         "4.6|            3.1|             1.5|            0.2| ""Setosa"""|
|           "5|            3.6|             1.4|            0.2| ""Setosa"""|
+-------------+---------------+----------------+---------------+------------+
only showing top 5 rows



In [5]:
#Типы атрибутов
iris_df.printSchema()

root
 |-- "sepal.length: string (nullable = true)
 |-- ""sepal.width"": double (nullable = true)
 |-- ""petal.length"": double (nullable = true)
 |-- ""petal.width"": double (nullable = true)
 |-- ""variety""": string (nullable = true)



In [6]:
#преобразуем названия колонок, без учета кавычек
iris_df = iris_df.toDF("sepal_length","sepal_width","petal_length","petal_width","variety")
iris_df.show(5)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    variety|
+------------+-----------+------------+-----------+-----------+
|        "5.1|        3.5|         1.4|        0.2|""Setosa"""|
|        "4.9|        3.0|         1.4|        0.2|""Setosa"""|
|        "4.7|        3.2|         1.3|        0.2|""Setosa"""|
|        "4.6|        3.1|         1.5|        0.2|""Setosa"""|
|          "5|        3.6|         1.4|        0.2|""Setosa"""|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows



In [7]:
#в значениях первого и последнего столбца удаляем лишние кавычки

from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType

undue_symbol = '["]'   #задаем символ для поиска и исключения
iris_df = iris_df.withColumn("sepal_length", regexp_replace(col("sepal_length"), undue_symbol, ''))\
    .withColumn("variety", regexp_replace(col("variety"), undue_symbol, ''))

iris_df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|           5|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [8]:
#так как атрибут sepal.length имеет тип данных строка, то нужно переопределить тип в double

iris_df = iris_df.withColumn('sepal_length', col('sepal_length').cast(DoubleType()))
iris_df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



In [9]:
#вывод итогового датасета после обработки 
iris_df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



**Промежуточная аттестация. Модуль «Большие данные»**

**3. При помощи VectorAssembler преобразовать все колонки с признаками в одну**
**(использовать PipeLine)** 

In [21]:
iris_df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



Все столбцы, кроме variety являются double. Столбец variety - содержит строковые данные string.
Чтобы использовать variety для обучения модели, мы должны преобразовать строковые данные в значения с помощью StringIndexer

VectorAssembler - Преобразователь признаков, который объединяет несколько столбцов в векторный столбец.

In [22]:
#Загрузка небходимых библиотек
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors

In [29]:
#При помощи VectorAssembler объединяем числовые значения столбцов с признаками в вектора: 
# inputCols-'имена входных столбцов (признаки)'
# outputCol-'имя выходного столбца'
#Результаты вносим в новый столбец features. 

#Используем StringIndexer для преобразования строковых данных столбца variety в числовые значения: 
# 0 (Setosa), 1 (Versicolor) и 2 (Virginica)
#Результаты вносим в новый столбец class_ind.

#Объединяем оба действия методом Pipeline
pipeline = Pipeline(stages = 
[VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol='features'),
 StringIndexer(inputCol = 'variety', outputCol = 'class_ind')]
)

#Используем полученный pipeline для обучения на нашем датасете:
va_si_pipeline = pipeline.fit(iris_df)

#При помощи метода Transform преобразуем исходный датасет в новый:
vector_iris_df = va_si_pipeline.transform(iris_df)
vector_iris_df.show(5)

+------------+-----------+------------+-----------+-------+-----------------+---------+
|sepal_length|sepal_width|petal_length|petal_width|variety|         features|class_ind|
+------------+-----------+------------+-----------+-------+-----------------+---------+
|         5.1|        3.5|         1.4|        0.2| Setosa|[5.1,3.5,1.4,0.2]|      0.0|
|         4.9|        3.0|         1.4|        0.2| Setosa|[4.9,3.0,1.4,0.2]|      0.0|
|         4.7|        3.2|         1.3|        0.2| Setosa|[4.7,3.2,1.3,0.2]|      0.0|
|         4.6|        3.1|         1.5|        0.2| Setosa|[4.6,3.1,1.5,0.2]|      0.0|
|         5.0|        3.6|         1.4|        0.2| Setosa|[5.0,3.6,1.4,0.2]|      0.0|
+------------+-----------+------------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [32]:
#Для проверки посчитаем кол-во каждого вида ириса и соответствующий ему индекс
check_test = vector_iris_df.groupBy("variety","class_ind").count().sort("class_ind")
check_test.show()

+----------+---------+-----+
|   variety|class_ind|count|
+----------+---------+-----+
|    Setosa|      0.0|   50|
|Versicolor|      1.0|   50|
| Virginica|      2.0|   50|
+----------+---------+-----+



В новом датасете каждому виду ириса соответствует уникальный индекс-классификатор и по 50 записей. Обучение исходного датасета прошло успешно. 

**4. Разбить данные на train и test**

In [52]:
#Разделим исходный датасет на две выборки - для обучения (train=80%) и тестирования (test=20%). 
#Выборку train будем использовать для обучения модели, а на test проверять эффективность обученной модели.
#Для этого используем метод randomSplit, который разделит исходный датасет в заданной пропорции (80/20) 

train, test = vector_iris_df.randomSplit([0.8, 0.2], seed=2)
print('\033[1m'+'Обучающая выборка:'+'\033[0m')
train.show()
print('\033[1m'+'Тестовая выборка:'+'\033[0m')
test.show()
print('\033[1m'+'Кол-во записей в обучающей выборке:'+'\033[0m', train.count())
print('\033[1m'+'Кол-во записей в тестовой выборке:'+'\033[0m', test.count())
print('\033[1m'+'Кол-во записей в исходном датасете:'+'\033[0m', vector_iris_df.count())

[1mОбучающая выборка:[0m
+------------+-----------+------------+-----------+----------+-----------------+---------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|class_ind|
+------------+-----------+------------+-----------+----------+-----------------+---------+
|         4.3|        3.0|         1.1|        0.1|    Setosa|[4.3,3.0,1.1,0.1]|      0.0|
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|      0.0|
|         4.4|        3.0|         1.3|        0.2|    Setosa|[4.4,3.0,1.3,0.2]|      0.0|
|         4.4|        3.2|         1.3|        0.2|    Setosa|[4.4,3.2,1.3,0.2]|      0.0|
|         4.6|        3.1|         1.5|        0.2|    Setosa|[4.6,3.1,1.5,0.2]|      0.0|
|         4.6|        3.2|         1.4|        0.2|    Setosa|[4.6,3.2,1.4,0.2]|      0.0|
|         4.6|        3.4|         1.4|        0.3|    Setosa|[4.6,3.4,1.4,0.3]|      0.0|
|         4.6|        3.6|         1.0|        0.2|    Setosa|[

**5. Создать модель логистической регресии и обучить ее**

In [56]:
#В качестве алгоритма обучения используем модель логистической регрессии (Logistic Regression). 
from pyspark.ml.classification import LogisticRegression

#Обучаем нашу модель на данных столбца векторов 'features', в качестве классификатора берем значения из столбца 'class_ind'
lr_iris_df=LogisticRegression(featuresCol='features', labelCol='class_ind')
LrModel=lr_iris_df.fit(train)

#При помощи метода Transform получаем новые датасеты с предсказаниями по обучащей и тестовой выборкам:
train_iris_df=LrModel.transform(train)
test_iris_df=LrModel.transform(test)

print('\033[1m'+'Предсказание по обучающей выборке:'+'\033[0m')
train_iris_df.show()
print('\033[1m'+'Предсказание по тестовой выборке:'+'\033[0m')
test_iris_df.show()

[1mПредсказание по обучающей выборке:[0m
+------------+-----------+------------+-----------+----------+-----------------+---------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|class_ind|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+----------+-----------------+---------+--------------------+--------------------+----------+
|         4.3|        3.0|         1.1|        0.1|    Setosa|[4.3,3.0,1.1,0.1]|      0.0|[38.4259720969243...|[0.99999999999980...|       0.0|
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|      0.0|[32.3793368523923...|[0.99999999983944...|       0.0|
|         4.4|        3.0|         1.3|        0.2|    Setosa|[4.4,3.0,1.3,0.2]|      0.0|[35.1889516742506...|[0.99999999999501...|       0.0|
|         4.4|        3.2|         1.3|        0.2|    Setosa|[4.4,3.2,1.3,0.2]|      0.0|[39

Предварительно можно сделать вывод, что наша модель обучена успешно и предоставляет верные предсказания. 

**6. Воспользоваться MulticlassClassificationEvaluator для оценки качества на train и test
множестве**

In [77]:
#Импортируем метод MulticlassClassificationEvaluator для оценки качества предсказания нашей модели:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Оцениваем насколько точно были предсказаны значения классификатора 'class_ind' для обучающей и тестовой выборки:
ms_ev = MulticlassClassificationEvaluator(labelCol = 'class_ind')

train_ev = ms_ev.evaluate(train_iris_df)*100
print ('\033[1m'+'Точность предсказания на обучающей выборке:'+'\033[0m', __builtin__.round(train_ev,2), '%')

test_ev = ms_ev.evaluate(test_iris_df)*100
print ('\033[1m'+'Точность предсказания на тестовой выборке:'+'\033[0m', __builtin__.round(test_ev,2), '%')

[1mТочность предсказания на обучающей выборке:[0m 98.36 %
[1mТочность предсказания на тестовой выборке:[0m 100.0 %


Модель обучена качественно (точность более 98%). Данные, предсказанные  моделью, пригодны для дальнейшего использования. 