In [27]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import monotonically_increasing_id, format_number
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import IsotonicRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import FMRegressor
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
import numpy as np

In [28]:
# Создание SparkSession
spark = SparkSession.builder.appName("CrabAgePrediction").getOrCreate()
# Загружаем датасет
data = spark.read.csv("CrabAgePrediction.csv", header = True, inferSchema = True)

data.printSchema()
data.show()

root
 |-- Sex: string (nullable = true)
 |-- Length: double (nullable = true)
 |-- Diameter: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Shucked Weight: double (nullable = true)
 |-- Viscera Weight: double (nullable = true)
 |-- Shell Weight: double (nullable = true)
 |-- Age: integer (nullable = true)

+---+------+--------+------+-----------+--------------+--------------+------------+---+
|Sex|Length|Diameter|Height|     Weight|Shucked Weight|Viscera Weight|Shell Weight|Age|
+---+------+--------+------+-----------+--------------+--------------+------------+---+
|  F|1.4375|   1.175|0.4125| 24.6357155|    12.3320325|     5.5848515|    6.747181|  9|
|  M|0.8875|    0.65|0.2125| 5.40057975|     2.2963095|    1.37495075|   1.5592225|  6|
|  I|1.0375|   0.775|  0.25| 7.95203475|      3.231843|    1.60174675|  2.76407625|  6|
|  F| 1.175|  0.8875|  0.25|13.48018725|    4.74854125|    2.28213475|   5.2446575| 10|
|  I|0.8875|  0.6

In [29]:
# Преобразуем Пол краба в числа
indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
# Преобразовываем данные. Начинаем с выбора признаков
features = ["SexIndex", "Length", "Diameter" , "Height", "Weight", "Shucked Weight", "Viscera Weight", "Shell Weight"]
# Создаем вектор этих признаков
assembler = VectorAssembler(inputCols=features, outputCol="features")
# Разделяем данные на обучение и тест
(train, test) = data.randomSplit([0.7, 0.3])

# Объявляем необходимые переменные
feature_names = test.columns
feature_names.remove("Age")

# Списки для формирования структурированных данных
tupleRMSE = () # список среднекватратичных ошибок
Pred_arr = [] # список предсказанных значений
Importance_arr = [] # список признаков, оказывающих наибольшее влияние на предсказание

print("Train count: " + str(train.count()))
train.show(3)

print("Test count: " + str(test.count()))
test.show(3)

Train count: 2700
+---+------+--------+------+---------+--------------+--------------+------------+---+
|Sex|Length|Diameter|Height|   Weight|Shucked Weight|Viscera Weight|Shell Weight|Age|
+---+------+--------+------+---------+--------------+--------------+------------+---+
|  F|0.6875|  0.4875| 0.175|  2.26796|     0.8788345|    0.60951425|   0.7087375|  5|
|  F| 0.725|   0.525|0.1875|7.7961125|     3.2034935|    1.91359125|   0.9922325|  6|
|  F| 0.725|  0.5625|0.1875|  3.96893|    1.45999925|    0.66621325|     1.13398|  5|
+---+------+--------+------+---------+--------------+--------------+------------+---+
only showing top 3 rows

Test count: 1193
+---+------+--------+------+----------+--------------+--------------+------------+---+
|Sex|Length|Diameter|Height|    Weight|Shucked Weight|Viscera Weight|Shell Weight|Age|
+---+------+--------+------+----------+--------------+--------------+------------+---+
|  F|0.8375|    0.55| 0.175|  4.819415|      2.154562|    1.03475675|    1.41

In [30]:
# Создаем модель
rf = RandomForestRegressor(featuresCol = "features", labelCol= "Age")
# Создаем конвейер
pipelineRF = Pipeline(stages = [indexer, assembler, rf])
# Обучение модели
modelRF = pipelineRF.fit(train)
# Предсказание возраста на тестовой выборке
predictionsRF = modelRF.transform(test).withColumnRenamed("prediction", "predictionsRF")
Pred_arr.append(predictionsRF)

# Сохранение степени влиятельности признаков
name = "RandomForestRegressor"
tup = (name, ) + tuple(list(map(lambda x: float(x), modelRF.stages[2].featureImportances)))
Importance_arr.append(tup)
print(Importance_arr)

