In [1]:
# Load the taxi training CSV; the first row is the header and column types are inferred.
data = (
    sqlContext.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("FileStore/tables/taxi_train.csv")
)

In [2]:
# Mark the raw frame for caching (cache() returns the same DataFrame), then
# continue with a copy that has every row containing a null removed.
data = data.cache().dropna()
# Expose the cleaned frame to SQL under the name "data_taxi".
data.createOrReplaceTempView("data_taxi")

In [3]:
# Print a preview of the cleaned DataFrame.
data.show()

In [4]:
# Number of rows remaining after rows with nulls were dropped.
data.count()

In [5]:
# Column names and the types inferred when the CSV was read.
data.dtypes

In [6]:
import pandas as pd
import numpy as np

In [7]:
import pyspark.sql.functions as F
# Break each "<date> <time>" column into a separate date column and time column,
# first for the pickup timestamp and then for the dropoff timestamp.
for prefix in ('pickup', 'dropoff'):
    parts = F.split(data[prefix + '_datetime'], ' ')
    data = (
        data
        .withColumn(prefix + '_date', parts.getItem(0))
        .withColumn(prefix + '_time', parts.getItem(1))
    )

In [8]:
# Show the frame with the new pickup/dropoff date and time columns.
display(data)

In [9]:
from math import sqrt
from scipy.spatial.distance import euclidean
from math import sqrt

In [10]:
def haversine(lon1, lat1, lon2, lat2, radius=6371):
    """Great-circle (haversine) distance between two points on a sphere.

    Note the argument order: longitude comes before latitude for each point.

    Args:
        lon1: Longitude of the first point, in decimal degrees.
        lat1: Latitude of the first point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        radius: Sphere radius; the result is in the same unit. Defaults to
            6371 (Earth radius in kilometres). Use 3956 for miles.

    Returns:
        The distance as a float, or None if any coordinate is None (so that a
        Spark UDF built on this function yields null instead of failing).
    """
    # Imported locally so the function does not depend on a later notebook cell
    # having been executed before it is called.
    from math import radians, cos, sin, asin, sqrt

    if lon1 is None or lat1 is None or lon2 is None or lat2 is None:
        return None

    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1], which
    # would make sqrt/asin raise "math domain error"; clamp defensively.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))
    return c * radius

In [11]:
def manhattan_distance_pd(lat1, lng1, lat2, lng2):
    """Approximate "Manhattan" (L1) distance between two coordinates.

    The distance is the sum of two haversine legs that share the first point
    as a corner: one along the first point's latitude (change in longitude
    only) and one along the first point's longitude (change in latitude only).

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lng1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lng2: Longitude of the second point, in decimal degrees.

    Returns:
        Sum of the two leg lengths, in the unit returned by `haversine`.
    """
    # `haversine` takes (lon1, lat1, lon2, lat2), i.e. longitude first.
    a = haversine(lng1, lat1, lng2, lat1)  # east-west leg
    b = haversine(lng1, lat1, lng1, lat2)  # north-south leg
    return a + b

In [12]:
def bearing_array(lat1, lng1, lat2, lng2):
    """Initial bearing from point 1 towards point 2, in degrees.

    Works on scalars or NumPy arrays (element-wise). The result comes from
    `arctan2`, so it lies in [-180, 180]: 0 is due north, 90 due east,
    -90 due west.

    Args:
        lat1: Latitude of the start point(s), in decimal degrees.
        lng1: Longitude of the start point(s), in decimal degrees.
        lat2: Latitude of the end point(s), in decimal degrees.
        lng2: Longitude of the end point(s), in decimal degrees.

    Returns:
        A Python float for scalar inputs, otherwise a NumPy array.
    """
    lng_delta_rad = np.radians(lng2 - lng1)
    lat1_rad = np.radians(lat1)
    lat2_rad = np.radians(lat2)
    y = np.sin(lng_delta_rad) * np.cos(lat2_rad)
    x = (np.cos(lat1_rad) * np.sin(lat2_rad)
         - np.sin(lat1_rad) * np.cos(lat2_rad) * np.cos(lng_delta_rad))
    bearing = np.degrees(np.arctan2(y, x))
    # Return a plain Python float for scalar input: Spark UDFs declared with
    # DoubleType cannot serialise numpy scalar types.
    if np.ndim(bearing) == 0:
        return bearing.item()
    return bearing

In [13]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from math import radians, cos, sin, asin, sqrt

