In [1]:
import matplotlib.pyplot as plt
import numpy as np 
import pandas as pd 
import seaborn as sns
import plotly.express as px

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import datediff
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import SparkContext

In [3]:
spark = SparkSession.builder \
    .appName("ML_model") \
    .getOrCreate()

In [207]:
#Loading the prepared dataset with a confidence interval
sparkDF = spark.read.csv("punks_lim.csv",header=True,inferSchema=True)

In [208]:
sparkDF.show()

+---+----------+----+--------+-----+-----+-----------------+
|_c0|N_elements|Rank|   Score|Items|Price|  close_price_eth|
+---+----------+----+--------+-----+-----+-----------------+
|  5|       618|4632| 62.3671|    2|24.99| 1626.57568359375|
|  8|      1186|6436|  54.763|    2| 48.0|3013.732666015625|
| 10|      1316|7545| 50.9701|    2|14.66| 1935.60107421875|
| 11|      1316|7545| 50.9701|    2| 10.5| 1935.60107421875|
| 12|      1316|7545| 50.9701|    2|34.45|2724.619873046875|
| 15|      1633|9464|  38.252|    2| 67.8| 3319.25732421875|
| 19|      1711| 451|186.7855|    2|72.69| 3319.25732421875|
| 20|      1711| 451|186.7855|    2|44.99|2827.328857421875|
| 22|      2036|9464|  38.252|    2|19.49|  2316.0595703125|
| 23|      2036|9464|  38.252|    2|17.99|  3785.8486328125|
| 25|      2036|9464|  38.252|    2| 5.55|359.9378662109375|
| 26|      2036|9464|  38.252|    2|  5.0|359.9378662109375|
| 28|      2572|7072|  52.494|    2|47.99|2139.353271484375|
| 29|      2572|7072|  5

In [209]:
#Dataset schema
sparkDF.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- N_elements: integer (nullable = true)
 |-- Rank: integer (nullable = true)
 |-- Score: double (nullable = true)
 |-- Items: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- close_price_eth: double (nullable = true)



In [251]:
train, test = sparkDF.randomSplit([0.8, 0.2], seed=42)

In [252]:
#Adding columns to our model
featureCols = ["Score","close_price_eth","Rank",'Items']

In [253]:
# put features into a feature vector column
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

In [254]:
#transform our data
train = assembler.transform(train)

In [255]:
test = assembler.transform(test)

In [256]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_new")

In [257]:
# Fit the DataFrame to the scaler
scaled = standardScaler.fit(train)

In [258]:
train = scaled.transform(train)
test = scaled.transform(test)

In [259]:
# Initialize `lr`
lr = (LinearRegression(featuresCol='features_new', labelCol="Price", predictionCol='pred', 
                               maxIter=10, regParam=0.3, elasticNetParam=0.8, standardization=False))

In [260]:
# Fit the data to the model
linearModel = lr.fit(train)

In [261]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.2625, 14.4532, -1.151, 0.0])

In [262]:
# Intercept for the model
linearModel.intercept

1.1347672754997922

In [263]:
# Generate predictions
predictions = linearModel.transform(test)

In [264]:
# Extract the predictions and the "known" correct labels
predn = predictions.select("pred", "Price").show()

+-------------------+-----+
|               pred|Price|
+-------------------+-----+
| 45.993956837071345|72.69|
|   28.5264792902446|19.49|
| 42.460501178711525| 85.0|
| 51.477009170648884|  0.0|
|  44.15165522331917| 67.8|
|  2.134810864900796|  7.5|
|0.35939363256693224| 0.59|
|  29.64820374848746|19.99|
| 23.710625972113995|16.68|
|  33.57218871446032|39.99|
| 18.811188741685413| 20.0|
|  2.924864867465799|  2.3|
| 2.4400713208942104| 0.95|
|  32.79745087562159|  0.0|
|  39.74633822245513| 80.0|
| 29.582520287052642| 33.0|
| 31.853140764513036|  0.0|
|  44.53605734025197| 59.0|
| 5.2192446734355356| 4.99|
|  51.68641987858546| 20.0|
+-------------------+-----+
only showing top 20 rows



In [265]:
test_results = linearModel.evaluate(test)

In [266]:
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|  26.696043162928653|
|    -9.0364792902446|
|  42.539498821288475|
| -51.477009170648884|
|   23.64834477668083|
|   5.365189135099204|
| 0.23060636743306773|
|  -9.658203748487463|
|  -7.030625972113995|
|   6.417811285539685|
|   1.188811258314587|
| -0.6248648674657993|
| -1.4900713208942105|
|  -32.79745087562159|
|   40.25366177754487|
|   3.417479712947358|
| -31.853140764513036|
|  14.463942659748028|
|-0.22924467343553534|
|  -31.68641987858546|
+--------------------+
only showing top 20 rows



In [267]:
#We look at the errors
print("RMSE: {}".format(test_results.rootMeanSquaredError))

RMSE: 14.252562145421047


In [268]:
data = test.select('features_new')

