In [1]:
%%sh
# Write the sparkmagic configuration: the endpoint URL used by the Python kernel
# and the executor sizing requested for each session.
# NOTE: this replaces any existing ~/.sparkmagic/config.json.

# The redirect below fails if the directory is missing, so create it first.
mkdir -p ~/.sparkmagic

# Quoted heredoc delimiter: the JSON is written verbatim, with no shell expansion.
cat > ~/.sparkmagic/config.json <<'EOF'
{"kernel_python_credentials" : {"url": "http://172.31.35.237:8998/"}, "session_configs": 
{"executorMemory": "2g","executorCores": 2,"numExecutors":4}}
EOF

# Echo the file back for a visual check; cat rather than a pager, since the
# cell has no interactive terminal.
cat ~/.sparkmagic/config.json

{"kernel_python_credentials" : {"url": "http://172.31.35.237:8998/"}, "session_configs": 
{"executorMemory": "2g","executorCores": 2,"numExecutors":4}}


In [None]:
## CHANGE ME ##
# CSV file that the notebook reads its input data from.
file_path = 's3://neilawspublic/ml/data/data.csv'
# Location the trained model is saved to; replace <bucket> with a writable bucket.
spark_model_location = "s3://<bucket>/models/car_price_prediction_model/"

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor

# Read the CSV with a header row and let Spark infer each column's type.
data = spark.read.csv(
    file_path,
    sep=",",
    quote='"',
    header=True,
    inferSchema=True
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
22,,pyspark,idle,,,✔


SparkSession available as 'spark'.


In [3]:
# Print the column names and inferred types of the loaded DataFrame.
data.printSchema()

root
 |-- Price: double (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Trim: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Cylinder: integer (nullable = true)
 |-- Liter: double (nullable = true)
 |-- Doors: integer (nullable = true)
 |-- Cruise: integer (nullable = true)
 |-- Sound: integer (nullable = true)
 |-- Leather: integer (nullable = true)

In [4]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.ml.feature import StringIndexer, VectorAssembler

def get_indexer_input(data):
    """Fit a StringIndexer for every string-typed column of ``data``.

    Each indexer reads column ``<name>`` and writes ``indexed_<name>``.

    :param data: DataFrame whose ``dtypes`` are inspected and on which each
        indexer is fitted.
    :return: dict mapping each string column name to the result of
        ``StringIndexer(...).fit(data)``; non-string columns are skipped.
    """
    column_types = data[data.columns].dtypes
    return {
        name: StringIndexer(inputCol=name, outputCol='indexed_' + name).fit(data)
        for name, dtype in column_types
        if dtype == 'string'
    }


In [5]:
# Seeded random split: the 0.3 weight goes to data_test and the 0.7 weight to data_train.
data_test, data_train = data.randomSplit(weights=[0.3, 0.7], seed=10)

# NOTE(review): this rebinds the name `get_indexer_input` from the function to the
# dict it returns, so running this cell a second time tries to call a dict and fails.
# The training cell reads the dict through this same name, so any rename has to be
# applied to both cells together.
# The indexers are fitted on the full `data`, not only on `data_train`.
get_indexer_input = get_indexer_input(data)
print (get_indexer_input)

{'Trim': StringIndexer_40caa3ec7e2c4b54610e, 'Make': StringIndexer_46ce96f656ccd971cbb6, 'Type': StringIndexer_4818acc21a4351897db0, 'Model': StringIndexer_48b098f594ec78f0da60}

In [6]:
def model_training(data_train, indexer_input):
    """Train a gradient-boosted-tree regressor that predicts ``Price``.

    :param data_train: training DataFrame; must contain a ``Price`` column.
    :param indexer_input: dict mapping string column names to indexer stages
        (as produced by ``get_indexer_input``).
    :return: the fitted pipeline (VectorAssembler followed by GBTRegressor).

    The feature columns are every column of ``data_train`` except ``Price``
    and the keys of ``indexer_input``.

    NOTE(review): the ``indexed_*`` columns created by the indexers are not
    added to the feature list, so the string columns do not influence the
    model. Adding them would also require every caller of the saved model to
    index its input before calling ``transform``.
    """
    # Build the exclusion set without `keys() + [...]`, which raises TypeError
    # on Python 3 where keys() is a view rather than a list.
    excluded = set(indexer_input) | {"Price"}
    # Follow the DataFrame's own column order so the layout of the feature
    # vector is the same on every run (iterating a set gives arbitrary order).
    x_cols = [column for column in data_train.columns if column not in excluded]

    # Materialise the stages as a real list; on Python 3 values() is a view.
    indexers = list(indexer_input.values())
    pipeline_tr = Pipeline(stages=indexers)
    data_tr = pipeline_tr.fit(data_train).transform(data_train)

    assembler = VectorAssembler(inputCols=x_cols, outputCol="features")
    gbt = GBTRegressor(featuresCol="features", labelCol="Price", stepSize=0.008, maxDepth=5, subsamplingRate=0.75,
                       seed=10, maxIter=20, minInstancesPerNode=5, checkpointInterval=100, maxBins=64)
    pipeline_training = Pipeline(stages=[assembler, gbt])
    return pipeline_training.fit(data_tr)


def model_testing(model, data_test, indexer_input):
    """Show actual versus predicted prices for ``data_test``.

    :param model: fitted model exposing ``transform``.
    :param data_test: DataFrame to score; must contain ``Price``.
    :param indexer_input: dict whose values are the indexer stages applied to
        ``data_test`` before scoring.
    :return: None; the first 10 ``Price``/``prediction`` rows are displayed
        untruncated via ``show``.
    """
    indexing_pipeline = Pipeline(stages=indexer_input.values())
    indexed_test = indexing_pipeline.fit(data_test).transform(data_test)
    scored = model.transform(indexed_test)
    scored.select("Price", "prediction").show(10, False)

In [9]:
# Train on the training split using the indexers fitted earlier (held under
# the name `get_indexer_input` after the earlier cell rebinds it).
model = model_training(data_train, get_indexer_input)

# Overwrite any previous save so this cell can be re-run; a plain save()
# raises when the target path already exists.
model.write().overwrite().save(spark_model_location)

In [10]:
#Test the model
from pyspark.ml import PipelineModel
from pyspark.ml import Pipeline
import json
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler

def get_indexer_input(data):
    """Return fitted string indexers for ``data``, keyed by column name.

    Only columns whose dtype is ``'string'`` get an indexer; each one reads
    column ``<name>``, writes ``indexed_<name>`` and is fitted on ``data``.
    """
    string_columns = [name for name, dtype in data[data.columns].dtypes if dtype == 'string']
    fitted = {}
    for name in string_columns:
        indexer = StringIndexer(inputCol=name, outputCol='indexed_' + name)
        fitted[name] = indexer.fit(data)
    return fitted

def model_testing(model, data_test, indexer_input):
    """Index the string columns of ``data_test`` and display its predictions.

    :param model: fitted model exposing ``transform``.
    :param data_test: DataFrame to score.
    :param indexer_input: dict whose values are the indexer stages applied to
        ``data_test`` before scoring.
    :return: None; one indexed input row and the first 10 predictions are
        displayed untruncated via ``show``.
    """
    # Materialise the stages as a real list; on Python 3 values() is a view.
    indexers = list(indexer_input.values())
    pipeline_te = Pipeline(stages=indexers)
    data_te = pipeline_te.fit(data_test).transform(data_test)
    data_te.show(1,False)
    # Score the indexed frame, not the raw one, so the model sees the same
    # columns that were just displayed (the raw frame lacks the indexed_* columns).
    predictions = model.transform(data_te)
    predictions.select("prediction").show(10,False)

# Load the pipeline model stored at spark_model_location.
sameModel = PipelineModel.load(path=spark_model_location)

# A single hand-written record used as the input for a smoke test.
j={"Price":9041.9062544231,"Mileage":26191,"Make":"Chevrolet","Model":"AVEO","Trim":"SVM Sedan 4D","Type":"Sedan","Cylinder":4,"Liter"
:1.6,"Doors":4,"Cruise":0,"Sound":0,"Leather":1}

# Serialise the record to JSON and read it back through Spark to get a
# one-row DataFrame.
a=[json.dumps(j)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

# Bind the fitted indexers to their own name so `get_indexer_input` keeps
# referring to the function and can be called again.
# NOTE(review): the indexers are fitted on the single row in `df`, so each
# string column has only one category to index.
indexer_models = get_indexer_input(df)
model_testing(sameModel, df, indexer_models)

+------+--------+-----+-------+-----+---------+-------+-----+---------------+-----+------------+-----+------------+------------+------------+-------------+
|Cruise|Cylinder|Doors|Leather|Liter|Make     |Mileage|Model|Price          |Sound|Trim        |Type |indexed_Trim|indexed_Make|indexed_Type|indexed_Model|
+------+--------+-----+-------+-----+---------+-------+-----+---------------+-----+------------+-----+------------+------------+------------+-------------+
|0     |4       |4    |1      |1.6  |Chevrolet|26191  |AVEO |9041.9062544231|0    |SVM Sedan 4D|Sedan|0.0         |0.0         |0.0         |0.0          |
+------+--------+-----+-------+-----+---------+-------+-----+---------------+-----+------------+-----+------------+------------+------------+-------------+

+------------------+
|prediction        |
+------------------+
|10236.175823272792|
+------------------+