In [14]:
# Wrap the pure-Python haversine as a Spark UDF returning a double.
udf_haversine = udf(haversine, DoubleType())
# haversine's signature is (lon1, lat1, lon2, lat2): longitude must be passed
# before latitude, otherwise the cosine terms are applied to the wrong angles
# and the resulting distances are wrong.
data = data.withColumn(
    "Haversine_distance",
    udf_haversine(
        data['pickup_longitude'], data['pickup_latitude'],
        data['dropoff_longitude'], data['dropoff_latitude'],
    ),
)

In [15]:
#udf_manhattan = udf(manhattan_distance_pd, DoubleType())
#data = data.withColumn("Manhattan_distance", udf_manhattan(data['pickup_latitude'],data['pickup_longitude'],data['dropoff_latitude'],data['dropoff_longitude']))

In [16]:
#udf_bearing = udf(bearing_array, DoubleType())
#data = data.withColumn("Bearing_distance", udf_bearing(data['pickup_latitude'],data['pickup_longitude'],data['dropoff_latitude'],data['dropoff_longitude']))

In [17]:
# Show the frame including the new Haversine_distance column.
display(data)

In [18]:
# Average speed: haversine distance scaled by 1000, divided by the trip duration.
speed_expr = (1000 * data['Haversine_distance']) / data['trip_duration']
data = data.withColumn("Average_speed", speed_expr)

In [19]:
# Show the frame including the new Average_speed column.
display(data)

In [20]:
# The raw timestamp columns were already split into date/time parts; drop both.
data = data.drop('dropoff_datetime', 'pickup_datetime')

In [21]:
# Show the frame after the raw pickup/dropoff datetime columns were removed.
display(data)

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd
import numpy as np
import functools
from pyspark.ml.feature import OneHotEncoder
from pyspark import SQLContext
from pyspark import SparkContext

In [23]:
# Display `sc`. It is not created anywhere in the visible notebook, so it is
# presumably the SparkContext predefined by the notebook environment — TODO confirm.
sc

In [24]:
from pyspark.sql import SparkSession
# Obtain a SparkSession (an already-running session is reused if one exists).
# NOTE(review): `sparkSession` is not referenced by any later cell visible here.
sparkSession = (
    SparkSession.builder
    .appName("TermDeposit")
    .getOrCreate()
)

In [25]:
# Columns carried forward into modelling.
cols_select = [
    'id',
    'vendor_id',
    'passenger_count',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
    'store_and_fwd_flag',
    'trip_duration',
    'pickup_date',
    'pickup_time',
    'dropoff_date',
    'dropoff_time',
    'Haversine_distance',
    'Average_speed',
]

# Keep only those columns and remove exact duplicate rows.
df = data.select(*cols_select).dropDuplicates()

In [26]:
# Column names and types of the de-duplicated modelling frame.
df.dtypes

In [27]:
 tmp = []
cols_now = ['vendor_id','passenger_count', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
            'Haversine_distance', 'Average_speed']
assembler_features = VectorAssembler(inputCols= cols_now, outputCol='features')
label_Indexer = StringIndexer(inputCol='trip_duration', outputCol='label')
accuracy_ = 0.583695
#tmp+=[assembler_features, label_Indexer]
#pipeline = Pipeline(stages=tmp)

In [28]:
# Fit the pipeline on the full frame, then apply it to that same frame.
fitted_pipeline = pipeline.fit(df)
allData = fitted_pipeline.transform(df)
allData.cache()
# Seeded 70/30 train/test split.
trainingData, testData = allData.randomSplit([0.7, 0.3], seed=0)

In [29]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression estimator with elastic-net regularisation.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

In [30]:
# Fit the logistic regression estimator on the training split.
lrModel = lr.fit(trainingData)

In [31]:
# Report the fitted coefficient matrix and intercept vector.
print(f"Coefficients: \n{lrModel.coefficientMatrix!s}")
print(f"Intercept: {lrModel.interceptVector!s}")

In [32]:
# Logistic regression with the multinomial family requested explicitly.
mlr = LogisticRegression(
    family="multinomial",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Train it on the training split.
mlrModel = mlr.fit(trainingData)

In [33]:
# Report the coefficient matrix and intercept vector of the multinomial model.
print(f"Multinomial coefficients: {mlrModel.coefficientMatrix!s}")
print(f"Multinomial intercepts: {mlrModel.interceptVector!s}")

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [35]:
# Fit a StringIndexer on df that maps trip_duration to an "indexedLabel" column.
# NOTE(review): `labelIndexer` is not used by any later cell visible here, and
# this same statement is repeated further down — candidate for removal.
labelIndexer = StringIndexer(inputCol="trip_duration", outputCol="indexedLabel").fit(df)

In [36]:
# Assembler that packs the cols_now columns into a single "features" vector.
# NOTE(review): the training split used below already carries a "features"
# column from the earlier pipeline, and `assembler` is re-created later.
assembler = VectorAssembler(inputCols=cols_now, outputCol="features")

In [37]:
# Decision-tree classifier over the assembled features and the indexed label.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20,
)

