In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession,SQLContext
from pyspark import SparkContext
spark = SparkSession.builder.appName("Taxi_project").getOrCreate()
sc=SparkContext.getOrCreate()

In [3]:
from pyspark.sql import SQLContext
sqlcontext = SQLContext(spark)

In [4]:

file_loc="taxi-data-sorted-small.csv"
lines=sc.textFile(file_loc)
taxilines = lines.map(lambda x:x.split(","))

#exception handling and removing wrong data lines 

def isfloat(value):
    try:
        float(value)
        return True
    except:
        return False
    
# for exmaple, remove lines if they dont have 16 values and 
def correctRows(p):
    if (len(p)==17):
        if isfloat(p[5] and isfloat(p[11])):
            if(float(p[5])!=0 and float(p[11])!=0):
                return p
#cleaning up data
texilinesCorrected = taxilines.filter(correctRows)

In [5]:
taxi_clean_data = texilinesCorrected.toDF(['Taxi_id', 'Driver_id', 'Pickup_date_time', 'Dropoff_date_time', 'trip_time_in_sec', 'trip_dist', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'payment_type','fare_amount','surcharge','mta_tax','tip_amount','tolls_amount','total_amount'])

In [48]:
sqlcontext.sql("select * from clean_data").show()

+--------------------+--------------------+-------------------+-------------------+----------------+---------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|             Taxi_id|           Driver_id|   Pickup_date_time|  Dropoff_date_time|trip_time_in_sec|trip_dist|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+-------------------+-------------------+----------------+---------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|             120|     0.44|      -73.956528|      40.716976|       -73.962440|       40.715008|         CSH|       3.50|     0.50|   0.50|  

In [6]:
print(taxi_clean_data.count())
print(len(taxi_clean_data.columns))

1990270
17


In [7]:
#drop NA values
clean_data = taxi_clean_data.dropna()

In [8]:
print(taxi_clean_data.count())
print(len(taxi_clean_data.columns))

1990270
17


In [9]:
clean_data.registerTempTable("clean_data")

In [25]:
taxi_clean_data.printSchema()

root
 |-- Taxi_id: string (nullable = true)
 |-- Driver_id: string (nullable = true)
 |-- Pickup_date_time: string (nullable = true)
 |-- Dropoff_date_time: string (nullable = true)
 |-- trip_time_in_sec: string (nullable = true)
 |-- trip_dist: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [46]:
# Task 1 TOP 10 TAXIS THAT THAT HAVE HAD THE LARGEST NUMBER OF DRIVERS. 
sqlcontext.sql("SELECT Taxi_id , count('Driver_id') AS number_of_driver from clean_data group by Taxi_id order by count('Driver_id') desc limit 10;").show(10)

+--------------------+----------------+
|             Taxi_id|number_of_driver|
+--------------------+----------------+
|6FFCF7A4F34BA4423...|             585|
|D5C7CD37EA4D372D0...|             569|
|849E4868258601064...|             558|
|DA1A4CB0E75444C73...|             550|
|A979CDA04CFB8BA3D...|             549|
|A532B1493C4DD88C4...|             544|
|818B2426C5493017D...|             539|
|075E4BFE660742128...|             534|
|FF40FB8123940D9F9...|             533|
|4DBFC74756F934CC9...|             531|
+--------------------+----------------+



In [45]:
#Task 2 TOP 10 BEST DRIVER ARE IN TERM OF THEIR AVERAGE EARNED MONEY PER MINUTE
sqlcontext.sql("select Driver_id, (AVG(total_amount)/(AVG(trip_time_in_sec)/60)) AS avg_money_earned_per_min from clean_data group by Driver_id order by (AVG(total_amount)/(AVG(trip_time_in_sec)/60)) desc limit 10;").show()

+--------------------+------------------------+
|           Driver_id|avg_money_earned_per_min|
+--------------------+------------------------+
|30B2ACBAF00230553...|                   702.0|
|4C3B2A31227663A59...|                   625.0|
|08026D69508127F4D...|                   375.0|
|6E1D7195E38AA7A36...|       317.3076923076923|
|E8E22AC46DF6AC99C...|                   273.0|
|975D5E840C0F5D961...|      190.90909090909093|
|17F72121B9F612D81...|      107.14285714285714|
|619BF4020E6542AA0...|      105.88235294117648|
|CD9D0B4429613F1B6...|      103.25172413793103|
|52C99F4F8CD2560F8...|       89.28571428571429|
+--------------------+------------------------+



In [42]:
#Task 3 Time of the day that has the highest profit ratio. 
sqlcontext.sql("SELECT hour(Pickup_date_time) AS TIME, SUM(surcharge) AS Total_surcharge_amount, SUM(trip_dist) AS Total_travel_distance,(SUM(surcharge)/SUM(trip_dist)) AS Profit_ratio from clean_data group by hour(Pickup_date_time) order by (SUM(surcharge)/SUM(trip_dist)) desc;").show(30)

+----+----------------------+---------------------+--------------------+
|TIME|Total_surcharge_amount|Total_travel_distance|        Profit_ratio|
+----+----------------------+---------------------+--------------------+
|  19|               93973.5|   335863.16000000015|  0.2797969863679004|
|  18|               92658.5|   339670.96999999945|  0.2727889875310809|
|  17|               74738.5|    309579.8500000004|  0.2414191362906853|
|  16|               59141.0|   273484.30000000016| 0.21625007358740508|
|  20|               56698.5|    326477.0599999999|  0.1736676383939503|
|  21|               53744.5|   323382.40999999974| 0.16619487745174527|
|  22|               50954.5|    312854.0799999998|  0.1628698593286686|
|  23|               39627.5|            257190.42| 0.15407844506805501|
|   2|               18976.5|   124523.37999999989|  0.1523930686751357|
|   0|               32440.5|            216450.19| 0.14987512831474067|
|   1|               23359.5|   157744.13000000024|