<a href="https://colab.research.google.com/github/ekrako/big_data_final/blob/main/Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import and Install Packages

In [1]:
import sys
import subprocess
import pkg_resources
# Third-party packages this notebook installs on demand when they are absent.
required = {'utm', 'kaggle', 'findspark'}
installed = set(dist.key for dist in pkg_resources.working_set)
missing = required.difference(installed)

if missing:
    # Install with the interpreter that runs this kernel; pip's stdout is discarded.
    python = sys.executable
    pip_command = [python, '-m', 'pip', 'install']
    pip_command.extend(missing)
    subprocess.check_call(pip_command, stdout=subprocess.DEVNULL)
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import requests
import json
import geopy.distance
import utm
import os
from sklearn.metrics import r2_score
from sklearn.metrics import confusion_matrix, accuracy_score
import findspark


# Get the data

In [2]:
kaggle_json_location='/content/drive/My Drive/Colab Notebooks/Datasets/idc/Big Data/'
 
if 'google.colab' in str(get_ipython()):
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  if not os.path.exists('./spark-3.1.2-bin-hadoop3.2.tgz'):
    !wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
    !tar xf spark-3.1.2-bin-hadoop3.2.tgz
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
  findspark.init()
  from pyspark.sql import SparkSession
  from google.colab import drive
    # drive.mount('/content/drive')
 
  if not os.path.exists('~/.kaggle/'):
    !mkdir ~/.kaggle/
  if not os.path.exists('~/.kaggle/kaggle.json'):
    # os.popen(f"cp '{kaggle_json_location}kaggle.json' ~/.kaggle/")
    !cp '/content/drive/My Drive/Colab Notebooks/Datasets/idc/Big Data/kaggle.json' ~/.kaggle/
    !chmod 600 ~/.kaggle/kaggle.json 
if not os.path.exists('./idc_train.csv'):
  !kaggle competitions download -c 'idc-big-data-platforms-2021-ml-competition' 
  !unzip idc_train.csv.zip
 
spark = SparkSession.builder.master("local[*]").getOrCreate()


Downloading idc_train.csv.zip to /content
100% 460M/461M [00:03<00:00, 154MB/s]
100% 461M/461M [00:03<00:00, 126MB/s]
Downloading idc_test.csv to /content
  0% 0.00/722k [00:00<?, ?B/s]
100% 722k/722k [00:00<00:00, 99.9MB/s]
Archive:  idc_train.csv.zip
  inflating: idc_train.csv           


# load train data

In [None]:

# Read the training CSV with a header row and inferred column types.
df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("idc_train.csv")
)
# Mark the frame for caching, then show its schema and row count.
df.cache()
df.printSchema()
df.count()

root
 |-- key: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



19990000

# Load test data

In [None]:
# Read the competition test CSV with a header row and inferred column types.
test_df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("idc_test.csv")
)

# Show the schema and the number of rows.
test_df.printSchema()
test_df.count()

root
 |-- key: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



10000

# convert pickup to datetime

In [None]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_timestamp

# Parse the pickup_datetime strings into timestamps, replacing the column in place.
pickup_timestamp = to_timestamp(df['pickup_datetime'], 'yyyy-MM-dd HH:mm:ss z')
df = df.withColumn('pickup_datetime', pickup_timestamp)
df.printSchema()

root
 |-- key: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



# remove rows with wrong data

In [None]:

# Drop incomplete rows first, so the comparisons below never see missing values.
df = df.na.drop()

# Keep trips whose pickup and dropoff both lie inside the box
# longitude [-75, -73] x latitude [37, 42] (bounds inclusive).
in_bounds = (
    df.pickup_longitude.between(-75, -73)
    & df.pickup_latitude.between(37, 42)
    & df.dropoff_longitude.between(-75, -73)
    & df.dropoff_latitude.between(37, 42)
)
df = df.filter(in_bounds)

# Keep fares strictly between 0 and 100 and passenger counts below 100.
df = df.filter((df.fare_amount > 0) & (df.fare_amount < 100))
df = df.filter(df.passenger_count < 100)

# Convert geographic coordinates to UTM



In [None]:
import pyspark.sql.functions as F
import utm
from pyspark.sql.types import *

def _utm_first(latitude, longitude):
    """Return element 0 of utm.from_latlon(latitude, longitude) as a Python scalar."""
    return utm.from_latlon(latitude, longitude)[0].item()