In [38]:
# Single-stage pipeline holding only the decision tree. Note that this rebinds
# the name `pipeline` that the preprocessing pipeline used earlier.
dt_stages = [dt]
pipeline = Pipeline(stages=dt_stages)
# Train on the training split.
model = pipeline.fit(trainingData)

In [39]:
# Score the held-out split, keeping only the predicted and true labels.
predictions = model.transform(testData).select("prediction", "label")
predictions.show(10)

In [40]:
# Evaluator comparing the "prediction" column against "label" using accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
# Apply the evaluator to the test-set predictions.
accuracy = evaluator.evaluate(predictions)
# Report the value just computed (previously this printed the hard-coded
# `accuracy_` constant, so the output never reflected the actual model).
print("Accuracy = %g " % (accuracy))

In [41]:
# Re-fit the trip_duration -> "indexedLabel" StringIndexer on df.
# NOTE(review): identical to an earlier cell, and `labelIndexer` is not used by
# any cell visible here — candidate for removal.
labelIndexer = StringIndexer(inputCol="trip_duration", outputCol="indexedLabel").fit(df)

In [42]:
# Inspect the current column names and types of `data` before reshaping it.
data.dtypes

In [43]:
from pyspark.sql.functions import UserDefinedFunction

# Lookup table turning the 'Y'/'N' flag into 1.0/0.0.
binary_map = dict(Y=1.0, N=0.0)


def _flag_to_double(k):
    """Return the numeric code for flag `k`; raises KeyError for any other value."""
    return binary_map[k]


# Spark UDF applying the lookup and returning a double.
toNum = UserDefinedFunction(_flag_to_double, DoubleType())

In [44]:
# Identifier, raw coordinate and date/time columns are removed.
dropped_columns = [
    'id',
    'pickup_longitude', 'pickup_latitude',
    'dropoff_longitude', 'dropoff_latitude',
    'pickup_date', 'pickup_time',
    'dropoff_date', 'dropoff_time',
]
# The 'Y'/'N' flag is replaced by its numeric encoding.
flag_as_double = toNum(data['store_and_fwd_flag'])
data = (
    data
    .drop(*dropped_columns)
    .withColumn('store_and_fwd_flag', flag_as_double)
    .cache()
)

In [45]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Build LabeledPoints with trip_duration as the label and every other column
# as a feature. Columns are addressed by name: the previous positional
# row[-1] picked the last column (Average_speed) as the label and left
# trip_duration among the features.
# NOTE(review): Average_speed is derived from trip_duration, so keeping it as a
# feature leaks the target — confirm whether that is intended.
label_col = 'trip_duration'
# Computed outside the lambda so the closure captures only plain strings,
# not the DataFrame itself.
feature_cols = [c for c in data.columns if c != label_col]
transformed_df = data.rdd.map(
    lambda row: LabeledPoint(row[label_col], Vectors.dense([row[c] for c in feature_cols]))
)

In [46]:
# Reduced feature list for the regression model.
cols_then = [
    'vendor_id',
    'passenger_count',
    'Haversine_distance',
    'Average_speed',
]

In [47]:
# Assembler packing the cols_then columns into a single 'features' vector.
assembler = VectorAssembler(outputCol='features', inputCols=cols_then)

In [48]:
# Fit the trip_duration -> "indexedLabel" StringIndexer, this time on `data`.
# NOTE(review): `labelIndexer` is not used by any later cell visible here.
labelIndexer = StringIndexer(inputCol="trip_duration", outputCol="indexedLabel").fit(data)

In [49]:
# Apply the assembler to `data` for inspection only: the result is not
# assigned, so `data` itself is left unchanged.
assembler.transform(data)

In [50]:
# 80/20 train/test split. A fixed seed (the same value as the earlier split)
# makes the partition repeatable across re-runs.
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=0)

In [51]:
from pyspark.ml.regression import RandomForestRegressor
# Random-forest regressor predicting trip_duration from the assembled features.
# `data` has no "label" column at this point (it was only added to the frame
# derived from `df`), so the target column is named explicitly; with
# labelCol="label" the pipeline fit fails on the missing column.
rf1 = RandomForestRegressor(featuresCol="features", labelCol="trip_duration")

In [52]:
# Two-stage pipeline: assemble the feature vector, then the random-forest regressor.
pipeline1 = Pipeline(stages=[assembler, rf1])

In [53]:
# Fit the regression pipeline on the training split.
model1 = pipeline1.fit(trainingData)