[('RandomForestRegressor', 0.0285508694699434, 0.07806980473025281, 0.12269589066877688, 0.11721170686876277, 0.11111794711151542, 0.07203870266259102, 0.07159258239894649, 0.3987224960892113)]


In [31]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsRF", metricName = "rmse")
rmseRF = evaluator.evaluate(predictionsRF)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsRF", metricName = "mae")
maeRF = evaluator.evaluate(predictionsRF)

tupleRMSE += (rmseRF,)

print("Root Mean Squared Error (RMSE): %s" % rmseRF)
print("Mean Absolute Error (MAE): %s" % maeRF)

# Вывод результата и выключение SparkSession
predictionsRF.select("Age", "predictionsRF").show()

Root Mean Squared Error (RMSE): 2.4055859200639436
Mean Absolute Error (MAE): 1.6750731467058526
+---+------------------+
|Age|     predictionsRF|
+---+------------------+
|  6| 7.241722028037499|
|  9| 7.839193129573229|
|  7| 7.538035960551551|
| 10| 7.644000578915727|
| 10|  8.22488891516051|
|  7|  8.58325901299813|
|  7| 8.169813220672534|
|  7| 9.099672171674749|
| 10| 8.965887203128904|
|  8| 8.501455029513455|
|  6|  8.74839500221802|
|  7|  9.13245205610956|
| 10|  8.94374136901161|
| 16| 9.269620211251105|
|  8| 9.232710814071345|
|  8| 8.984100407757913|
| 10| 9.133551594134392|
| 10| 9.669460585597665|
| 10| 9.442590537897093|
| 12|10.140361039849179|
+---+------------------+
only showing top 20 rows



In [32]:
# Создание модели градиентного бустинга
gbt = GBTRegressor(featuresCol="features", labelCol= "Age")
# Создание конвейера
pipelineGBT = Pipeline(stages = [indexer, assembler, gbt])
# Обучение модели
modelGBT = pipelineGBT.fit(train)
# Предсказание возраста на тестовой выборке
predictionsGBT = modelGBT.transform(test).withColumnRenamed("prediction", "predictionsGBT")
Pred_arr.append(predictionsGBT)

# Сохранение степени влиятельности признаков
name = "GBTRegressor"
tup = (name, ) + tuple(list(map(lambda x: float(x), modelGBT.stages[2].featureImportances)))
Importance_arr.append(tup)

In [33]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsGBT", metricName = "rmse")
rmseGBT = evaluator.evaluate(predictionsGBT)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsGBT", metricName = "mae")
maeGBT = evaluator.evaluate(predictionsGBT)

tupleRMSE += (rmseGBT,)

print("Root Mean Squared Error (RMSE): %s" % rmseGBT)
print("Mean Absolute Error (MAE): %s" % maeGBT)

# Вывод результата и выключение SparkSession
predictionsGBT.select("Age", "predictionsGBT").show()

Root Mean Squared Error (RMSE): 2.3806134405952695
Mean Absolute Error (MAE): 1.64338887523341
+---+------------------+
|Age|    predictionsGBT|
+---+------------------+
|  6| 5.944553599927392|
|  9|  8.12966426391129|
|  7|  8.12966426391129|
| 10|  8.12966426391129|
| 10| 8.338524885519957|
|  7| 9.757891458403545|
|  7| 9.475181375275692|
|  7| 9.966752080012213|
| 10| 9.966752080012213|
|  8| 9.757891458403545|
|  6| 9.757891458403545|
|  7| 9.924759122898589|
| 10| 9.641815032377098|
| 16| 9.924759122898589|
|  8| 9.924759122898589|
|  8| 9.966752080012213|
| 10|   9.8102361436508|
| 10| 9.638052696088856|
| 10|  9.80996429522989|
| 12|11.308200454854616|
+---+------------------+
only showing top 20 rows



In [34]:
# Создание модели линейной регрессии
lr = LinearRegression(featuresCol="features", labelCol="Age", maxIter=10, regParam=0.3, elasticNetParam=0.8) 
# Создание конвейера
pipelineLR = Pipeline(stages = [indexer, assembler, lr])
# Fit the model
modelLR = pipelineLR.fit(train)
# Предсказание возраста на тестовой выборке
predictionsLR = modelLR.transform(test).withColumnRenamed("prediction", "predictionsLR")
Pred_arr.append(predictionsLR)