def _utm_second(latitude, longitude):
    """Return element 1 of utm.from_latlon(latitude, longitude) as a Python scalar."""
    return utm.from_latlon(latitude, longitude)[1].item()


utm_udf_x = F.udf(_utm_first, FloatType())
utm_udf_y = F.udf(_utm_second, FloatType())

# Add the two UTM columns for the pickup point, then for the dropoff point.
for point in ('pickup', 'dropoff'):
    lat_col = F.col(f'{point}_latitude')
    lon_col = F.col(f'{point}_longitude')
    df = df.withColumn(f'{point}_lon_utm', utm_udf_x(lat_col, lon_col))
    df = df.withColumn(f'{point}_lat_utm', utm_udf_y(lat_col, lon_col))

# Calculate Euclidean and Manhattan distances

In [None]:
from pyspark.sql.functions import abs,sqrt

# Differences between dropoff and pickup along each UTM column pair.
lat_utm_delta = df['dropoff_lat_utm'] - df['pickup_lat_utm']
lon_utm_delta = df['dropoff_lon_utm'] - df['pickup_lon_utm']

# Sum of absolute differences, and square root of the summed squares.
df = df.withColumn('manhattan_distance', abs(lat_utm_delta) + abs(lon_utm_delta))
df = df.withColumn('uclidean_distance', sqrt(lat_utm_delta**2 + lon_utm_delta**2))

# samp['utm_dist']=(np.abs(samp['pickup_utm_lat']-samp['dropoff_utm_lat'])+np.abs(samp['pickup_utm_lon']-samp['dropoff_utm_lon']))/1000

In [None]:
# Show the schema after the UTM and distance columns were added.
df.printSchema()

root
 |-- key: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_lon_utm: float (nullable = true)
 |-- pickup_lat_utm: float (nullable = true)
 |-- dropoff_lon_utm: float (nullable = true)
 |-- dropoff_lat_utm: float (nullable = true)
 |-- manhattan_distance: float (nullable = true)
 |-- uclidean_distance: double (nullable = true)



# extract time features from date time

In [None]:
from pyspark.sql.functions import dayofmonth,hour,dayofyear,month,year,weekofyear,format_number,date_format
# Derive calendar features from the pickup timestamp, one integer column each.
# (The original cell added 'weekofyear' twice; the redundant statement is removed.)
df = df.withColumn('dayofmonth',dayofmonth(df['pickup_datetime']))
df = df.withColumn('month',month(df['pickup_datetime']))
df = df.withColumn('year',year(df['pickup_datetime']))
df = df.withColumn('weekofyear',weekofyear(df['pickup_datetime']))
df = df.withColumn('hour',hour(df['pickup_datetime']))

# Convert fare to a binary label for the classifier

In [None]:
from pyspark.ml.feature import Binarizer

# Binarize fare_amount at a threshold of 10 into a "label" column.
binarizer = Binarizer(inputCol="fare_amount", outputCol="label", threshold=10)
# Dropping a column that is not present leaves the frame unchanged, so any
# "label" column left over from an earlier run can be removed unconditionally.
df = binarizer.transform(df.drop('label'))
df.printSchema()

