In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd

spark = SparkSession.builder.master("loc[*]").getOrCreate()

In [2]:
vehicles_data = spark.read.csv(
    'course_workF.csv', header=True, sep=";")

In [17]:
type(vehicles_data)

pyspark.sql.dataframe.DataFrame

In [18]:
vehicles_data.printSchema()

root
 |-- id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- region: string (nullable = true)
 |-- state: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- type: string (nullable = true)
 |-- price: string (nullable = true)



In [3]:
from pyspark.sql.types import IntegerType, FloatType,DoubleType
vehicles_data = vehicles_data.withColumn("id", vehicles_data["id"].cast(IntegerType()))
vehicles_data = vehicles_data.withColumn("year", vehicles_data["year"].cast(IntegerType()))
vehicles_data = vehicles_data.withColumn("cylinders", vehicles_data["cylinders"].cast(IntegerType()))

vehicles_data = vehicles_data.withColumn("odometer", vehicles_data["odometer"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("price", vehicles_data["price"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("condition", vehicles_data["condition"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("drive", vehicles_data["drive"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("fuel", vehicles_data["fuel"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("manufacturer", vehicles_data["manufacturer"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("model", vehicles_data["model"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("region", vehicles_data["region"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("state", vehicles_data["state"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("title_status", vehicles_data["title_status"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("transmission", vehicles_data["transmission"].cast(FloatType()))
vehicles_data = vehicles_data.withColumn("type", vehicles_data["type"].cast(FloatType()))

In [4]:
from pyspark.ml.feature import VectorAssembler
features = [
            'year',
            'cylinders',
            'odometer',
            'condition', 
            'drive',
            'fuel',
            'manufacturer',
            'model',
            'region',
            'state',
            'title_status',
            'transmission',
            'type'
           ]

assembler = VectorAssembler(inputCols=features,
                            outputCol='features')
output = assembler.transform(vehicles_data)

In [5]:
train, test = output.randomSplit([0.8, 0.2])

In [12]:
%%time
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


lr = LinearRegression(featuresCol='features',
                        labelCol='price')

model = lr.fit(train)

predictions = model.transform(test)
trainingSummary = model.summary

print(f'Точність: {trainingSummary.r2*100} %')

Точність: 62.07112610765071 %
Wall time: 2.78 s


In [7]:
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

RMSE: 7533.829675


In [23]:
predictions.limit(5).select(col("price"),col("prediction")).show()

+-------+------------------+
|  price|        prediction|
+-------+------------------+
|35990.0|28973.721020836383|
|39990.0|   30997.392699313|
|41990.0|32210.276041734265|
| 4900.0|   8542.4719476779|
|11500.0|11078.824437043513|
+-------+------------------+



In [14]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", featuresCol="features")
model2 = rf.fit(train)
predictions2 = model2.transform(test)

evaluator2 = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")
r2  = evaluator2.evaluate(predictions2)
print(f'Точність: {r2*100} %')

Точність: 80.7851203556598 %


In [21]:
evaluator2 = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse  = evaluator2.evaluate(predictions2)
print(f'RMSE: {rmse}')

RMSE: 5401.565065347948


In [19]:
predictions2.limit(5).select(col("price"),col("prediction")).show()

+-------+------------------+
|  price|        prediction|
+-------+------------------+
|35990.0|37187.006332916426|
|39990.0| 36372.10273303191|
|41990.0|  38193.8368730263|
| 4900.0| 7458.004411965395|
|11500.0|11904.050342544091|
+-------+------------------+



In [15]:
from pyspark.ml.regression import GBTRegressor
gf = GBTRegressor(labelCol="price", featuresCol="features")
model3 = gf.fit(train)
predictions3 = model3.transform(test)

evaluator3 = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")
r2  = evaluator3.evaluate(predictions3)
print(f'Точность: {r2*100} %')

Точность: 84.99360403959551 %


In [22]:
evaluator3 = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse  = evaluator3.evaluate(predictions3)
print(f'RMSE: {rmse} ')

RMSE: 4773.522548377432 


In [20]:
predictions3.limit(5).select(col("price"),col("prediction")).show()

+-------+------------------+
|  price|        prediction|
+-------+------------------+
|35990.0| 38737.00739226091|
|39990.0| 40067.69940295504|
|41990.0| 40067.69940295504|
| 4900.0| 6275.498965719492|
|11500.0|13288.849247598364|
+-------+------------------+

