In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

In [3]:
sc = SparkSession.builder.master('local[*]').getOrCreate()

24/02/14 12:23:13 WARN Utils: Your hostname, linux3 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/02/14 12:23:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/14 12:23:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df_carro = sc.read.csv('data.csv', inferSchema=True, header=True)
df_carro.show()

                                                                                

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

In [5]:
#checando schema
df_carro.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [6]:
#checanndo estatisticas
df_carro.describe().toPandas().transpose()

24/02/14 12:23:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [7]:
#checando valores nulos
def replace(column, value):
  return when(column != value, column).otherwise(lit(None))

# Na coluna 'Categoria de Mercado', substituindo todas as ocorrências do valor de string 'N/A' por 'None'.
df_carro = df_carro.withColumn('Market Category', replace(col('Market Category'), 'N/A'))
df_carro.show()

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

In [8]:
#Contando numeros de nulos
df_carro.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_carro.columns]).show()

[Stage 7:>                                                          (0 + 1) / 1]

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



                                                                                

In [9]:
# Dropando coluna com mais de 30% de nulos
df_carro = df_carro.drop('Market Category')

In [10]:
#dropando nullos
df_carro = df_carro.na.drop()

# Criando Random Forest Pipline

In [11]:
# Colocando todas as colunas numéricas em um vetor de recursos
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors', 'highway MPG', 'city mpg', 'Popularity'],
                            outputCol='Attributes')

#criando um regressor para prever o preço do carro
regressor = RandomForestRegressor(featuresCol='Attributes',
                                  labelCol='MSRP')

#criando pipeline
pipeline = Pipeline(stages=[assembler, regressor])

#save pipeline
pipeline.write().overwrite().save('pipeline')

## Criando uma Crossvalidation por Hyperparameter Tuning

In [12]:
# Carregando pipeline
pipelineModel = Pipeline.load('pipeline')

                                                                                

In [13]:
#Montando paramgrid
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees, [100, 500]).build()

In [14]:
#Montando crossvalidator
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP'), #MSRP é a coluna que queremos prever
                          numFolds=10)

# Train & Test Model

In [15]:
#train test split
train_data, test_data = df_carro.randomSplit([0.8, 0.2], seed=123)

In [16]:
# Treinando
cvModel = crossval.fit(train_data)

24/02/14 12:24:06 WARN DAGScheduler: Broadcasting large task binary with size 1172.5 KiB
24/02/14 12:24:10 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/02/14 12:24:26 WARN DAGScheduler: Broadcasting large task binary with size 1175.9 KiB
24/02/14 12:24:29 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/02/14 12:24:46 WARN DAGScheduler: Broadcasting large task binary with size 1195.3 KiB
24/02/14 12:24:49 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/02/14 12:25:07 WARN DAGScheduler: Broadcasting large task binary with size 1170.1 KiB
24/02/14 12:25:10 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/02/14 12:25:23 WARN DAGScheduler: Broadcasting large task binary with size 1171.8 KiB
24/02/14 12:25:26 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/02/14 12:25:41 WARN DAGScheduler: Broadcasting large task binary with size 1170.1 KiB
24/02/14 12:25:46 WARN DAGScheduler:

In [17]:
# extrair o melhor modelo e visualizar todas as etapas do pipeline pelas quais nossos dados passaram
bestModel = cvModel.bestModel
for x in range(len(bestModel.stages)):
  print(bestModel.stages[x])

VectorAssembler_9bf212e3dca6
RandomForestRegressionModel: uid=RandomForestRegressor_2143dd388910, numTrees=500, numFeatures=7


In [19]:
bestModel.write().overwrite().save('pipeline_v2')

24/02/14 12:28:05 WARN TaskSetManager: Stage 339 contains a task of very large size (1037 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [17]:
# transformar o conjunto de testes (use cvModel, pois ele sabe escolher o melhor modelo para usar)
pred = cvModel.transform(test_data)
pred.select('MSRP', 'prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|28030| 32016.03932645664|
|30550|37899.239812205065|
|29350|27906.596270755555|
|27900| 27369.36291587037|
|34890| 27369.36291587037|
|32990| 27369.36291587037|
| 2827| 5627.286424319113|
| 3000| 5627.286424319113|
| 3086| 5635.780120477701|
| 3130| 5635.780120477701|
| 3012| 5656.262046940777|
| 3622| 6208.489239694285|
|22300| 23943.15199500068|
|19400|22244.705742427675|
| 2042| 5307.476418344488|
| 2144| 5516.475708777125|
|49440|39524.307877247855|
|52640|39524.307877247855|
|47440|39834.507460278684|
|58400| 39822.78560685892|
+-----+------------------+
only showing top 20 rows



In [1]:
pred.select('MSRP', 'prediction').show()

NameError: name 'pred' is not defined

# Evaluate Model

In [18]:
#evaluate
eval = RegressionEvaluator(labelCol='MSRP')

#get rmse
rmse = eval.evaluate(pred)

#get mse
mse = eval.evaluate(pred, {eval.metricName:'mse'})

#get mae
mae = eval.evaluate(pred, {eval.metricName:'mae'})

#get r2
r2 = eval.evaluate(pred, {eval.metricName:'r2'})

#print
print('RMSE: %3f' %rmse)
print('MSE: %3f' %mse)
print('MAE: %3f' %mae)
print('R2: %3f' %r2)

                                                                                

RMSE: 36696.172168
MSE: 1346609051.790076
MAE: 9768.361122
R2: 0.773532