root
 |-- key: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_lon_utm: float (nullable = true)
 |-- pickup_lat_utm: float (nullable = true)
 |-- dropoff_lon_utm: float (nullable = true)
 |-- dropoff_lat_utm: float (nullable = true)
 |-- manhattan_distance: float (nullable = true)
 |-- uclidean_distance: double (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- label: double (nullable = true)



# Create features column

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Columns packed, in this order, into the single 'features' vector column.
assembler_inputs = [
    'passenger_count',
    'pickup_lon_utm',
    'pickup_lat_utm',
    'dropoff_lon_utm',
    'dropoff_lat_utm',
    'manhattan_distance',
    'uclidean_distance',
    'dayofmonth',
    'month',
    'year',
    'weekofyear',
    'hour',
]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')

In [None]:
# Apply the assembler so every row gains the 'features' vector column.
output=assembler.transform(df)

In [None]:
# Persist the prepared training frame to Google Drive as parquet.
# NOTE(review): no save mode is given -- confirm what should happen when this path already exists.
output.write.parquet('/content/drive/My Drive/Colab Notebooks/Datasets/idc/Big Data/idc_train_prepared.parquet')

In [3]:
# Reload the prepared training frame from the parquet copy on Google Drive.
output = spark.read.format("parquet").load('/content/drive/My Drive/Colab Notebooks/Datasets/idc/Big Data/idc_train_prepared.parquet')

# split to train and test

In [4]:
# Random split with weights 0.8 / 0.2; the fixed seed keeps the split repeatable.
train_data,test_data=output.randomSplit([0.8,0.2],seed=42)

# Timing and evaluation methods

In [5]:
import datetime as dt
def timing(f):
  """Decorator that prints how long each call to ``f`` took.

  The wrapped function is called with whatever positional and keyword
  arguments the caller supplies (including none), and its return value is
  passed through unchanged. After a successful call a line of the form
  ``"<function name> took <elapsed>"`` is printed; if ``f`` raises, the
  exception propagates and nothing is printed.

  Args:
    f: the callable to time.

  Returns:
    A wrapper that keeps the name and docstring of ``f``.
  """
  import functools  # local import so the decorator does not depend on other cells

  @functools.wraps(f)
  def wrapper(*args,**kwargs):
    tic=dt.datetime.now()
    result=f(*args,**kwargs)
    toc=dt.datetime.now()
    print(f"{f.__name__} took {toc-tic}")
    return result
  return wrapper

In [6]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
@timing
def fit_and_evaluate(model,train_data,test_data,caption=""):
  """Fit ``model`` on ``train_data`` and report its quality on ``test_data``.

  Prints the score computed by a default ``BinaryClassificationEvaluator``
  (labelled "AUC:") followed by a confusion matrix whose rows are labels and
  whose columns are predictions.

  Args:
    model: an estimator exposing ``fit``; the fitted result must expose ``transform``.
    train_data: frame used for fitting.
    test_data: frame used for evaluation; the transformed result must contain
      'prediction' and 'label' columns.
    caption: text printed in front of the AUC value.

  Returns:
    The fitted model, so an expensive fit is not lost once the metrics have
    been printed (the original returned nothing).
  """
  fit_model =model.fit(train_data)
  test_results= fit_model.transform(test_data)
  evaluator = BinaryClassificationEvaluator()
  print(caption,'AUC:', evaluator.evaluate(test_results))
  # (prediction, label) pairs in the order MulticlassMetrics receives them.
  preds_and_labels = test_results.select(['prediction','label'])
  metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
  cm = pd.DataFrame(metrics.confusionMatrix().toArray(),dtype='int64')
  cm = cm.rename_axis(index='label',columns='prediction')
  print(cm)
  return fit_model

# Logistic Regression train and evaluate

In [None]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression with default settings, fitted and scored on the train/test split.
lr = LogisticRegression()
fit_and_evaluate(lr,train_data,test_data,caption="Logistic Regression")

Logistic Regression AUC: 0.929599423410627
prediction        0        1
label                       
0           2308877   136864
1            349104  1118562
fit_and_evaluate took 6:25:15.645817


# Decision Tree train and evaluate

In [8]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree limited to a depth of 20, fitted and scored on the train/test split.
# NOTE(review): the recorded output for this cell ends in a Py4JNetworkError
# ("Answer from Java side is empty"), i.e. this run did not complete.
dtree=DecisionTreeClassifier(maxDepth=20)
fit_and_evaluate(dtree,train_data,test_data,caption="Decision Tree")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Traceback (most recent call last):
  File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35119)
Traceback (most recent call last):
  F

Py4JNetworkError: ignored

# Random Forest train and evaluate

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with default settings, fitted and scored on the train/test split.
rf=RandomForestClassifier()
fit_and_evaluate(rf,train_data,test_data,caption="Random Forest")

# Gradient-boosted tree train and evaluate



In [None]:
# The original cell wrapped this code in a single-quoted string spanning three
# lines, which is a syntax error rather than a way to disable it. The code is
# kept behind an explicit switch instead; set RUN_GBT = True to train the model.
RUN_GBT = False
if RUN_GBT:
  from pyspark.ml.classification import GBTClassifier
  gbt=GBTClassifier()
  fit_and_evaluate(gbt,train_data,test_data,caption="Gradient-boosted tree")

