In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Automobile').getOrCreate()

21/12/05 15:16:01 WARN Utils: Your hostname, carlos-myspark resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
21/12/05 15:16:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/05 15:16:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/05 15:16:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
data = spark.read.csv('Automobile_data.csv', inferSchema=True, header=True)

                                                                                

In [4]:
import pandas as pd

In [5]:
data.printSchema()

root
 |-- symboling: integer (nullable = true)
 |-- normalized-losses: string (nullable = true)
 |-- make: string (nullable = true)
 |-- fuel-type: string (nullable = true)
 |-- aspiration: string (nullable = true)
 |-- num-of-doors: string (nullable = true)
 |-- body-style: string (nullable = true)
 |-- drive-wheels: string (nullable = true)
 |-- engine-location: string (nullable = true)
 |-- wheel-base: double (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- height: double (nullable = true)
 |-- curb-weight: integer (nullable = true)
 |-- engine-type: string (nullable = true)
 |-- num-of-cylinders: string (nullable = true)
 |-- engine-size: integer (nullable = true)
 |-- fuel-system: string (nullable = true)
 |-- bore: string (nullable = true)
 |-- stroke: string (nullable = true)
 |-- compression-ratio: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- peak-rpm: string (nullable = true)
 |-- city-mpg: integer (nu

In [6]:
pd.DataFrame(data.take(5), columns=data.columns).transpose()

21/12/05 15:16:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,0,1,2,3,4
symboling,3,3,1,2,2
normalized-losses,?,?,?,164,164
make,alfa-romero,alfa-romero,alfa-romero,audi,audi
fuel-type,gas,gas,gas,gas,gas
aspiration,std,std,std,std,std
num-of-doors,two,two,two,four,four
body-style,convertible,convertible,hatchback,sedan,sedan
drive-wheels,rwd,rwd,rwd,fwd,4wd
engine-location,front,front,front,front,front
wheel-base,88.6,88.6,94.5,99.8,99.4


In [7]:
data.describe()

                                                                                

DataFrame[summary: string, symboling: string, normalized-losses: string, make: string, fuel-type: string, aspiration: string, num-of-doors: string, body-style: string, drive-wheels: string, engine-location: string, wheel-base: string, length: string, width: string, height: string, curb-weight: string, engine-type: string, num-of-cylinders: string, engine-size: string, fuel-system: string, bore: string, stroke: string, compression-ratio: string, horsepower: string, peak-rpm: string, city-mpg: string, highway-mpg: string, price: string]

In [8]:
data.columns

['symboling',
 'normalized-losses',
 'make',
 'fuel-type',
 'aspiration',
 'num-of-doors',
 'body-style',
 'drive-wheels',
 'engine-location',
 'wheel-base',
 'length',
 'width',
 'height',
 'curb-weight',
 'engine-type',
 'num-of-cylinders',
 'engine-size',
 'fuel-system',
 'bore',
 'stroke',
 'compression-ratio',
 'horsepower',
 'peak-rpm',
 'city-mpg',
 'highway-mpg',
 'price']

In [9]:
numeric_features = [t[0] for t in data.dtypes if t[1] == 'int' or t[1] == 'double']
data.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
symboling,205,0.8341463414634146,1.2453068281055295,-2,3
wheel-base,205,98.75658536585378,6.0217756850255695,86.6,120.9
length,205,174.04926829268305,12.337288526555188,141.1,208.1
width,205,65.90780487804875,2.145203852687182,60.3,72.3
height,205,53.724878048780525,2.443521969904905,47.8,59.8
curb-weight,205,2555.5658536585365,520.6802035016384,1488,4066
engine-size,205,126.90731707317073,41.642693438179855,61,326
compression-ratio,205,10.142536585365855,3.9720403218632985,7.0,23.0
city-mpg,205,25.21951219512195,6.54214165300162,13,49


In [37]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
data = data.withColumn('price', F.col('price').cast(IntegerType()))

In [38]:
my_cols = data.select(['symboling', 'make', 'price'])

In [40]:
my_final_data = my_cols.na.drop()

In [41]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)

In [42]:
make_indexer = StringIndexer(inputCol='make', outputCol='MakeIndex')
make_encoder = OneHotEncoder(inputCol='MakeIndex', outputCol='MakeVec')

In [43]:
assembler = VectorAssembler(inputCols=['symboling',
                                      'MakeVec',
                                      'price'], outputCol='features')

In [44]:
from pyspark.ml import Pipeline

In [45]:
from pyspark.ml.regression import RandomForestRegressor

In [46]:
rf = RandomForestRegressor(featuresCol='features', labelCol='symboling')

In [47]:
pipeline = Pipeline(stages=[make_indexer, 
                            make_encoder,
                            assembler, rf])

In [48]:
train_data_auto, test_data_auto = my_final_data.randomSplit([0.7, 0.3])

In [49]:
fit_model = pipeline.fit(train_data_auto)

In [50]:
result = fit_model.transform(test_data_auto)

In [51]:
result.select('features', 'prediction').show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(22,[0,8,21],[-2....| -1.3376688126688125|
|(22,[0,11,21],[-1...|-0.07989293773192079|
|(22,[0,14,21],[-1...| -0.3953612828817746|
|(22,[0,14,21],[-1...| -0.3953612828817746|
|(22,[0,1,21],[-1....|-0.42797631750382514|
|(22,[0,8,21],[-1....| -0.9876688126688127|
|(22,[0,8,21],[-1....| -0.9876688126688127|
|(22,[10,21],[1.0,...| 0.08575719319183599|
|(22,[6,21],[1.0,7...| 0.12404751112656087|
|(22,[6,21],[1.0,7...| 0.12404751112656087|
|(22,[20,21],[1.0,...| 0.08575719319183599|
|(22,[20,21],[1.0,...| 0.08575719319183599|
|(22,[2,21],[1.0,1...| 0.11324993193457474|
|(22,[14,21],[1.0,...|0.020717148490774374|
|(22,[3,21],[1.0,1...|   0.165731716998327|
|(22,[3,21],[1.0,1...|   0.165731716998327|
|(22,[12,21],[1.0,...| 0.06278456396920676|
|(22,[12,21],[1.0,...| 0.03659052652516932|
|(22,[12,21],[1.0,...| 0.03659052652516932|
|(22,[12,21],[1.0,...| 0.0365905

# Ejercicio desarrollado por Carlos Aguilar y Josué Lainez.