In [None]:
# !pip install pyspark
# !pip install findspark
# !pip install pyarrow==1.0.0
# !pip install pandas
# !pip install numpy==1.19.5

# !pip install cassandra-driver

In [None]:
import os
import string

import findspark
findspark.init()  # must run before the pyspark imports below

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from numpy import datetime64
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

def get_path(dataset_name,env_name='colab'):
    """Build the location of a dataset file for the given environment.

    Args:
        dataset_name: File name of the dataset, e.g. ``'cleaned_taxi_trip.csv'``.
        env_name: ``'colab'`` (default) yields a raw GitHub URL; any other
            value yields a path relative to the local ``../Datasets`` folder.

    Returns:
        The URL or relative path as a string.
    """
    # Anything that is not Colab is treated as a local checkout.
    if env_name != 'colab':
        return f'../Datasets/{dataset_name}'

    remote_root = 'https://raw.githubusercontent.com/John-Ghaly88/Big_Data_and_NoSQL/main/Datasets/Assessment/'
    return remote_root + dataset_name

In [None]:
# Reuse the running SparkContext if there is one. A bare `SparkContext()`
# raises "Cannot run multiple SparkContexts at once" when this cell is
# executed a second time in the same kernel.
sc = SparkContext.getOrCreate()

# Creating a spark session (attaches to the context obtained above)
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# Load the cleaned taxi-trip CSV into a pandas DataFrame.
df = pd.read_csv(get_path(dataset_name='cleaned_taxi_trip.csv'))

# i = number of rows, j = number of columns of the loaded frame.
i = len(df.index)
j = len(df.columns)

In [None]:
# Convert the pandas DataFrame `df` into a Spark DataFrame so it can be queried with Spark SQL.
sdf = spark.createDataFrame(df) 

In [None]:
# Register the Spark DataFrame as the SQL view `taxis`.
# createOrReplaceTempView keeps this cell re-runnable; createTempView raises
# an AnalysisException when the view already exists in the session.
sdf.createOrReplaceTempView("taxis")

### GROUP BY in Cassandra only works on primary-key columns. Since we need several different GROUP BYs and only one primary key is allowed, I used Spark instead

In [None]:
# session.execute("""SELECT time_of_day,payment_type,COUNT(*) as number_of_trips 
# FROM taxi.nyc_taxiii10 
# GROUP BY time_of_day,payment_type 
# """)

In [None]:
# Most frequent payment type for each time of day.
#   s1: number of trips per (time_of_day, payment_type)
#   s2: the largest of those counts within each time_of_day
# The join matches on BOTH time_of_day and the count. Joining on the count
# alone would also return a non-top payment type whose count happens to equal
# the maximum of a different time_of_day.
# An ORDER BY inside the CTE does not order the final result, so it is omitted.
spark.sql("""
WITH s1 AS (
  SELECT time_of_day, payment_type, COUNT(*) AS number_of_trips
  FROM taxis
  GROUP BY time_of_day, payment_type
),
s2 AS (
  SELECT time_of_day, MAX(number_of_trips) AS number_of_trips
  FROM s1
  GROUP BY time_of_day
)
SELECT s1.time_of_day, s1.payment_type, s1.number_of_trips
FROM s1
INNER JOIN s2
  ON  s1.time_of_day = s2.time_of_day
  AND s1.number_of_trips = s2.number_of_trips
""").show()

+-----------+------------+---------------+
|time_of_day|payment_type|number_of_trips|
+-----------+------------+---------------+
|    morning|           1|           3441|
|    evening|           1|           3298|
|  afternoon|           1|           3346|
|      night|           1|           3295|
+-----------+------------+---------------+



In [None]:
# session.execute("select passenger_count,AVG(tip_amount) as average_tip_amount from taxi.nyc_taxiii10 GROUP BY passenger_count")

In [None]:
# Average tip (rounded to 2 decimals) per passenger count, highest first.
# The adjacent literals concatenate into a single SQL statement.
spark.sql(
    "SELECT passenger_count, ROUND(AVG(tip_amount), 2) average_tip_amount "
    "FROM taxis "
    "GROUP BY passenger_count "
    "ORDER BY average_tip_amount DESC"
).show()

+---------------+------------------+
|passenger_count|average_tip_amount|
+---------------+------------------+
|              4|              1.92|
|              6|              1.89|
|              5|              1.83|
|              1|              1.82|
|              2|               1.8|
|              3|              1.77|
+---------------+------------------+



In [None]:
# session.execute("select zone_id,zone_name,COUNT(*) as number_of_trips from taxi.nyc_taxiii10 GROUP BY zone_id,zone_name ")

In [None]:
# Five zones with the most trips.
# The adjacent literals concatenate into a single SQL statement.
spark.sql(
    "SELECT zone_id,zone_name, COUNT(*) number_of_trips "
    "FROM taxis "
    "GROUP BY zone_id,zone_name "
    "ORDER BY number_of_trips DESC "
    "LIMIT 5"
).show()

+-------+--------------------+---------------+
|zone_id|           zone_name|number_of_trips|
+-------+--------------------+---------------+
|    237|Upper East Side S...|            793|
|    161|      Midtown Center|            745|
|    236|Upper East Side N...|            715|
|    230|Times Sq/Theatre ...|            693|
|    162|        Midtown East|            669|
+-------+--------------------+---------------+



### Adding the total amount to dataframe using spark.