In [None]:
def get_route(pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,timeout=10):
  """Query the OSRM routing service for the route between two points.

  Args:
    pickup_longitude, pickup_latitude: start point, placed first in the request URL.
    dropoff_longitude, dropoff_latitude: end point, placed second in the request URL.
    timeout: seconds to wait for the HTTP response. The original call had no
      timeout, so one stalled request could block the caller indefinitely.

  Returns:
    ``(distance, duration)`` as floats taken from the first entry of the
    response's "routes" list, or ``(0.0, 0.0)`` when the response has no
    routes. Values are converted with ``float`` because the JSON parser may
    yield ints.

  Raises:
    Whatever ``requests.get`` or ``json.loads`` raise (network failure,
    timeout, non-JSON body); these are not swallowed.
  """
  url = f"http://router.project-osrm.org/route/v1/car/{pickup_longitude},{pickup_latitude};{dropoff_longitude},{dropoff_latitude}?overview=false"
  r = requests.get(url, timeout=timeout)
  # Only the first entry of `routes` is read.
  response = json.loads(r.content)
  distance,duration=0.0,0.0
  routes = response.get("routes")
  # A missing key and an empty list both mean "no route"; the original indexed
  # [0] whenever the key was present and so failed on an empty list.
  if routes:
    distance=float(routes[0]["distance"])
    duration=float(routes[0]["duration"])
  print(distance,duration)
  return distance,duration

In [None]:
import pyspark.sql.functions as F
import utm
from pyspark.sql.types import *

def _route_component_udf(index):
  """Build a FloatType UDF returning element ``index`` of get_route's (distance, duration) pair."""
  def component(pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude):
    # float(...) matters: a FloatType UDF that returns a Python int produces
    # null instead of a number. The recorded output of this notebook contains a
    # row with distance=None next to a valid duration, which is presumably this effect.
    return float(get_route(pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude)[index])
  return F.udf(component, FloatType())

# Names kept from the original cell, although these UDFs are unrelated to UTM.
utm_udf_distance = _route_component_udf(0)
utm_udf_duration = _route_component_udf(1)

# NOTE(review): df2 is not defined anywhere in the visible part of the notebook;
# it must come from a cell that is not shown or was deleted -- confirm.
# Each UDF calls get_route itself, so every row triggers two HTTP requests.
route_args = [F.col(name) for name in ('pickup_longitude','pickup_latitude','dropoff_longitude','dropoff_latitude')]
df2 = df2.withColumn('duration',utm_udf_duration(*route_args))
df2 = df2.withColumn('distance',utm_udf_distance(*route_args))


In [None]:
# Remove both routing columns again in a single call.
df2 = df2.drop('duration', 'distance')

In [None]:
# Show the last 10 rows of df2.
df2.tail(10)

