In [None]:
import pyspark

# Entry point for the DataFrame / ML APIs used throughout this notebook.
# NOTE: despite the conventional name `sc` (normally a SparkContext), this object
# is a SparkSession. getOrCreate() reuses an already-running session if one exists.
sc = (
    pyspark.sql.SparkSession.builder
    .appName("nycApp")
    .getOrCreate()
)

In [2]:
# Location of the trip-record CSV on the cluster's HDFS
# (presumably NYC TLC yellow-taxi data, given the tpep_* columns; verify provenance).
DATA_PATH = "hdfs://cluster-9bfd-m/hadoop/data.csv"

# header=True takes column names from the first line. inferSchema=True costs one extra
# pass over the file to guess column types. With the default timestamp format the
# tpep_*_datetime columns ("MM/dd/yyyy hh:mm:ss AM") appear to stay strings, so their
# min/max are compared as text.
# NOTE: despite its name, `textFile` is a Spark DataFrame, not an RDD of lines.
textFile = sc.read.csv(DATA_PATH, header=True, inferSchema=True)
textFile.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount']

In [13]:
# Summary statistics (count, mean, stddev, min, max) for every column. The result is
# collected to pandas and transposed so that each Spark column becomes one row.
# String columns only get count/min/max, and their min/max are lexicographic.
(
    textFile
    .describe()
    .toPandas()
    .T
)

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
VendorID,999999,1.61011761011761,0.5280269232650342,1,4
tpep_pickup_datetime,999999,,,01/01/2009 08:03:22 AM,12/31/2008 11:02:25 PM
tpep_dropoff_datetime,999999,,,01/01/2009 08:09:22 AM,12/31/2008 11:11:13 PM
passenger_count,999999,1.5713355713355714,1.2233028996495432,0,9
trip_distance,999999,2.9348390448390447,3.7609906361714622,0.0,145.7
RatecodeID,999999,1.0482510482510483,0.5874925777964289,1,99
store_and_fwd_flag,999999,,,N,Y
PULocationID,999999,163.13445413445413,66.31544034068634,1,265
DOLocationID,999999,161.3028653028653,70.08133215405032,1,265


In [20]:
import six
from pyspark.sql.types import NumericType

# Pearson correlation of every numeric column with the label, fare_amount.
# Column types are read from the DataFrame schema instead of sampling the first row:
# this needs no Spark job per column, and a string/boolean column whose first value is
# null can no longer slip through to stat.corr (which only accepts numeric columns).
for field in textFile.schema.fields:
    if isinstance(field.dataType, NumericType):
        # Single formatted string so the output is identical under Python 2 and 3
        # (a multi-argument print() renders as a tuple on Python 2).
        print("Correlation to fare_amount for %s: %s"
              % (field.name, textFile.stat.corr('fare_amount', field.name)))

('Correlation to fare_amount for ', 'VendorID', 0.019910980878610728)
('Correlation to fare_amount for ', 'passenger_count', 0.010266885823951181)
('Correlation to fare_amount for ', 'trip_distance', 0.8777629936177405)
('Correlation to fare_amount for ', 'RatecodeID', 0.31760236896936894)
('Correlation to fare_amount for ', 'PULocationID', -0.07810575071390112)
('Correlation to fare_amount for ', 'DOLocationID', -0.08412606682886188)
('Correlation to fare_amount for ', 'payment_type', -0.04881366234971991)
('Correlation to fare_amount for ', 'fare_amount', 1.0)
('Correlation to fare_amount for ', 'extra', 0.0797798356206394)
('Correlation to fare_amount for ', 'mta_tax', -0.07833575703293015)
('Correlation to fare_amount for ', 'tip_amount', 0.529505958926563)
('Correlation to fare_amount for ', 'tolls_amount', 0.5186725089550704)
('Correlation to fare_amount for ', 'improvement_surcharge', 0.04126331636064961)
('Correlation to fare_amount for ', 'total_amount', 0.9795476146332738)


In [23]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
# R^2 evaluator for the linear-regression cell; reads the model's "prediction" column.
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="fare_amount", metricName="r2")

# Predictors packed into a single vector column for Spark ML.
# NOTE(review): VendorID and RatecodeID look like categorical codes (RatecodeID spans
# 1..99 in describe()) but are fed in as plain numbers; consider one-hot encoding them.
FEATURE_COLS = ['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID']

# Fixed seed so the 80/20 split, and every metric computed on it, is reproducible
# across kernel restarts. Without a seed, randomSplit picks a fresh random one per call.
SPLIT_SEED = 42

vectorAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol='features')
vhouse_df = vectorAssembler.transform(textFile).select(['features', 'fare_amount'])
splits = vhouse_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
train_df, test_df = splits

In [24]:
from pyspark.ml.regression import LinearRegression
# Elastic-net linear regression. regParam is the overall penalty strength, and
# elasticNetParam=0.8 weights it mostly toward L1 (lasso), which can push
# coefficients to exactly zero.
lr = LinearRegression(featuresCol='features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# Score the held-out split and peek at a few predictions next to the true fares.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "fare_amount", "features").show(5)

# R^2 comes from the r2 RegressionEvaluator (lr_evaluator); RMSE comes from the model's
# own summary on the same test split.
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Coefficients: [0.0,0.0,2.603549051194268,2.625981550486407]
Intercept: 3.08265850956
+-----------------+-----------+-----------------+
|       prediction|fare_amount|         features|
+-----------------+-----------+-----------------+
|5.708640060045765|        2.5|[1.0,0.0,0.0,1.0]|
|5.708640060045765|        2.5|[1.0,0.0,0.0,1.0]|
|5.708640060045765|        2.5|[1.0,0.0,0.0,1.0]|
|5.708640060045765|        2.5|[1.0,0.0,0.0,1.0]|
|5.708640060045765|        2.5|[1.0,0.0,0.0,1.0]|
+-----------------+-----------+-----------------+
only showing top 5 rows

R Squared (R2) on test data = 0.811582
Root Mean Squared Error (RMSE) on test data = 5.0204


In [17]:
from pyspark.ml.regression import DecisionTreeRegressor
# Single regression tree (library-default hyperparameters) trained on the same
# assembled features, scored by RMSE on the held-out split.
dt_evaluator = RegressionEvaluator(metricName="rmse",
                                   labelCol="fare_amount",
                                   predictionCol="prediction")
dt = DecisionTreeRegressor(labelCol='fare_amount', featuresCol='features')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.55615


In [18]:
from pyspark.ml.regression import GBTRegressor
# Gradient-boosted trees capped at 10 iterations, trained on the same assembled
# features and scored by RMSE on the held-out split.
gbt_evaluator = RegressionEvaluator(metricName="rmse",
                                    predictionCol="prediction",
                                    labelCol="fare_amount")
gbt = GBTRegressor(maxIter=10, labelCol='fare_amount', featuresCol='features')
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.62678
