In [1]:
import pandas as pd
import holidays
import seaborn as sns
import requests
import numpy as np
from time import sleep
from shutil import copyfile
import random
from scipy import stats

In [2]:
%matplotlib inline
# Raise pandas' display limits so wide/long frames are rendered without elision.
pd.set_option("display.max_columns", 999)
pd.set_option("display.max_rows", 999)

In [3]:
# # To find out where the pyspark
# import findspark
# findspark.init()

# # Creating Spark Context
# from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.types import StringType, IntegerType, DecimalType, DoubleType

In [4]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, CountVectorizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [5]:
# Create the SparkSession used by every cell below, or reuse one that already exists.
# "FirstSparkApp" is the application name handed to the builder.
spark = (
    SparkSession.builder
    .appName("FirstSparkApp")
    .getOrCreate()
)

In [51]:
# Load the merged, feature-engineered trip CSV. The header row supplies column names,
# column types are inferred from the data, and the reader runs in DROPMALFORMED mode.
# Only rows with pickup_datetime before 2013-04-08 are kept.
df = (
    spark.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .option("inferSchema", "true")
    .load("/home/arditto_trianggada3/Workspace/ds-ovo-test/dataset/trip_merge_4_fe_all.csv")
    .where("pickup_datetime < '2013-04-08 '")
)