[Row(key=190, fare_amount=6.9, pickup_datetime=datetime.datetime(2009, 2, 11, 15, 59), pickup_longitude=-73.994194, pickup_latitude=40.75114, dropoff_longitude=-73.97478, dropoff_latitude=40.75542, passenger_count=5, pickup_lon_utm=584907.8125, pickup_lat_utm=4511618.5, dropoff_lon_utm=586541.1875, dropoff_lat_utm=4512112.5, manhattan_distance=2127.375, uclidean_distance=1706.4436382796239, duration=227.8000030517578, distance=None),
 Row(key=191, fare_amount=19.7, pickup_datetime=datetime.datetime(2012, 1, 4, 18, 30), pickup_longitude=-74.0104, pickup_latitude=40.71137, dropoff_longitude=-73.943275, dropoff_latitude=40.77732, passenger_count=1, pickup_lon_utm=583589.5, pickup_lat_utm=4507188.0, dropoff_lon_utm=589171.375, dropoff_lat_utm=4514575.0, manhattan_distance=12968.875, uclidean_distance=9258.784883321623, duration=842.7000122070312, distance=12838.2998046875),
 Row(key=192, fare_amount=9.3, pickup_datetime=datetime.datetime(2009, 2, 14, 19, 53), pickup_longitude=-73.981026, p

In [None]:
# Turn the targets into 0/1 labels: 1 where the value is above 10, otherwise 0.
# NOTE(review): this rebinds y_train, and the same line is repeated in another
# cell; thresholding labels that are already 0/1 gives all zeros -- confirm the
# intended execution order. X_train / y_train are not defined in the visible cells.
y_train=(y_train>10).astype(int)
from xgboost import XGBClassifier
# Fit an XGBoost classifier with default settings on the binarised labels.
classifier = XGBClassifier()
classifier.fit(X_train, y_train)

XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
              colsample_bynode=1, colsample_bytree=1, gamma=0,
              learning_rate=0.1, max_delta_step=0, max_depth=3,
              min_child_weight=1, missing=None, n_estimators=100, n_jobs=1,
              nthread=None, objective='binary:logistic', random_state=0,
              reg_alpha=0, reg_lambda=1, scale_pos_weight=1, seed=None,
              silent=None, subsample=1, verbosity=1)

In [None]:
# Turn the targets into 0/1 labels: 1 where the value is above 10, otherwise 0.
# NOTE(review): this rebinds y_train, and the same line is repeated in another
# cell; thresholding labels that are already 0/1 gives all zeros -- confirm the
# intended execution order.
y_train=(y_train>10).astype(int)
from sklearn.svm import SVC
# Fit an RBF-kernel SVM; this rebinds `classifier`, replacing any earlier model.
classifier = SVC(kernel = 'rbf', random_state = 0)
classifier.fit(X_train, y_train)

SVC(C=1.0, break_ties=False, cache_size=200, class_weight=None, coef0=0.0,
    decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
    max_iter=-1, probability=False, random_state=0, shrinking=True, tol=0.001,
    verbose=False)

In [None]:
from sklearn.model_selection import GridSearchCV
# Two grids: a linear kernel searched over C, and an RBF kernel searched over C and gamma.
# The linear grid originally named the kernel 'liner', which is not a kernel
# name, so that half of the search could never produce a valid fit.
parameters = [{'C':[0.25,0.5,0.75,1.0],'kernel':['linear']},
              {'C':[0.25,0.5,0.75,1.0],'kernel':['rbf'],'gamma':[0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9]}]#list(np.arange(0.1,0.9,0.1))}]
# 10-fold cross-validated search scored by accuracy, using all available cores.
grid_search = GridSearchCV(estimator = classifier, param_grid=parameters, scoring='accuracy',cv=10,n_jobs=-1)
grid_search.fit(X_train,y_train)
best_accuracy = grid_search.best_score_
best_paramters = grid_search.best_params_
print("Accuracy: {:.2f} %".format(best_accuracy*100))
print(f"Best Paramters: {best_paramters}")



Accuracy: 88.60 %
Best Paramters: {'C': 1.0, 'gamma': 0.9, 'kernel': 'rbf'}


In [None]:

from sklearn.linear_model import LinearRegression
# Fit an ordinary linear regression on the training set.
# NOTE(review): `regressor` is rebound by a later cell; whichever cell ran last decides the model in use.
regressor =  LinearRegression()
regressor.fit(X_train,y_train)
# y_train=sc_y.inverse_transform(y_train)

LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None, normalize=False)

In [None]:
from sklearn.tree import DecisionTreeRegressor
# Fit a decision-tree regressor (depth capped at 8, fixed seed); this rebinds `regressor`.
regressor = DecisionTreeRegressor(random_state=42,max_depth=8)
regressor.fit(X_train,y_train)

DecisionTreeRegressor(ccp_alpha=0.0, criterion='mse', max_depth=8,
                      max_features=None, max_leaf_nodes=None,
                      min_impurity_decrease=0.0, min_impurity_split=None,
                      min_samples_leaf=1, min_samples_split=2,
                      min_weight_fraction_leaf=0.0, presort='deprecated',
                      random_state=42, splitter='best')

In [None]:
# R^2 of the current regressor on the data it was trained on.
y_pred1 = regressor.predict(X_train)
r2_score(y_train, y_pred1)

0.842818535091255

In [None]:
# Threshold the training targets and the regressor's training predictions at 10
# and compare them as classes.
# The training predictions are stored in y_pred1 by the preceding cell; the
# original read y_pred, which is not assigned until a later cell.
y_class=y_train>10
y_pred_class=y_pred1>10
cm = confusion_matrix(y_class, y_pred_class)
print(cm)
accuracy_score(y_class, y_pred_class)

[[58258  4146]
 [ 7819 29777]]


0.88035