In [35]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsLR", metricName = "rmse")
rmseLR = evaluator.evaluate(predictionsLR)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsLR", metricName = "mae")
maeLR = evaluator.evaluate(predictionsLR)

tupleRMSE += (rmseLR,)

print("Root Mean Squared Error (RMSE): %s" % rmseLR)
print("Mean Absolute Error (MAE): %s" % maeLR)

# Вывод результата и выключение SparkSession
predictionsLR.select("Age", "predictionsLR").show()

Root Mean Squared Error (RMSE): 2.635677721670266
Mean Absolute Error (MAE): 1.8818309318367363
+---+-----------------+
|Age|    predictionsLR|
+---+-----------------+
|  6|7.575476249780159|
|  9|7.851307570196404|
|  7|7.797595393818223|
| 10|7.931763188864219|
| 10|8.062339587924274|
|  7|8.094281218294956|
|  7|7.946183684149375|
|  7|8.349116073736642|
| 10|8.136044839266935|
|  8|8.133202226477545|
|  6|8.060674762780712|
|  7|8.288653530033308|
| 10| 8.20202192960114|
| 16|8.484807031502516|
|  8|8.320220768805715|
|  8| 8.32777453217842|
| 10| 8.49213178438847|
| 10|8.635834526500556|
| 10|8.853903955088786|
| 12|8.859102143169835|
+---+-----------------+
only showing top 20 rows



In [36]:
# Создание модели решающих деервьев
dt = DecisionTreeRegressor(featuresCol="features", labelCol="Age")
# Создание конвейера
pipelineDT = Pipeline(stages = [indexer, assembler, dt])
# Fit the model
modelDT = pipelineDT.fit(train)
# Предсказание возраста на тестовой выборке
predictionsDT = modelDT.transform(test).withColumnRenamed("prediction", "predictionsDT")
Pred_arr.append(predictionsDT)

# Сохранение степени влиятельности признаков
name = "DecisionTreeRegressor"
tup = (name, ) + tuple(list(map(lambda x: float(x), modelDT.stages[2].featureImportances)))
Importance_arr.append(tup)

In [37]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsDT", metricName = "rmse")
rmseDT = evaluator.evaluate(predictionsDT)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsDT", metricName = "mae")
maeDT = evaluator.evaluate(predictionsDT)

tupleRMSE += (rmseDT,)

print("Root Mean Squared Error (RMSE): %s" % rmseDT)
print("Mean Absolute Error (MAE): %s" % maeDT)

# Вывод результата и выключение SparkSession
predictionsDT.select("Age", "predictionsDT").show()

Root Mean Squared Error (RMSE): 2.4442793759368713
Mean Absolute Error (MAE): 1.6966915215364518
+---+-----------------+
|Age|    predictionsDT|
+---+-----------------+
|  6|5.957746478873239|
|  9|8.142857142857142|
|  7|8.142857142857142|
| 10|8.142857142857142|
| 10|8.142857142857142|
|  7|9.771084337349398|
|  7|9.771084337349398|
|  7|9.771084337349398|
| 10|9.771084337349398|
|  8|9.771084337349398|
|  6|9.771084337349398|
|  7|9.771084337349398|
| 10|9.771084337349398|
| 16|9.771084337349398|
|  8|9.771084337349398|
|  8|9.771084337349398|
| 10|9.771084337349398|
| 10|9.771084337349398|
| 10|9.771084337349398|
| 12|9.771084337349398|
+---+-----------------+
only showing top 20 rows



In [38]:
# Создание модели изотонической регрессии
ir = IsotonicRegression(featuresCol="features", labelCol="Age")
# Создание конвейера
pipelineIR = Pipeline(stages = [indexer, assembler, ir])
# Fit the model
modelIR = pipelineIR.fit(train)
# Предсказание возраста на тестовой выборке
predictionsIR = modelIR.transform(test).withColumnRenamed("prediction", "predictionsIR")
Pred_arr.append(predictionsIR)

