# Challenge 4 - Taxi Journeys - outliers
> Can you find the taxi journeys with the longest and shortest duration, distance travelled and total fare? <br>
> Are there any outliers in that data? <br> 
> Can you measure/detect the outliers? <br>

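Approach: investigate outliers with principal component analysis (PCA) on per-journey features (distance, duration, fare, average speed and fare per hour).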

In [0]:
%sql
-- Inspect the schema (column names and types) of the source table.
DESCRIBE TABLE taxi_journeys

col_name,data_type,comment
VendorID,string,
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
passenger_count,int,
trip_distance,double,
RatecodeID,string,
store_and_fwd_flag,string,
PULocationID,string,
DOLocationID,string,
payment_type,string,


In [0]:
%sql
-- Number of journeys in the source table, before any filtering.
SELECT COUNT(1)
  FROM taxi_journeys

count(1)
204051047


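Build the per-journey feature view and fit a PCA pipeline (assemble features → standardise → project onto 2 principal components).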

In [0]:
%sql
-- taxi_journey_features is a TEMPORARY VIEW, so remove it with DROP VIEW.
-- DROP TABLE would also delete a persistent table that happened to share this name.
DROP VIEW IF EXISTS taxi_journey_features

In [0]:
%sql
-- Per-journey features for the PCA: duration in minutes, average speed (mph) and fare rate
-- (dollars per hour), restricted to journeys that pass basic plausibility filters.
-- CREATE OR REPLACE keeps the cell re-runnable when the view already exists.
-- NOTE(review): the hard-coded caps below (12h, 70 mph, 600 dollars/h) remove the most extreme
-- journeys before the PCA, so any outliers it finds lie within these limits.
CREATE OR REPLACE TEMPORARY VIEW taxi_journey_features AS (
  SELECT idx,
         tpep_pickup_datetime,
         tpep_dropoff_datetime,
         trip_distance,
         fare_amount,
         trip_duration_s / 60 AS trip_duration_m,
         ROUND(trip_distance / (trip_duration_s / (60 * 60)), 2) AS mph,
         ROUND(fare_amount / (trip_duration_s / (60 * 60)), 2) AS dph
    FROM (
      -- Compute the duration (in whole seconds) once instead of repeating the expression.
      SELECT idx,
             tpep_pickup_datetime,
             tpep_dropoff_datetime,
             trip_distance,
             fare_amount,
             UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime) AS trip_duration_s
        FROM taxi_journeys
    ) AS journeys
   WHERE trip_distance > 0                                   -- drop zero/negative distances
     AND fare_amount > 0                                     -- drop zero/negative fares
     AND trip_duration_s > 0                                 -- drop zero/negative durations
     AND trip_duration_s < 12 * 60 * 60                      -- less than 12h
     AND trip_distance / (trip_duration_s / (60 * 60)) < 70  -- less than 70 mph
     AND fare_amount / (trip_duration_s / (60 * 60)) < 600   -- less than 600 dollars/h
)

In [0]:
from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
import pyspark.ml.pipeline as p

# Numeric features per journey; their order fixes each feature's position in the assembled vector.
feature_cols = ['trip_distance', 'trip_duration_m', 'fare_amount', 'mph', 'dph']
# Pack the features into one vector column; rows with a null/NaN feature are dropped ('skip').
vector_assembler = VectorAssembler(handleInvalid='skip', inputCols=feature_cols, outputCol='vector')
# Centre every feature and scale it to unit variance. The features are in different units
# (miles, minutes, dollars, mph, dollars/hour), and PCA would otherwise be dominated by
# whichever has the largest raw spread.
standard_scalar = (StandardScaler()
                   .setInputCol('vector')
                   .setOutputCol('normalized_vector')
                   .setWithMean(True)
                   .setWithStd(True))
# Project the standardised features onto the first two principal components for a 2-D view.
pca = PCA().setInputCol('normalized_vector').setOutputCol('pc').setK(2)
pipeline = p.Pipeline().setStages([vector_assembler, standard_scalar, pca])
taxi_df = spark.sql('SELECT * FROM taxi_journey_features')

# Keep the fitted model. Its PCA stage holds the loadings (`pc`) and the explained variance,
# which are needed to interpret pc1/pc2 and would otherwise require refitting over the whole table.
pipeline_model = pipeline.fit(taxi_df)
pca_model = pipeline_model.stages[-1]
pca_df = pipeline_model.transform(taxi_df)
print('Share of variance explained by each component:', pca_model.explainedVariance)

In [0]:
# Show a few rows: the raw features, the assembled and standardised vectors, and their 2-D projection.
pca_preview_df = pca_df.limit(10)
display(pca_preview_df)

idx,tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,trip_duration_m,mph,dph,vector,normalized_vector,pc
1,2001-01-01T00:01:48.000+0000,2001-01-01T00:15:47.000+0000,1.35,9.0,13.983333333333333,5.79,38.62,"Map(vectorType -> dense, length -> 5, values -> List(1.35, 13.983333333333333, 9.0, 5.79, 38.62))","Map(vectorType -> dense, length -> 5, values -> List(-0.42493134257817067, -0.03933423502155462, -0.35296035161924394, -0.9000912741163977, -0.9357804514105702))","Map(vectorType -> dense, length -> 2, values -> List(0.9167737377615146, 1.066626264001402))"
2,2001-01-01T00:02:08.000+0000,2001-01-01T01:00:02.000+0000,0.43,3.5,57.9,0.45,3.63,"Map(vectorType -> dense, length -> 5, values -> List(0.43, 57.9, 3.5, 0.45, 3.63))","Map(vectorType -> dense, length -> 5, values -> List(-0.6642706241962439, 3.4252821982083352, -0.8420093123957197, -1.7658127696780266, -2.5837290605483925))","Map(vectorType -> dense, length -> 2, values -> List(0.2986253502223999, 4.147608382501832))"
3,2001-01-01T00:02:26.000+0000,2001-01-01T00:04:49.000+0000,0.36,3.5,2.3833333333333333,9.06,88.11,"Map(vectorType -> dense, length -> 5, values -> List(0.36, 2.3833333333333333, 3.5, 9.06, 88.11))","Map(vectorType -> dense, length -> 5, values -> List(-0.6824812217106627, -0.9544663175748765, -0.8420093123957197, -0.3699584481825912, 1.3950848434231207))","Map(vectorType -> dense, length -> 2, values -> List(1.2801549290192815, -1.3932330733354634))"
4,2001-01-01T00:05:12.000+0000,2001-01-01T00:09:59.000+0000,0.62,5.0,4.783333333333333,7.78,62.72,"Map(vectorType -> dense, length -> 5, values -> List(0.62, 4.783333333333333, 5.0, 7.78, 62.72))","Map(vectorType -> dense, length -> 5, values -> List(-0.6148418595142506, -0.7651286453224652, -0.7086323230930445, -0.5774722149089742, 0.19927417791842458))","Map(vectorType -> dense, length -> 2, values -> List(1.2942570281578905, -0.3006751471257483))"
6,2001-01-01T00:07:04.000+0000,2001-01-01T00:07:30.000+0000,0.11,2.5,0.4333333333333333,15.23,346.15,"Map(vectorType -> dense, length -> 5, values -> List(0.11, 0.43333333333333335, 2.5, 15.23, 346.15))","Map(vectorType -> dense, length -> 5, values -> List(-0.7475190699764435, -1.1083031762799609, -0.9309273052641699, 0.6303227554906765, 13.548175986661867))","Map(vectorType -> dense, length -> 2, values -> List(-0.25684836858407323, -11.00827363523916))"
7,2001-01-01T00:09:39.000+0000,2001-01-01T06:39:54.000+0000,3.22,13.5,390.25,0.5,2.08,"Map(vectorType -> dense, length -> 5, values -> List(3.22, 390.25, 13.5, 0.5, 2.08))","Map(vectorType -> dense, length -> 5, values -> List(0.0615517624498697, 29.64460527032873, 0.04717061628878175, -1.7577067631652772, -2.656730499364158))","Map(vectorType -> dense, length -> 2, values -> List(-12.323906408597068, 15.654925707298473))"
8,2001-01-01T00:13:42.000+0000,2001-01-01T00:22:17.000+0000,0.8,5.5,8.583333333333334,5.59,38.45,"Map(vectorType -> dense, length -> 5, values -> List(0.8, 8.583333333333334, 5.5, 5.59, 38.45))","Map(vectorType -> dense, length -> 5, values -> List(-0.5680146087628885, -0.46534399758948025, -0.6641733266588195, -0.932515300167395, -0.9437870608290732))","Map(vectorType -> dense, length -> 2, values -> List(1.3742038591955001, 0.8612792181396982))"
9,2001-01-01T22:55:43.000+0000,2001-01-02T00:05:43.000+0000,4.67,31.5,70.0,4.0,27.0,"Map(vectorType -> dense, length -> 5, values -> List(4.67, 70.0, 31.5, 4.0, 27.0))","Map(vectorType -> dense, length -> 5, values -> List(0.43877128239139823, 4.379859629147576, 1.6476944879208844, -1.1902863072728238, -1.4830557540165004))","Map(vectorType -> dense, length -> 2, values -> List(-2.4780769933780533, 3.8023565329171607))"
10,2001-01-05T11:45:23.000+0000,2001-01-05T11:52:05.000+0000,1.53,7.5,6.7,13.7,67.16,"Map(vectorType -> dense, length -> 5, values -> List(1.53, 6.7, 7.5, 13.7, 67.16))","Map(vectorType -> dense, length -> 5, values -> List(-0.37810409182680854, -0.6139214765097754, -0.48633734092191916, 0.38227895620054675, 0.4083879768487455))","Map(vectorType -> dense, length -> 2, values -> List(0.5585233347735316, -0.8295100559540818))"
13,2002-02-02T01:07:45.000+0000,2002-02-02T01:30:28.000+0000,3.84,17.0,22.716666666666665,10.14,44.9,"Map(vectorType -> dense, length -> 5, values -> List(3.84, 22.716666666666665, 17.0, 10.14, 44.9))","Map(vectorType -> dense, length -> 5, values -> List(0.222845626149006, 0.6496445167858315, 0.35838359132835723, -0.19486870750720559, -0.6400068799505665))","Map(vectorType -> dense, length -> 2, values -> List(-0.4677135159757784, 0.904351947975123))"


Extract the two principal-component scores into separate `pc1` and `pc2` columns for plotting.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Convert the ML vector in `pc` to a plain array column; toArray() handles dense and sparse vectors.
# DoubleType keeps the vector's double precision (FloatType would truncate the scores to 32 bits).
to_array_udf = F.udf(lambda v: v.toArray().tolist(), T.ArrayType(T.DoubleType()))
pc_df = pca_df.withColumn('pc_array', to_array_udf('pc'))
# Index the array natively rather than through two more Python UDFs, which would each
# serialise every row to a Python worker and back just to read one element.
pc_df = (pc_df
         .withColumn('pc1', F.col('pc_array').getItem(0))
         .withColumn('pc2', F.col('pc_array').getItem(1)))

In [0]:
from pyspark.sql import functions as F

# toPandas() on every journey would collect the whole table (~204M rows before filtering) onto
# the driver and risk exhausting its memory; a scatter plot cannot show that many points anyway.
# Instead, collect a reproducible random sample of the bulk, plus the journeys furthest from the
# origin in PC space, so the extreme points this plot is meant to reveal are never sampled away.
PLOT_SAMPLE_FRACTION = 0.001  # ~0.1% of rows
PLOT_SAMPLE_SEED = 42
PLOT_N_EXTREMES = 1000

pc_points = pc_df.select('pc1', 'pc2')
pc_sample = pc_points.sample(withReplacement=False, fraction=PLOT_SAMPLE_FRACTION, seed=PLOT_SAMPLE_SEED)
pc_extremes = (pc_points
               .orderBy((F.col('pc1') * F.col('pc1') + F.col('pc2') * F.col('pc2')).desc())
               .limit(PLOT_N_EXTREMES))
# A point can appear in both parts; the duplicate just redraws the same marker.
pc_pd = pc_sample.union(pc_extremes).toPandas()

In [0]:
# Small, semi-transparent markers keep the dense core readable so isolated outliers stand out.
pc_pd.plot(x='pc1', y='pc2', kind='scatter', s=2, alpha=0.3,
           title='Taxi journeys projected onto the first two principal components');

In [0]:
%sh
# NOTE(review): this powers off the machine the shell runs on (presumably the cluster driver node),
# ending the session for everyone attached. It is presumably meant to stop the cluster after the
# analysis — confirm. Running the notebook with "Run All" will execute it.
shutdown now