In [None]:
!pip install pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

spark



In [None]:
# Load the iris dataset: the first row holds column names and the column
# types are inferred from the data.
iris = spark.read.options(header=True, inferSchema=True).csv('iris.csv')

# Preview the first five rows.
iris.show(n=5)

+------------+-----------+------------+-----------+-------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_num|
+------------+-----------+------------+-----------+-------+-----------+
|         5.1|        3.5|         1.4|        0.2| Setosa|          0|
|         4.9|        3.0|         1.4|        0.2| Setosa|          0|
|         4.7|        3.2|         1.3|        0.2| Setosa|          0|
|         4.6|        3.1|         1.5|        0.2| Setosa|          0|
|         5.0|        3.6|         1.4|        0.2| Setosa|          0|
+------------+-----------+------------+-----------+-------+-----------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Configure the assembler that packs the four measurement columns into a
# single feature-vector column named x_attributes.
vec_assembler = VectorAssembler(
    inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    outputCol='x_attributes',
)

In [None]:
# Append the assembled x_attributes vector column to the dataset.
iris_new = vec_assembler.transform(dataset=iris)

# Preview the first three rows, now including the feature vector.
iris_new.show(n=3)

+------------+-----------+------------+-----------+-------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_num|     x_attributes|
+------------+-----------+------------+-----------+-------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|          0|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| Setosa|          0|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| Setosa|          0|[4.7,3.2,1.3,0.2]|
+------------+-----------+------------+-----------+-------+-----------+-----------------+
only showing top 3 rows



**Pipeline. Как вариант**

Для примера возьмём столбец 'variety' — как если бы столбца 'variety_num' не было — и закодируем его числами с помощью StringIndexer.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [None]:
# Stage 1: assemble the four measurement columns into the x_attributes vector.
assembler_stage = VectorAssembler(
    inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    outputCol='x_attributes',
)

# Stage 2: index the string label 'variety' into the numeric column variety_ID.
indexer_stage = StringIndexer(inputCol='variety', outputCol='variety_ID')

# Chain both stages so they are fitted and applied together, in order.
pipeline = Pipeline(stages=[assembler_stage, indexer_stage])

In [None]:
# Fit the pipeline stages on the full iris dataset.
iris_Trained = pipeline.fit(dataset=iris)

# Apply the fitted pipeline, adding the x_attributes and variety_ID columns.
iris_new_pipeline = iris_Trained.transform(dataset=iris)

In [None]:
# Preview the first three rows of the pipeline output.
iris_new_pipeline.show(n=3)

+------------+-----------+------------+-----------+-------+-----------+-----------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_num|     x_attributes|variety_ID|
+------------+-----------+------------+-----------+-------+-----------+-----------------+----------+
|         5.1|        3.5|         1.4|        0.2| Setosa|          0|[5.1,3.5,1.4,0.2]|       0.0|
|         4.9|        3.0|         1.4|        0.2| Setosa|          0|[4.9,3.0,1.4,0.2]|       0.0|
|         4.7|        3.2|         1.3|        0.2| Setosa|          0|[4.7,3.2,1.3,0.2]|       0.0|
+------------+-----------+------------+-----------+-------+-----------+-----------------+----------+
only showing top 3 rows



In [None]:
# Build the training and test sets. Roughly 30% of the rows are used for
# training and the remaining ~70% for testing; the fixed seed is passed so the
# split can be repeated.
train, test = iris_new_pipeline.randomSplit(weights=[0.3, 0.7], seed=12345)

In [None]:
# Создаем и обучаем модель логистической регрессии
from pyspark.ml.classification import LogisticRegression

In [None]:
# Configure the classifier: features are read from x_attributes and the
# target label from variety_ID.
ml_logregression = LogisticRegression(
    featuresCol='x_attributes',
    labelCol='variety_ID',
)

In [None]:
# Train the logistic regression model on the training split only.
ml_logregression_iris = ml_logregression.fit(dataset=train)

In [None]:
# Score the test split; the result gains the rawPrediction, probability and
# prediction columns.
test_res = ml_logregression_iris.transform(dataset=test)

# Inspect the first fifteen scored rows.
test_res.show(n=15)

+------------+-----------+------------+-----------+----------+-----------+-----------------+----------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|variety_num|     x_attributes|variety_ID|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+----------+-----------+-----------------+----------+--------------------+--------------------+----------+
|         4.3|        3.0|         1.1|        0.1|    Setosa|          0|[4.3,3.0,1.1,0.1]|       0.0|[452.428503566451...|[1.0,8.1409701848...|       0.0|
|         4.5|        2.3|         1.3|        0.3|    Setosa|          0|[4.5,2.3,1.3,0.3]|       0.0|[296.195850919264...|[2.73341895917972...|       1.0|
|         4.6|        3.1|         1.5|        0.2|    Setosa|          0|[4.6,3.1,1.5,0.2]|       0.0|[390.805463182654...|[1.0,1.7254142893...|       0.0|
|         4.6|        3.2|         1.4|        0.2|    Set

In [None]:
# Score the training split with the same fitted model, for comparison with
# the test results.
train_res = ml_logregression_iris.transform(dataset=train)

# Inspect the first five scored rows.
train_res.show(n=5)

+------------+-----------+------------+-----------+-------+-----------+-----------------+----------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_num|     x_attributes|variety_ID|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+-------+-----------+-----------------+----------+--------------------+--------------------+----------+
|         4.4|        2.9|         1.4|        0.2| Setosa|          0|[4.4,2.9,1.4,0.2]|       0.0|[393.052097813064...|[0.99999998937737...|       0.0|
|         4.4|        3.0|         1.3|        0.2| Setosa|          0|[4.4,3.0,1.3,0.2]|       0.0|[412.489742749319...|[1.0,3.0185580859...|       0.0|
|         4.4|        3.2|         1.3|        0.2| Setosa|          0|[4.4,3.2,1.3,0.2]|       0.0|[438.096502946082...|[1.0,3.6904595711...|       0.0|
|         4.6|        3.6|         1.0|        0.2| Setosa|          0|[4.6,

In [None]:
# Оценка качества модели
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Evaluator that compares predictions against the variety_ID label. The metric
# is spelled out explicitly; 'f1' is also the evaluator's default.
multi_eva = MulticlassClassificationEvaluator(
    labelCol='variety_ID',
    metricName='f1',
)

# F1 score on the test split.
multi_eva.evaluate(dataset=test_res)

0.8805097302078726

In [None]:
# Same metric on the training split; compare it with the test-split score to
# judge how well the model generalises.
multi_eva.evaluate(dataset=train_res)

1.0

# Модель дерева решений
from pyspark.ml.classification import DecisionTreeClassifier # Классификатор решений