In [608]:
import pyspark as ps

spark = (ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("sparkSQL exercise") 
        .getOrCreate()
        )
sc = spark.sparkContext

In [609]:
data = spark.read.json('data/restaurants.json.gz')

In [610]:
data.show(5, truncate = False)

+--------------------+--------+--------+----------+-----------+------------+-----------+------------+-----+-----+-------+------------------+
|accepts_credit_cards|bar_type|city    |latitude  |longitude  |neighborhood|price_range|review_count|stars|state|takeout|takes_reservations|
+--------------------+--------+--------+----------+-----------+------------+-----------+------------+-----+-----+-------+------------------+
|true                |full_bar|Braddock|40.408735 |-79.8663507|null        |1          |11          |4.5  |PA   |true   |false             |
|true                |full_bar|Carnegie|40.415517 |-80.067534 |Greentree   |1          |15          |4.0  |PA   |true   |false             |
|true                |none    |Carnegie|40.3877323|-80.0928745|null        |1          |8           |3.5  |PA   |true   |false             |
|true                |full_bar|Carnegie|40.3964688|-80.0849416|null        |2          |5           |4.0  |PA   |false  |false             |
|false       

In [611]:
data.printSchema()

root
 |-- accepts_credit_cards: boolean (nullable = true)
 |-- bar_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- price_range: long (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- takeout: boolean (nullable = true)
 |-- takes_reservations: boolean (nullable = true)



In [612]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

In [613]:
data = data.withColumn('accepts_credit_cards', data.accepts_credit_cards.astype(DoubleType()))
data = data.withColumn('price_range', data.price_range.astype(DoubleType()))
data = data.withColumn('review_count', data.review_count.astype(DoubleType()))
data = data.withColumn('takeout', data.takes_reservations.astype(DoubleType()))
data = data.withColumn('takes_reservations', data.takes_reservations.astype(DoubleType()))

#data = data.dropna(how='any')
#data = data.fillna({'bar_type':'unknown','city':'unknown','neighborhood':'unknown','state':'unknown'})

In [614]:
train, test  = data.randomSplit([0.7,0.3])

In [615]:
train.show(5)

+--------------------+-------------+-----------+----------+----------+------------+-----------+------------+-----+-----+-------+------------------+
|accepts_credit_cards|     bar_type|       city|  latitude| longitude|neighborhood|price_range|review_count|stars|state|takeout|takes_reservations|
+--------------------+-------------+-----------+----------+----------+------------+-----------+------------+-----+-----+-------+------------------+
|                 0.0|beer_and_wine|Durmersheim|   48.9374|   8.27346|        null|        1.0|         3.0|  3.0|   BW|    0.0|               0.0|
|                 0.0|beer_and_wine|  Edinburgh|55.9387706|-3.1787491|   Newington|        3.0|        12.0|  3.5|  EDH|    1.0|               1.0|
|                 0.0|beer_and_wine|  Edinburgh|55.9426152| -3.182485|   Newington|        2.0|        26.0|  4.0|  EDH|    1.0|               1.0|
|                 0.0|beer_and_wine|  Edinburgh|55.9444983|-3.1856957|   Newington|        1.0|        41.0|  4.

In [616]:
train.printSchema()

root
 |-- accepts_credit_cards: double (nullable = true)
 |-- bar_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- price_range: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- takeout: double (nullable = true)
 |-- takes_reservations: double (nullable = true)



In [617]:
train.show(5, truncate= False)

+--------------------+-------------+-----------+----------+----------+------------+-----------+------------+-----+-----+-------+------------------+
|accepts_credit_cards|bar_type     |city       |latitude  |longitude |neighborhood|price_range|review_count|stars|state|takeout|takes_reservations|
+--------------------+-------------+-----------+----------+----------+------------+-----------+------------+-----+-----+-------+------------------+
|0.0                 |beer_and_wine|Durmersheim|48.9374   |8.27346   |null        |1.0        |3.0         |3.0  |BW   |0.0    |0.0               |
|0.0                 |beer_and_wine|Edinburgh  |55.9387706|-3.1787491|Newington   |3.0        |12.0        |3.5  |EDH  |1.0    |1.0               |
|0.0                 |beer_and_wine|Edinburgh  |55.9426152|-3.182485 |Newington   |2.0        |26.0        |4.0  |EDH  |1.0    |1.0               |
|0.0                 |beer_and_wine|Edinburgh  |55.9444983|-3.1856957|Newington   |1.0        |41.0        |4.0 

In [618]:
double_cols = ['accepts_credit_card','price_range', 'review_count','takeout','takes_reservations']
str_cols = ['bar_type','city', 'neighborhood','state']
str_cols_idx = list([cols+'_indexed' for cols in str_cols])
str_cols_oh  = list([cols+'_one_hot' for cols in str_cols])

In [619]:
indexer_1 = StringIndexer(inputCol = 'bar_type', outputCol = 'bar_type_index', handleInvalid = 'keep')
indexer_2 = StringIndexer(inputCol = 'city', outputCol = 'city_index', handleInvalid = 'keep')
indexer_3 = StringIndexer(inputCol = 'neighborhood', outputCol = 'neighborhood_index', handleInvalid = 'keep')
indexer_4 = StringIndexer(inputCol = 'state', outputCol = 'state_index', handleInvalid = 'keep')

one_hot_1 = OneHotEncoder(inputCol = 'bar_type_index', outputCol = 'bar_type_one_hot')
one_hot_2 = OneHotEncoder(inputCol = 'city_index', outputCol = 'city_one_hot')
one_hot_3 = OneHotEncoder(inputCol = 'neighborhood_index', outputCol = 'neighborhood_one_hot')
one_hot_4 = OneHotEncoder(inputCol = 'state_index', outputCol = 'state_one_hot')
             
assembler = VectorAssembler(inputCols = ['accepts_credit_cards','bar_type_one_hot','city_one_hot','latitude','longitude','neighborhood_one_hot','price_range','review_count','state_one_hot','takeout','takes_reservations'], outputCol = 'features_1')
normalize = StandardScaler(inputCol = 'features_1', outputCol = 'features')
lr = LinearRegression(featuresCol = 'features',labelCol = 'stars',maxIter=10, regParam=0.3)
dtr = DecisionTreeRegressor(featuresCol = 'features',labelCol = 'stars')

pipeline = Pipeline(stages=[indexer_1,indexer_2,indexer_3,indexer_4, one_hot_1,one_hot_2,one_hot_3,one_hot_4, assembler,normalize,lr])
pipeline_2 = Pipeline(stages=[indexer_1,indexer_2,indexer_3,indexer_4, one_hot_1,one_hot_2,one_hot_3,one_hot_4, assembler,normalize,dtr])
model = pipeline.fit(train)
model_2 = pipeline_2.fit(train)


In [620]:
preds = model.transform(test)
preds_2 = model_2.transform(test)

In [621]:
preds.select('prediction', 'stars').show(5)

+------------------+-----+
|        prediction|stars|
+------------------+-----+
| 4.207441030766111|  4.5|
| 3.827138738945079|  3.5|
|   3.8276021784282|  5.0|
| 3.829054397342927|  3.5|
|3.8300558260974196|  4.0|
+------------------+-----+
only showing top 5 rows



In [622]:
preds_2.select('prediction','stars').show(5)

+-----------------+-----+
|       prediction|stars|
+-----------------+-----+
|3.807377049180328|  4.5|
|3.807377049180328|  3.5|
|3.807377049180328|  5.0|
|3.807377049180328|  3.5|
|3.807377049180328|  4.0|
+-----------------+-----+
only showing top 5 rows



In [623]:
from pyspark.ml.evaluation import RegressionEvaluator

In [624]:
lr_evaluator = RegressionEvaluator(predictionCol ='prediction', labelCol ='stars', metricName='mse')
dtr_evaluator = RegressionEvaluator(predictionCol ='prediction', labelCol ='stars', metricName='mse')

In [625]:
lr_evaluator.evaluate(preds)

0.3942647916284995

In [626]:
dtr_evaluator.evaluate(preds_2)

0.3756460138112749