In [1]:
%%time
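# Initialisation: imports, input file path, Spark session and data load.
# The input path is assigned in `mypath` after the imports.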
import pandas as pd
import numpy as np
import math
from pyspark.sql import SparkSession
from pyspark.sql.functions import sqrt
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.mllib.tree import RandomForest
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator
from pyspark.ml import Pipeline

# Location of the cleaned 2016-18 NY taxi CSV part file read by this notebook.
mypath = "D:/Download/NYtaxi/cleaned16-18/part-00000-1e19c229-6652-4ded-af22-3ca7f6dc4821-c000.csv"

# Create the Spark session for this notebook, or reuse one that already exists.
spark = SparkSession.builder.appName("moz").getOrCreate()

# Read the CSV with a header row and inferred column types, then cap the
# frame at 100 rows so the downstream cells run on a small sample.
df = (
    spark.read
    .csv(mypath, sep=',', header=True, inferSchema=True)
    .limit(100)
)

Wall time: 20 s


In [2]:
# (number of rows, number of columns) of the loaded frame.
df.count(), len(df.columns)

(100, 18)

In [5]:
# Print the column names and types of `df`.
# NOTE(review): the recorded output lists one-hot vector columns (e.g. PU_Hour2),
# so this cell (In [5]) was last executed after the encoder cell (In [4]) had
# rebound `df`. Run in document order it would show only the 18 raw CSV columns.
df.printSchema()

root
 |-- PU_Time: timestamp (nullable = true)
 |-- DO_Time: timestamp (nullable = true)
 |-- Rate_Code: integer (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- Trip_Time: integer (nullable = true)
 |-- PU_Year: integer (nullable = true)
 |-- PU_Month: integer (nullable = true)
 |-- PU_Day: integer (nullable = true)
 |-- PU_Hour: integer (nullable = true)
 |-- PU_Minute: integer (nullable = true)
 |-- PU_Second: integer (nullable = true)
 |-- PU_WeekDay: integer (nullable = true)
 |-- Speed: double (nullable = true)
 |-- DOLocationID2: vector (nullable = true)
 |-- PU_Minute2: vector (nullable = true)
 |-- PU_Month2: vector (nullable = true)
 |-- PU_Hour2: vector (nullable = true)
 |-- PU_Day2: vector (nullable = true)
 |-- PU_Year2: vector (nullable = true)
 |-- 

In [4]:
# Integer-coded categorical columns to one-hot encode; each encoded column
# is written next to its source with a "2" suffix (e.g. PU_Hour -> PU_Hour2).
categorical_cols = [
    'PULocationID', 'DOLocationID',
    'PU_Year', 'PU_Month', 'PU_Day',
    'PU_Hour', 'PU_Minute', 'PU_Second',
    'PU_WeekDay',
]
encoder = OneHotEncoderEstimator(
    inputCols=categorical_cols,
    outputCols=[source_col + '2' for source_col in categorical_cols],
)

# NOTE(review): `df` is rebound to the encoded frame, so running this cell a
# second time operates on data that already contains the output columns.
encoder_model = encoder.fit(df)
df = encoder_model.transform(df)
len(df.columns)

27

In [6]:
# Combine the one-hot encoded columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=[
        'PULocationID2', 'DOLocationID2',
        'PU_Year2', 'PU_Month2', 'PU_Day2',
        'PU_Hour2', 'PU_Minute2', 'PU_Second2',
        'PU_WeekDay2',
    ],
    outputCol="features",
)

# Assemble from `df`, which the encoder cell rebinds to the encoded frame.
# The earlier reference to `encoded` was never defined in this notebook and
# raised NameError on a fresh kernel.
# The regression target ("label") is the trip duration column Trip_Time.
training = assembler.transform(df).select(
    F.col("features"),
    F.col("Trip_Time").cast("Int").alias("label"),
)
training.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(2734,[140,434,25...|  684|
|(2734,[148,389,25...|  530|
|(2734,[79,492,255...| 1571|
|(2734,[163,434,25...|  583|
|(2734,[146,502,25...| 1339|
|(2734,[79,313,255...|  906|
|(2734,[100,494,25...|  320|
|(2734,[163,371,25...|  994|
|(2734,[262,500,25...|  674|
|(2734,[144,397,25...|  884|
|(2734,[138,314,25...| 2354|
|(2734,[148,344,25...|  607|
|(2734,[113,408,25...|  263|
|(2734,[164,425,25...|  360|
|(2734,[234,426,25...|  897|
|(2734,[170,426,25...|  115|
|(2734,[237,503,25...|  846|
|(2734,[246,332,25...|  424|
|(2734,[43,426,255...|  952|
|(2734,[236,406,25...|  943|
+--------------------+-----+
only showing top 20 rows



In [16]:
# Linear regression on the assembled training frame, with L1/L2 mixing
# controlled by elasticNetParam and overall strength by regParam.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lrModel = lr.fit(training)

# The summary is the model's own training summary, so the metrics printed
# below are computed on the same rows the model was fitted on.
trainingSummary = lrModel.summary

# Optional diagnostics (disabled):
# print("Coefficients: %s" % str(lrModel.coefficients))
# print("Intercept: %s" % str(lrModel.intercept))
# print("numIterations: %d" % trainingSummary.totalIterations)
# print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
# trainingSummary.residuals.show()

print("RMSE: %f" % (trainingSummary.rootMeanSquaredError,))
print("r2: %f" % (trainingSummary.r2,))

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,183.46886907856674,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-4.828915690267745,0.0,0.0,0.0,0.0,309.39764169794546,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,390.5969596554922,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,28.012139078598157,0.0,0.0,0.0,0.0,0.0,0.0,0.0,286.6648500883792,0.0,0.0,132.81179758521816,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,7.646928702606302,0.0,0.0,0.0,0.0,0.0,0.0,-90.46331599324152,0.0,0.0,0.0,0.0,0.0,-105.67750733464668,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-241.09370271249398,0.0,0.0,0.0,0.0,0.0,0.0,36.06163779965615,0.0,0.0,0.0,0.0,-306.9725710273957,790.2881455399829,0.0,-347.28118552618037,-28.692101799313132,-360.16613715143353,0.0,-31.128458841688133,0.0,497.6004649049663,0.0,-52.91067931842041,0.0,0.0,261.88986039009797,0.0,0.0,0.0,0.0,0.0,0.0,-55.04188188450404,0.0,0.0,-45.7591769

+--------------------+
|           residuals|
+--------------------+
|  -1.813936556398403|
|  0.5619944159645911|
|   3.440716497888161|
| -1.5974632979257422|
|   2.579092006247265|
|   0.306383216426525|
|  -3.399298653176402|
|-0.05319838122909459|
| -0.8883683505634963|
|  -1.086313561950874|
|  2.4281026121084324|
|-0.00228793814437...|
| -3.8691985978022103|
| -2.2816592828104376|
|  2.8072258336587765|
| -11.473138541320054|
| -1.7144189305208783|
| -0.5628536107202535|
| -0.5154378828303834|
|   1.556066931993655|
+--------------------+
only showing top 20 rows

RMSE: 3.395200
r2: 0.999976


In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Train a RandomForest regressor directly on `training`.
# No Pipeline or VectorIndexer stage is involved: the estimator is fitted on
# the already-assembled "features" / "label" columns.

# Fixed seed so the fitted forest and its feature importances are repeatable
# across kernel restarts (tree building is randomised).
RF_SEED = 42

rf = RandomForestRegressor(featuresCol="features", labelCol="label", seed=RF_SEED)

# Fit the forest on the full training frame.
model = rf.fit(training)

# Held-out evaluation is not wired up: this notebook defines no `testData`
# split. The template below is kept for when one is added.
# predictions = model.transform(testData)
# predictions.select("prediction", "label", "features").show(5)
# evaluator = RegressionEvaluator(
#     labelCol="label", predictionCol="prediction", metricName="rmse")
# rmse = evaluator.evaluate(predictions)
# print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# `model` is the fitted forest itself (not a PipelineModel), so the
# importances are read from it directly.
print(model.featureImportances)  # summary only

(2734,[7,48,68,79,87,90,132,138,142,163,164,166,170,186,211,292,314,352,371,393,396,402,492,494,2551,2553,2555,2557,2558,2564,2571,2574,2576,2577,2581,2582,2583,2587,2598,2600,2603,2605,2606,2612,2617,2622,2629,2632,2636,2637,2647,2651,2663,2665,2672,2674,2678,2689,2695,2721,2728,2730,2731,2733],[2.923616412616114e-05,0.046699484478294175,0.004373883182580099,3.7356801800944215e-05,0.004756045027926298,1.4221343056988172e-06,0.002004235965457388,0.12710410808592051,0.00022445222215077462,0.013287131526763012,0.0014442847997919981,0.017815994513860255,0.0018222987419236013,0.011845101726825598,0.0015300391181121992,0.03522048623820899,0.002823135177137212,0.0032886313038396862,0.0003239180953561565,0.023926191453469147,0.10424962479733643,0.0009690676063135609,0.0029865138103683424,0.025622915740250905,0.004949348156480034,0.012405885552212658,0.0011655419470293902,0.006618240435893881,0.005427250557762449,0.02433859141313729,8.784008679208734e-05,0.0036461449853859656,0.013018905850407