In [1]:
# findspark.init() has to run before the first pyspark import: it locates the
# Spark installation and adds its Python libraries to sys.path, so importing
# pyspark earlier only works when pyspark happens to be importable already.
import findspark
findspark.init()

import pandas as pd
import matplotlib.pyplot as plt

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Local Spark session on all available cores ("local[*]"); getOrCreate() returns
# an already-running session instead of starting a second one.
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Обучите модель классификации для цветков Iris.

In [2]:
# Load the Iris data set into a Spark DataFrame.
# The first CSV row is used as the header and column types are inferred.
iris = spark.read.csv(
    path='iris.csv',
    header=True,
    inferSchema=True,
)
iris.show()

+------------+-----------+------------+-----------+-------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_num|
+------------+-----------+------------+-----------+-------+-----------+
|         5.1|        3.5|         1.4|        0.2| Setosa|          0|
|         4.9|        3.0|         1.4|        0.2| Setosa|          0|
|         4.7|        3.2|         1.3|        0.2| Setosa|          0|
|         4.6|        3.1|         1.5|        0.2| Setosa|          0|
|         5.0|        3.6|         1.4|        0.2| Setosa|          0|
|         5.4|        3.9|         1.7|        0.4| Setosa|          0|
|         4.6|        3.4|         1.4|        0.3| Setosa|          0|
|         5.0|        3.4|         1.5|        0.2| Setosa|          0|
|         4.4|        2.9|         1.4|        0.2| Setosa|          0|
|         4.9|        3.1|         1.5|        0.1| Setosa|          0|
|         5.4|        3.7|         1.5|        0.2| Setosa|     

In [3]:
# Use VectorAssembler to merge all feature columns into a single vector column
# (wrapping the steps in a Pipeline is optional). The pipeline also indexes the
# string label 'variety' into the numeric column 'varietyInd'.
pipeline = Pipeline(stages=[
    StringIndexer(outputCol='varietyInd', inputCol='variety'),
    VectorAssembler(
        outputCol='Features',
        inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    ),
])

# Fit the stages on the full data set, then append the new columns to it.
pipelineTrained = pipeline.fit(iris)
df_features = pipelineTrained.transform(iris)

df_features.show()

+------------+-----------+------------+-----------+-------+-----------+----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_num|varietyInd|         Features|
+------------+-----------+------------+-----------+-------+-----------+----------+-----------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|          0|       0.0|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| Setosa|          0|       0.0|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| Setosa|          0|       0.0|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2| Setosa|          0|       0.0|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2| Setosa|          0|       0.0|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4| Setosa|          0|       0.0|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3| Setosa|          0|       0.0|[4.6,3.4,

In [4]:
# NOTE(review): abandoned experiment, left commented out. It ran StringIndexer over
# the four numeric measurement columns and assembled the resulting index columns
# ('slInd', 'swInd', 'plInd', 'pwInd') instead of the raw measurements.
# Presumably superseded by the pipeline in the previous cell; consider deleting
# this cell so the notebook contains only code that is actually executed.
# pipeline = Pipeline(stages =
# [
#   StringIndexer(inputCol='sepal_length', outputCol='slInd'),
#     StringIndexer(inputCol='sepal_width', outputCol='swInd'),
#     StringIndexer(inputCol='petal_length', outputCol='plInd'),
#     StringIndexer(inputCol='petal_width', outputCol='pwInd'),
#     StringIndexer(inputCol='variety', outputCol='varietyInd'),
#   VectorAssembler(inputCols=['slInd', 'swInd', 'plInd', 'pwInd'],\
#                   outputCol='Features')
# ])

# pipelineTrained = pipeline.fit(iris)
# df_features = pipelineTrained.transform(iris)

# df_features.show()

In [5]:
# Split the data into train and test sets: weights 0.8 / 0.2, with a fixed seed
# so the split is repeatable.
train, test = df_features.randomSplit(
    weights=[0.8, 0.2],
    seed=38,
)

In [6]:
# Task: create a linear-regression or tree model and train it.
# This notebook fits a LogisticRegression classifier on the training split,
# reading features from 'Features' and the label from 'varietyInd'.
lr = LogisticRegression(
    labelCol='varietyInd',
    featuresCol='Features',
)
model = lr.fit(train)

# Apply the fitted model to both splits for evaluation.
train_res, test_res = model.transform(train), model.transform(test)

In [7]:
# Use MulticlassClassificationEvaluator to assess quality on the train and test sets.
# No metricName is passed, so the score is the evaluator's default metric.
ev = MulticlassClassificationEvaluator(
    labelCol='varietyInd',
)
ev.evaluate(dataset=train_res)

1.0

In [8]:
# Score of the same evaluator on the held-out test predictions.
ev.evaluate(dataset=test_res)

0.9375