In [39]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsIR", metricName = "rmse")
rmseIR = evaluator.evaluate(predictionsIR)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsIR", metricName = "mae")
maeIR = evaluator.evaluate(predictionsIR)

tupleRMSE += (rmseIR,)

print("Root Mean Squared Error (RMSE): %s" % rmseIR)
print("Mean Absolute Error (MAE): %s" % maeIR)

# Вывод результата и выключение SparkSession
predictionsIR.select("Age", "predictionsIR").show()

Root Mean Squared Error (RMSE): 4.055998996778264
Mean Absolute Error (MAE): 3.1458471145125584
+---+------------------+
|Age|     predictionsIR|
+---+------------------+
|  6|10.088335773548073|
|  9|10.088335773548073|
|  7|10.088335773548073|
| 10|10.088335773548073|
| 10|10.088335773548073|
|  7|10.088335773548073|
|  7|10.088335773548073|
|  7|10.088335773548073|
| 10|10.088335773548073|
|  8|10.088335773548073|
|  6|10.088335773548073|
|  7|10.088335773548073|
| 10|10.088335773548073|
| 16|10.088335773548073|
|  8|10.088335773548073|
|  8|10.088335773548073|
| 10|10.088335773548073|
| 10|10.088335773548073|
| 10|10.088335773548073|
| 12|10.088335773548073|
+---+------------------+
only showing top 20 rows



In [40]:
# Создание модели FMRegressor
fm = FMRegressor(featuresCol="features", labelCol="Age", predictionCol="prediction",stepSize=0.001)
# Создание конвейера
pipelineFM = Pipeline(stages = [indexer, assembler, fm])
# Fit the model
modelFM = pipelineFM.fit(train)
# Предсказание возраста на тестовой выборке
predictionsFM = modelFM.transform(test).withColumnRenamed("prediction", "predictionsFM")
Pred_arr.append(predictionsFM)

In [41]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsFM", metricName = "rmse")
rmseFM = evaluator.evaluate(predictionsFM)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsFM", metricName = "mae")
maeFM = evaluator.evaluate(predictionsFM)

tupleRMSE += (rmseFM,)

print("Root Mean Squared Error (RMSE): %s" % rmseFM)
print("Mean Absolute Error (MAE): %s" % maeFM)

# Вывод результата и выключение SparkSession
predictionsFM.select("Age", "predictionsFM").show()

Root Mean Squared Error (RMSE): 5.534555706146596
Mean Absolute Error (MAE): 4.596309743749149
+---+------------------+
|Age|     predictionsFM|
+---+------------------+
|  6|1.1272071341747298|
|  9| 1.304731712057396|
|  7|1.2561276148892677|
| 10|1.4285060228178794|
| 10|1.5281194253477555|
|  7| 1.756910301076485|
|  7|1.8887283731847917|
|  7|1.9656042788540469|
| 10|1.6736157591738352|
|  8|1.8402247542358272|
|  6| 1.759240984914093|
|  7| 2.147802636499514|
| 10|1.9907166557882068|
| 16|2.3115656055727194|
|  8| 2.342130291686629|
|  8| 2.054417119149027|
| 10|2.4380034836363196|
| 10|2.4953305640829058|
| 10|3.1244903517065974|
| 12|2.5907763971846465|
+---+------------------+
only showing top 20 rows



In [42]:
# Модель  `AFTSurvivalRegression`  не подходит для датасета  `CrabAgePrediction`, так как нет информации о том,  
# живы ли были крабы, когда их поймали.

# Эта модель предназначена для анализа данных выживания, где есть данные о времени до события (в данном случае, смерть) 
# и информация о том,  было ли это событие наблюдено (краб умер) или  **цензурировано**  (краб был жив, когда его поймали).  

In [43]:
glr = GeneralizedLinearRegression(featuresCol="features", labelCol="Age", family="gaussian", link="identity", maxIter=10, regParam=0.3)
# Создание конвейера
pipelineGLR = Pipeline(stages = [indexer, assembler, glr])
# Fit the model
modelGLR = pipelineGLR.fit(train)
# Предсказание возраста на тестовой выборке
predictionsGLR = modelGLR.transform(test).withColumnRenamed("prediction", "predictionsGLR")
Pred_arr.append(predictionsGLR)