In [52]:
# Print the inferred column names and types to sanity-check the load.
df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_longitude_bin: double (nullable 

In [53]:
# Add the calendar date of each pickup; the SQL below partitions and groups trips
# by (hack_license, pickup_date).
pickup_date_col = F.expr("to_date(pickup_datetime)")
df = df.withColumn("pickup_date", pickup_date_col)

In [54]:
# Expose df (including the pickup_date column) to Spark SQL under the name tbl_df.
df.createOrReplaceTempView("tbl_df")

In [71]:
# Per-trip lag features.
# The inner query selects the trip columns needed downstream from tbl_df and uses LAG over
# the window (PARTITION BY hack_license, pickup_date ORDER BY pickup_datetime) to attach the
# previous trip's dropoff longitude, latitude and datetime to every row.
# The outer query then derives:
#   - pickup_time_in_secs:   seconds between the previous dropoff and this pickup
#   - pickup_dist_euclidean: straight-line distance between the previous dropoff point and this
#                            pickup point, computed directly on the longitude/latitude values
# The first trip in each (hack_license, pickup_date) partition has no previous row, so LAG
# yields NULL and both derived columns are NULL for that trip.
df_agg = spark.sql("""
SELECT
    t0.*,
    (unix_timestamp(pickup_datetime) - unix_timestamp(prev_dropoff_datetime)) pickup_time_in_secs,
    SQRT(POW(pickup_longitude-prev_dropoff_longitude,2) + POW(pickup_latitude-prev_dropoff_latitude,2)) pickup_dist_euclidean
    
FROM (
    SELECT
        hack_license,

        passenger_count,
        trip_time_in_secs,
        trip_distance,
        pickup_longitude,
        pickup_latitude,
        dropoff_longitude,
        dropoff_latitude,
        fare_amount,
        tip_amount,
        surcharge,
        tolls_amount,
        pickup_addr_zipcode,
        pickup_addr_sublocal,
        dropoff_addr_zipcode,
        dropoff_addr_sublocal,
        pickup_to_dropoff_zipcode,
        pickup_to_dropoff_sublocal,
        pickup_dayofweek,
        to_date(pickup_datetime) pickup_date,
        pickup_hour,
        dropoff_dayofweek,
        dropoff_hour,
        trip_late_night,
        trip_night,
        trip_dist_euclidean,

        pickup_datetime,
        dropoff_datetime,

        --LAG(pickup_longitude) OVER (PARTITION BY hack_license,pickup_date ORDER BY pickup_datetime) prev_pickup_longitude,
        --LAG(pickup_latitude) OVER (PARTITION BY hack_license,pickup_date ORDER BY pickup_datetime) prev_pickup_latitude,
        LAG(dropoff_longitude) OVER (PARTITION BY hack_license,pickup_date ORDER BY pickup_datetime) prev_dropoff_longitude,
        LAG(dropoff_latitude) OVER (PARTITION BY hack_license,pickup_date ORDER BY pickup_datetime) prev_dropoff_latitude,
        
        --LAG(pickup_datetime) OVER (PARTITION BY hack_license,pickup_date ORDER BY pickup_datetime) prev_pickup_datetime,
        LAG(dropoff_datetime) OVER (PARTITION BY hack_license,pickup_date ORDER BY pickup_datetime) prev_dropoff_datetime

        
    FROM tbl_df
) t0

""")

In [72]:
# Register the per-trip lag features as tbl_df_agg so the daily aggregation query can read them.
df_agg.createOrReplaceTempView("tbl_df_agg")

In [83]:
# Daily aggregates: one output row per (hack_license, pickup_date), computed from tbl_df_agg.
# Active (non-commented) outputs:
#   - trip_num: number of trips in the group (SUM(1))
#   - passenger_count_max, fare_amount_min
#   - min/max of pickup and dropoff longitude/latitude
#   - tip_amount_count / surcharge_count: trips with a positive tip / surcharge
#   - trip_move_sublocal_count: trips whose pickup and dropoff sublocal differ
#   - pickup_hour min / approximate median (PERCENTILE_APPROX at 0.5) / max
#   - trip_late_night_sum, trip_night_sum
#   - min/max of trip_dist_euclidean, pickup_time_in_secs and pickup_dist_euclidean
#   - work_time_in_secs: seconds between the earliest and the latest pickup/dropoff timestamp
#     in the group
#   - surcharge_sum, tip_amount_sum, fare_amount_sum
# Lines starting with "--" are SQL comments kept inside the query text (disabled features).
# NOTE(review): the disabled per-hour counters go from pickup_hour==15 straight to 17;
# hour 16 is missing if they are ever re-enabled.
df_agg1 = spark.sql("""
    SELECT
        hack_license,pickup_date,
        
        SUM(1) trip_num,
        
        --MIN(passenger_count) passenger_count_min,
        --PERCENTILE_APPROX(passenger_count,0.5) passenger_count_median,
        MAX(passenger_count) passenger_count_max,
        
        MIN(pickup_longitude) pickup_longitude_min,
        MIN(pickup_latitude) pickup_latitude_min,
        MAX(pickup_longitude) pickup_longitude_max,
        MAX(pickup_latitude) pickup_latitude_max,
        
        MIN(dropoff_longitude) dropoff_longitude_min,
        MIN(dropoff_latitude) dropoff_latitude_min,
        MAX(dropoff_longitude) dropoff_longitude_max,
        MAX(dropoff_latitude) dropoff_latitude_max,
        
        MIN(fare_amount) fare_amount_min,
        
        SUM(CASE WHEN tip_amount>0 THEN 1 ELSE 0 END) tip_amount_count,
        
        SUM(CASE WHEN surcharge>0 THEN 1 ELSE 0 END) surcharge_count,
        
        --CAST(COLLECT_LIST(pickup_to_dropoff_local) AS string) pickup_to_dropoff_local_list,
        --CAST(COLLECT_SET(pickup_to_dropoff_local) AS string) pickup_to_dropoff_local_set,
        SUM(CASE WHEN pickup_addr_sublocal!=dropoff_addr_sublocal THEN 1 ELSE 0 END) trip_move_sublocal_count,
        
        --SUM(CASE WHEN pickup_hour==0  THEN 1 ELSE 0 END) pickup_hour_0 ,
        --SUM(CASE WHEN pickup_hour==1  THEN 1 ELSE 0 END) pickup_hour_1 ,
        --SUM(CASE WHEN pickup_hour==2  THEN 1 ELSE 0 END) pickup_hour_2 ,
        --SUM(CASE WHEN pickup_hour==3  THEN 1 ELSE 0 END) pickup_hour_3 ,
        --SUM(CASE WHEN pickup_hour==4  THEN 1 ELSE 0 END) pickup_hour_4 ,
        --SUM(CASE WHEN pickup_hour==5  THEN 1 ELSE 0 END) pickup_hour_5 ,
        --SUM(CASE WHEN pickup_hour==6  THEN 1 ELSE 0 END) pickup_hour_6 ,
        --SUM(CASE WHEN pickup_hour==7  THEN 1 ELSE 0 END) pickup_hour_7 ,
        --SUM(CASE WHEN pickup_hour==8  THEN 1 ELSE 0 END) pickup_hour_8 ,
        --SUM(CASE WHEN pickup_hour==9  THEN 1 ELSE 0 END) pickup_hour_9 ,
        --SUM(CASE WHEN pickup_hour==10 THEN 1 ELSE 0 END) pickup_hour_10,
        --SUM(CASE WHEN pickup_hour==11 THEN 1 ELSE 0 END) pickup_hour_11,
        --SUM(CASE WHEN pickup_hour==12 THEN 1 ELSE 0 END) pickup_hour_12,
        --SUM(CASE WHEN pickup_hour==13 THEN 1 ELSE 0 END) pickup_hour_13,
        --SUM(CASE WHEN pickup_hour==14 THEN 1 ELSE 0 END) pickup_hour_14,
        --SUM(CASE WHEN pickup_hour==15 THEN 1 ELSE 0 END) pickup_hour_15,
        --SUM(CASE WHEN pickup_hour==17 THEN 1 ELSE 0 END) pickup_hour_17,
        --SUM(CASE WHEN pickup_hour==18 THEN 1 ELSE 0 END) pickup_hour_18,
        --SUM(CASE WHEN pickup_hour==19 THEN 1 ELSE 0 END) pickup_hour_19,
        --SUM(CASE WHEN pickup_hour==20 THEN 1 ELSE 0 END) pickup_hour_20,
        --SUM(CASE WHEN pickup_hour==21 THEN 1 ELSE 0 END) pickup_hour_21,
        --SUM(CASE WHEN pickup_hour==22 THEN 1 ELSE 0 END) pickup_hour_22,
        --SUM(CASE WHEN pickup_hour==23 THEN 1 ELSE 0 END) pickup_hour_23,
        MIN(pickup_hour) pickup_hour_min,
        PERCENTILE_APPROX(pickup_hour,0.5) pickup_hour_median,
        MAX(pickup_hour) pickup_hour_max,
        
        SUM(trip_late_night) trip_late_night_sum,
        SUM(trip_night) trip_night_sum,
        
        --MIN(trip_time_in_secs) trip_time_in_secs_min,
        --PERCENTILE_APPROX(trip_time_in_secs,0.5) trip_time_in_secs_median,
        --MAX(trip_time_in_secs) trip_time_in_secs_max,
        --SUM(trip_time_in_secs) trip_time_in_secs_sum,
        
        MIN(trip_dist_euclidean) trip_dist_euclidean_min,
        --PERCENTILE_APPROX(trip_dist_euclidean,0.5) trip_dist_euclidean_median,
        MAX(trip_dist_euclidean) trip_dist_euclidean_max,
        --SUM(trip_dist_euclidean) trip_dist_euclidean_sum,
        
        MIN(pickup_time_in_secs) pickup_time_in_secs_min,
        --PERCENTILE_APPROX(pickup_time_in_secs,0.5) pickup_time_in_secs_median,
        MAX(pickup_time_in_secs) pickup_time_in_secs_max,
        --SUM(pickup_time_in_secs) pickup_time_in_secs_sum,
        
        MIN(pickup_dist_euclidean) pickup_dist_euclidean_min,
        --PERCENTILE_APPROX(pickup_dist_euclidean,0.5) pickup_dist_euclidean_median,
        MAX(pickup_dist_euclidean) pickup_dist_euclidean_max,
        --SUM(pickup_dist_euclidean) pickup_dist_euclidean_sum,
        
        --(unix_timestamp(LAST(pickup_datetime)) - unix_timestamp(FIRST(pickup_datetime))) work_time_in_secs,
        (unix_timestamp(MAX(GREATEST(pickup_datetime,dropoff_datetime))) - unix_timestamp(MIN(LEAST(pickup_datetime,dropoff_datetime)))) work_time_in_secs,
        
        SUM(surcharge) surcharge_sum,
        SUM(tip_amount) tip_amount_sum,
        SUM(fare_amount) fare_amount_sum
        
    FROM tbl_df_agg
    GROUP BY hack_license,pickup_date

""")

In [84]:
# Register the daily aggregates as tbl_df_agg1.
# NOTE(review): at this point df_agg1 does not yet have earning_sum; the same view name is
# registered again after that column is added.
df_agg1.createOrReplaceTempView("tbl_df_agg1")

In [85]:
# Total daily earning per (hack_license, pickup_date): surcharge + tips + fares.
earning_expr = F.expr("surcharge_sum + tip_amount_sum + fare_amount_sum")
df_agg1 = df_agg1.withColumn('earning_sum', earning_expr)

In [86]:
# Re-register tbl_df_agg1 so the view points at the DataFrame that now carries earning_sum.
df_agg1.createOrReplaceTempView("tbl_df_agg1")

In [87]:
# Persist the daily aggregates as CSV (with a header row) under spark_df/df_agg2.
# Overwriting is selected through the writer's save mode. 'overwrite' is not an option key,
# so the previous .option('overwrite','true') left the default save mode in place and the
# write failed whenever the target directory already existed (e.g. on re-running this cell).
df_agg1.write\
    .mode('overwrite')\
    .option('header','true')\
    .csv('/home/arditto_trianggada3/Workspace/ds-ovo-test/dataset/spark_df/df_agg2')

In [88]:
# Read the persisted daily aggregates back from CSV (header row, inferred types,
# DROPMALFORMED mode) and keep at most 10000 rows for the pandas hand-off below.
# NOTE(review): no ordering is applied before limit(), so which rows are kept is
# presumably not guaranteed to be stable between runs - confirm if that matters.
df_agg1 = (
    spark.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .option("inferSchema", "true")
    .load('/home/arditto_trianggada3/Workspace/ds-ovo-test/dataset/spark_df/df_agg2')
    .limit(10000)
)

In [89]:
# Collect the (row-limited) Spark DataFrame to the driver as a pandas DataFrame.
df1 = df_agg1.toPandas()

In [90]:
# Save the pandas copy of the daily aggregates as a single CSV file, without the index column.
df1.to_csv(
    path_or_buf="./dataset/trip_agg_hack_license_daily_v2.csv",
    index=False,
)

In [91]:
# Preview the first ten (hack_license, pickup_date) rows.
df1.head(n=10)

Unnamed: 0,hack_license,pickup_date,trip_num,passenger_count_max,pickup_longitude_min,pickup_latitude_min,pickup_longitude_max,pickup_latitude_max,dropoff_longitude_min,dropoff_latitude_min,dropoff_longitude_max,dropoff_latitude_max,fare_amount_min,tip_amount_count,surcharge_count,trip_move_sublocal_count,pickup_hour_min,pickup_hour_median,pickup_hour_max,trip_late_night_sum,trip_night_sum,trip_dist_euclidean_min,trip_dist_euclidean_max,pickup_time_in_secs_min,pickup_time_in_secs_max,pickup_dist_euclidean_min,pickup_dist_euclidean_max,work_time_in_secs,surcharge_sum,tip_amount_sum,fare_amount_sum,earning_sum
0,0025133AD810DBE80D35FCA8BF0BCA1F,2013-04-06,27,1,-74.004913,40.713531,-73.944969,40.802586,-74.002441,40.695534,-73.870422,40.830936,4.0,12,20,5,0,19,23,20,0,0.006657,0.098976,60.0,48840.0,0.000163,0.114002,87000,10.0,34.36,304.5,348.86
1,00725588681CA8E11EE0C48BC9CD3896,2013-04-04,23,3,-74.012558,40.702454,-73.945564,40.797276,-74.010422,40.709656,-73.807831,41.032913,4.5,12,7,1,4,12,16,3,4,0.007512,0.32393,50.0,7349.0,4.2e-05,0.286032,42450,5.5,30.45,343.5,379.45
2,00A4FE56E610A60BB4488B38DC65B8FF,2013-04-01,20,1,-74.004311,40.700798,-73.863113,40.77005,-74.014465,40.693867,-73.795456,40.791622,5.0,13,20,4,17,20,23,12,12,0.008245,0.126271,32.0,2294.0,4.7e-05,0.140788,24117,14.0,27.77,242.0,283.77
3,00A84F2983BCE93E943137C590D631D7,2013-04-01,24,1,-74.002213,40.731197,-73.915504,40.818367,-74.007339,40.693329,-73.907112,40.819614,3.0,10,24,5,0,19,23,21,7,0.003679,0.07389,75.0,50764.0,0.000691,0.081563,86307,13.5,27.17,241.0,281.67
4,00D0B6CE0ADA00D70908CC55C343481F,2013-04-01,7,1,-73.988739,40.643898,-73.781921,40.745483,-73.992966,40.672836,-73.798897,40.768517,4.0,2,6,3,16,21,23,4,3,0.002784,0.230963,60.0,5520.0,0.02332,0.220407,27180,4.5,17.5,203.0,225.0
5,012FBB14A2AC8FD4E2AE84BDC763A0CC,2013-04-02,24,3,-74.005516,40.722889,-73.933563,40.797409,-74.002731,40.656509,-73.936722,40.794693,4.5,11,24,6,0,20,23,18,9,0.006626,0.140952,93.0,58709.0,0.000329,0.074553,86313,15.0,24.36,283.5,322.86
6,01306A5392F70DF87BE7A9AAB2CCFB37,2013-04-03,26,1,-74.015373,40.714596,-73.945801,40.783146,-74.013809,40.714046,-73.943428,40.789948,3.5,8,1,0,7,12,16,0,1,0.005217,0.086357,60.0,1980.0,0.000462,0.018837,32160,1.0,15.35,275.5,291.85
7,019AD8F058B732DDDADA59F3A17DA556,2013-04-03,17,1,-74.010757,40.709217,-73.936401,40.814129,-74.010674,40.709293,-73.865372,40.787189,4.0,10,17,1,17,21,23,13,8,0.003577,0.098259,13.0,7026.0,0.0,0.129021,22692,10.5,19.05,164.0,193.55
8,01B6FCB9F872D56949867EC1EF2F7640,2013-04-06,13,2,-74.004753,40.646832,-73.790344,40.796436,-74.003319,40.646832,-73.790344,40.798203,4.0,4,0,2,9,13,15,0,0,0.0,0.107002,120.0,3540.0,0.000211,0.217698,22680,0.0,7.15,201.0,208.15
9,01CAA844AF60BF37B6F261D1DF3A9E76,2013-04-07,15,4,-74.00061,40.734776,-73.862656,40.786591,-74.009537,40.693459,-73.828262,40.786587,4.5,6,0,5,6,10,15,2,0,0.006895,0.152125,60.0,4680.0,6.1e-05,0.153276,33600,0.0,19.02,274.0,293.02