In [None]:
# Confusion matrix and accuracy of `classifier` on the training set.
# NOTE(review): y_train is used as-is here, so this presumably expects the 0/1
# labels produced by an earlier cell -- confirm.
y_class=y_train
y_pred_class=classifier.predict(X_train)
cm = confusion_matrix(y_class, y_pred_class)
print(cm)
accuracy_score(y_class, y_pred_class)

[[59257  3147]
 [ 8586 29010]]


0.88267

In [None]:
# Confusion matrix and accuracy of `classifier` on the test set, with the test
# targets thresholded at 10 into 0/1 labels.
y_class=(y_test>10).astype(int)
y_pred_class=classifier.predict(X_test)
cm = confusion_matrix(y_class, y_pred_class)
print(cm)
accuracy_score(y_class, y_pred_class)

[[59463  3163]
 [ 8811 28563]]


0.88026

In [None]:
# R^2 of the current regressor on the test set.
y_pred = regressor.predict(X_test)
r2_score(y_test, y_pred)

0.8068522508646816

In [None]:
# Threshold the test targets and the regressor's test predictions at 10 and
# compare them as classes.
y_class=y_test>10
y_pred_class=y_pred>10
cm = confusion_matrix(y_class, y_pred_class)
print(cm)
accuracy_score(y_class, y_pred_class)

[[56847  5779]
 [ 7190 30184]]


0.87031

In [None]:
# List the files in the working directory.
!ls -la


total 511682
-rw------- 1 root root    738918 Jul 15 07:04 idc_test.csv
-rw------- 1 root root 503059860 Jul 15 07:22 idc_train.parquet
-rw------- 1 root root        62 Jul 15 05:59 kaggle.json
-rw------- 1 root root  20161578 Jul 15 16:10 samp.parquet


In [None]:
# Build the submission file from the competition test set.
# NOTE(review): get_utm, get_distance, sc_X and regressor are not defined in the
# visible part of the notebook, and `df` is rebound here to a pandas frame.
df = pd.read_csv('idc_test.csv')

# Four UTM columns per row, then the helper-computed distance.
utm_parts = zip(*df.apply(get_utm, axis=1))
df['pickup_utm_lat'], df['pickup_utm_lon'], df['dropoff_utm_lat'], df['dropoff_utm_lon'] = utm_parts
df['distance'] = df.apply(get_distance, axis=1)

# Sum of absolute UTM differences, divided by 1000.
lat_gap = np.abs(df['pickup_utm_lat'] - df['dropoff_utm_lat'])
lon_gap = np.abs(df['pickup_utm_lon'] - df['dropoff_utm_lon'])
df['utm_dist'] = (lat_gap + lon_gap) / 1000
df['pickup_datetime'] = df['pickup_datetime'].apply(pd.to_datetime)

# Feature matrix in the column order passed to the scaler and the regressor.
submission_features = ['utm_dist', 'passenger_count', 'distance', 'pickup_utm_lat',
                       'pickup_utm_lon', 'dropoff_utm_lat', 'dropoff_utm_lon']
X = df[submission_features].values
# X = poly_reg.fit_transform(X)
X = sc_X.transform(X)

# Predict the fare, flag fares above 10, and write key + flag to the CSV.
df['fare_amount'] = regressor.predict(X)
df['high_fare'] = (df['fare_amount'] > 10).astype(int)
df[['key', 'high_fare']].to_csv('submission.csv', index=False)


In [None]:
# Upload submission.csv to the competition with the message "DecisionTreeRegressor".
!kaggle competitions submit -c idc-big-data-platforms-2021-ml-competition -f submission.csv -m "DecisionTreeRegressor"

100% 107k/107k [00:00<00:00, 301kB/s]
Successfully submitted to IDC Big Data Platforms 2021 ML Competition

In [None]:
from pyspark.ml.classification import LogisticRegression
# Fit a default logistic regression and evaluate it on the held-out split.
lr = LogisticRegression()
lr_model = lr.fit(train_data)
test_results = lr_model.evaluate(test_data)

# (prediction, label) pairs feed the confusion-matrix computation.
preds_and_labels = test_results.predictions.select(['prediction', 'label'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(f"Logistic Regression AUC: {test_results.areaUnderROC}")

# Rows are labels, columns are predictions.
cm = pd.DataFrame(metrics.confusionMatrix().toArray())
cm = cm.rename_axis(index='label', columns='prediction')
print(cm)