In [44]:
# Оценка модели
evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsGLR", metricName = "rmse")
rmseGLR = evaluator.evaluate(predictionsGLR)

evaluator = RegressionEvaluator(labelCol = "Age", predictionCol = "predictionsGLR", metricName = "mae")
maeGLR = evaluator.evaluate(predictionsGLR)

tupleRMSE += (rmseGLR,)

print("Root Mean Squared Error (RMSE): %s" % rmseGLR)
print("Mean Absolute Error (MAE): %s" % maeGLR)

# Вывод результата и выключение SparkSession
predictionsGLR.select("Age", "predictionsGLR").show()

Root Mean Squared Error (RMSE): 2.440465922240704
Mean Absolute Error (MAE): 1.7213759461682905
+---+-----------------+
|Age|   predictionsGLR|
+---+-----------------+
|  6|6.986039261259186|
|  9|7.676532516111662|
|  7|7.627029507818958|
| 10|7.680174900180644|
| 10| 8.02858283303358|
|  7|7.990860558877739|
|  7|7.583519416231743|
|  7|8.276963375088364|
| 10|8.158501597096183|
|  8|8.050373152874357|
|  6|8.058047779509558|
|  7|8.219432211224959|
| 10|8.144933668758657|
| 16|8.633411457479916|
|  8|8.377993945207525|
|  8| 8.49611827394601|
| 10|8.495546339134307|
| 10|8.931110922961462|
| 10|8.890694834372836|
| 12|9.135689970436971|
+---+-----------------+
only showing top 20 rows



In [45]:
# Создаем списки моделей, выполняющихся том же в порядке выше
modelsList = ["RandomForestRegressor", "GBTRegressor", "LinearRegression", "DecisionTreeRegressor", 
              "IsotonicRegression", "FMRegressor", "GeneralizedLinearRegression"]
shortModelsList = ["predictionsRF", "predictionsGBT", "predictionsLR", "predictionsDT", "predictionsIR", 
                   "predictionsFM", "predictionsGLR"]

# Схема нового DataFrame значений RMSE каждой модели
schemaRMSE = StructType([
    StructField(modelsList[0], DoubleType(), True),
    StructField(modelsList[1], DoubleType(), True),
    StructField(modelsList[2], DoubleType(), True),
    StructField(modelsList[3], DoubleType(), True),
    StructField(modelsList[4], DoubleType(), True),
    StructField(modelsList[5], DoubleType(), True),
    StructField(modelsList[6], DoubleType(), True)
])
# Создание DataFrame
rmseDF = spark.createDataFrame([tupleRMSE], schemaRMSE)

In [46]:
# Формирование схемы Dataframe влиятельных признаков
types = [StructField("Model_name", StringType(), True)]

for i in feature_names:
    types.append(StructField(i, DoubleType(), True))

schema_imp = StructType(types)
impDF = spark.createDataFrame(Importance_arr, schema_imp)

In [47]:
# Сбор и структурирование данных о результатах предсказаний
# Добавляем столбец ID, чтобы избежать потери данных при объединении
for i, df in enumerate(Pred_arr):
    df = df.withColumn("id", monotonically_increasing_id())
    Pred_arr[i] = df

# Отбираем данных предсказаний
ListSelectedDF = [df.select("Age", "id", shortModelsList[i]) for i, df in enumerate(Pred_arr)]

# формируем из них новый Dataframe по Age и id
predDF = ListSelectedDF[0]

for i in range(1, len(ListSelectedDF)):
    predDF = predDF.join(ListSelectedDF[i], on = ["Age", "id"], how = "inner")

predDF = predDF.drop("id")

In [48]:
# Вывод лучшего результата
print(f"Лучшая модель: {modelsList[rmseDF.head().index(min(rmseDF.head()))]} (RMSE: {min(rmseDF.head())})")
# Запись результатов в json-файл
rmseDF.coalesce(1).write.json("RMSE.json", mode = 'overwrite')
predDF.write.json("Pred.json", mode = 'overwrite')
impDF.coalesce(1).write.json("Importance.json", mode = 'overwrite')

                                                                                

Лучшая модель: GBTRegressor (RMSE: 2.3806134405952695)


                                                                                

In [49]:
spark.stop()