In [269]:
predictions = linearModel.transform(data)

In [270]:
predictions.show()

+--------------------+-------------------+
|        features_new|               pred|
+--------------------+-------------------+
|[0.48220321270508...| 45.993956837071345|
|[0.09875090567734...|   28.5264792902446|
|[0.13551788253232...| 42.460501178711525|
|[0.16411387508427...| 51.477009170648884|
|[0.16100615940262...|  44.15165522331917|
|[0.11826280623678...|  2.134810864900796|
|[0.14171679173216...|0.35939363256693224|
|[0.14171679173216...|  29.64820374848746|
|[0.12351375630861...| 23.710625972113995|
|[0.26269491226863...|  33.57218871446032|
|[0.16627776217877...| 18.811188741685413|
|[0.13551788253232...|  2.924864867465799|
|[0.18406206402235...| 2.4400713208942104|
|[0.16352346589969...|  32.79745087562159|
|[0.15313463931215...|  39.74633822245513|
|[0.14137550579337...| 29.582520287052642|
|[0.14137550579337...| 31.853140764513036|
|[0.16455016346290...|  44.53605734025197|
|[0.21126735509756...| 5.2192446734355356|
|[0.13551788253232...|  51.68641987858546|
+----------

In [271]:
#We look at the errors
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R2: {}".format(test_results.r2))

RMSE: 14.252562145421047
MSE: 203.13552770908902
R2: 0.5247250897937524


In [272]:
#Second Model

In [273]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor

In [274]:
#тренируем модель (градиентного регрессионого дерева бустинга)
gbt = GBTRegressor(labelCol="Price", featuresCol="features_new", maxIter=10)

In [275]:
train.show()

+---+----------+----+--------+-----+-----+-----------------+--------------------+--------------------+
|_c0|N_elements|Rank|   Score|Items|Price|  close_price_eth|            features|        features_new|
+---+----------+----+--------+-----+-----+-----------------+--------------------+--------------------+
|  5|       618|4632| 62.3671|    2|24.99| 1626.57568359375|[62.3671,1626.575...|[0.16100615940262...|
|  8|      1186|6436|  54.763|    2| 48.0|3013.732666015625|[54.763,3013.7326...|[0.14137550579337...|
| 10|      1316|7545| 50.9701|    2|14.66| 1935.60107421875|[50.9701,1935.601...|[0.13158380051930...|
| 11|      1316|7545| 50.9701|    2| 10.5| 1935.60107421875|[50.9701,1935.601...|[0.13158380051930...|
| 12|      1316|7545| 50.9701|    2|34.45|2724.619873046875|[50.9701,2724.619...|[0.13158380051930...|
| 15|      1633|9464|  38.252|    2| 67.8| 3319.25732421875|[38.252,3319.2573...|[0.09875090567734...|
| 20|      1711| 451|186.7855|    2|44.99|2827.328857421875|[186.7855,282

In [276]:
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[gbt])
model = pipeline.fit(train)

In [277]:
# делаем предсказания на тестовой выборке
predictions = model.transform(test)

In [278]:
# Select example rows to display.
predictions.select("features_new","Price", "prediction").show(5)

+--------------------+-----+------------------+
|        features_new|Price|        prediction|
+--------------------+-----+------------------+
|[0.48220321270508...|72.69| 59.94731437448423|
|[0.09875090567734...|19.49| 25.13716358308544|
|[0.13551788253232...| 85.0| 60.87107290868313|
|[0.16411387508427...|  0.0|24.540627568311578|
|[0.16100615940262...| 67.8| 63.16069178322286|
+--------------------+-----+------------------+
only showing top 5 rows



In [279]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 12.5857


In [280]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("Root Mean Squared Error (R2) on test data = %g" % r2)

Root Mean Squared Error (R2) on test data = 0.629395


In [281]:
#Third model

In [282]:
# Random Forest Regressor
from pyspark.ml.regression import RandomForestRegressor

# Define Random Forest Regressor algorithm
rf = RandomForestRegressor(labelCol="Price", featuresCol="features_new",numTrees=2, maxDepth=2, seed=42) # featuresCol="indexedFeatures",numTrees=2, maxDepth=2, seed=42

In [283]:
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train)

In [284]:
# делаем предсказания на тестовой выборке
predictions = model.transform(test)

In [285]:
# Select example rows to display.
predictions.select("features_new","Price", "prediction").show(5)

+--------------------+-----+------------------+
|        features_new|Price|        prediction|
+--------------------+-----+------------------+
|[0.48220321270508...|72.69|35.331252106354555|
|[0.09875090567734...|19.49|26.188239104393766|
|[0.13551788253232...| 85.0| 36.53444372562565|
|[0.16411387508427...|  0.0|  32.7486004971095|
|[0.16100615940262...| 67.8|  32.7486004971095|
+--------------------+-----+------------------+
only showing top 5 rows



In [286]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 15.04


In [287]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("Root Mean Squared Error (R2) on test data = %g" % r2)

Root Mean Squared Error (R2) on test data = 0.470755