In [None]:
# Rebuild `sdf` from the `taxis` view with an extra `total_amount` column:
# the sum of the six charge components, rounded to 2 decimals.
sdf = spark.sql(
    "SELECT *,"
    "ROUND((fare_amount+extra+mta_tax+tip_amount+tolls_amount+imp_surcharge),2) total_amount "
    "FROM taxis"
)

In [None]:
# sdf.show()

In [None]:
# SECURITY: the database client id and secret must never be committed in the
# notebook. They are read from the environment instead; a missing variable
# raises KeyError before any connection attempt.
# NOTE(review): the credentials that were previously hardcoded in this cell
# should be treated as leaked and rotated.
#
# Expected environment variables:
#   CASSANDRA_CLIENT_ID              - client id / username
#   CASSANDRA_CLIENT_SECRET          - client secret / password
#   CASSANDRA_SECURE_CONNECT_BUNDLE  - optional path to the secure-connect
#                                      bundle (defaults to the original file name)
cloud_config = {
    'secure_connect_bundle': os.environ.get(
        'CASSANDRA_SECURE_CONNECT_BUNDLE', 'secure-connect-taxi2.zip'
    )
}
auth_provider = PlainTextAuthProvider(
    os.environ['CASSANDRA_CLIENT_ID'],
    os.environ['CASSANDRA_CLIENT_SECRET'],
)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

In [None]:
# Make `taxi` the current keyspace for this Cassandra session.
session.execute("use taxi")

<cassandra.cluster.ResultSet at 0x7f24be786110>

In [None]:
# Create the `taxi.nyc_tax` table when it does not exist yet.
# `id` is the single-column primary key; identifier-like and timestamp fields
# are stored as text, monetary amounts, distance and duration as float.
session.execute("""
    create table if not exists taxi.nyc_tax (
        vendor_id text,
        pickup_datetime text,
        dropoff_datetime text,
        passenger_count int,
        trip_distance float,
        payment_type text,
        fare_amount float,
        extra float,
        mta_tax float,
        tip_amount float,
        tolls_amount float ,
        imp_surcharge float,
        zone_id text,
        dropofflocationid text,
        zone_name text,
        borough text,
        duration float,
        time_of_day text,
        id text,
        Primary key (id)
 	);
""")

<cassandra.cluster.ResultSet at 0x7f24b413a310>

In [None]:
# Collect the Spark DataFrame back into pandas; this overwrites the `df` loaded from the CSV.
df = sdf.toPandas()

In [None]:
# Upload the first `i - ROWS_TO_SKIP` rows of `df` into taxi.nyc_tax.
# The cut-off reproduces the original `range(0, i-19000)`; max() makes the
# "fewer rows than the cut-off" case an explicit no-op.
ROWS_TO_SKIP = 19000
rows_to_insert = max(i - ROWS_TO_SKIP, 0)

# Prepare once, execute many: the statement is parsed by the server a single
# time instead of once per row.
insert_trip = session.prepare("""
    insert into taxi.nyc_tax (
        vendor_id, pickup_datetime, dropoff_datetime, passenger_count, trip_distance,
        payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, imp_surcharge,
        zone_id, dropofflocationid, zone_name, borough, duration, time_of_day, id
    ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

for x in range(rows_to_insert):
    # Look the row up once; the original evaluated df.loc[x] 19 times per row.
    row = df.loc[x]
    session.execute(
        insert_trip,
        (
            str(row.vendor_id),
            str(row.pickup_datetime),
            str(row.dropoff_datetime),
            int(row.passenger_count),
            float(row.trip_distance),
            str(row.payment_type),
            float(row.fare_amount),
            float(row.extra),
            float(row.mta_tax),
            float(row.tip_amount),
            float(row.tolls_amount),
            float(row.imp_surcharge),
            str(row.zone_id),
            str(row.dropoff_location_id),  # stored in column `dropofflocationid`
            str(row.zone_name),
            str(row.borough),
            # `duration` is a float column; int() would drop any fractional part.
            float(row.duration),
            str(row.time_of_day),
            str(x),  # the row label doubles as the primary key
        ),
    )

### Another way of adding total amount column using cassandra

In [None]:
# Add the `Total` column only when it is missing. The table lives in the
# database, so on any later run the column is already there and an
# unconditional ALTER TABLE ... ADD is rejected by Cassandra.
# Unquoted CQL identifiers are stored lower-case, hence the 'total' lookup.
nyc_tax_columns = cluster.metadata.keyspaces['taxi'].tables['nyc_tax'].columns
if 'total' not in nyc_tax_columns:
    session.execute("alter table taxi.nyc_tax add Total float;")

<cassandra.cluster.ResultSet at 0x7f24b628eb90>

In [None]:
# Fill the `Total` column row by row from the six charge components.
# Changes from the original loop:
#   * the needed columns are selected by name, not by position in `select *`;
#   * tip_amount and tolls_amount are no longer truncated with int(), which
#     dropped their cents and made Total disagree with the Spark `total_amount`;
#   * the sum is rounded to 2 decimals, matching the Spark computation;
#   * the UPDATE is parameterized instead of built with str.format.
table = session.execute(
    "select id, fare_amount, extra, mta_tax, tip_amount, tolls_amount, imp_surcharge "
    "from taxi.nyc_tax;"
)
counter = 0  # number of rows updated
for trip in table:
    counter += 1
    total = round(
        trip.fare_amount
        + trip.extra
        + trip.mta_tax
        + trip.tip_amount
        + trip.tolls_amount
        + trip.imp_surcharge,
        2,
    )
    session.execute(
        "update taxi.nyc_tax set Total = %s where id = %s;",
        (total, trip.id),
    )