In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.conf import SparkConf
from pyspark.sql.functions import month, desc, to_date, when, struct, avg, lit, max, date_format, hour, rank, dayofweek
from functools import reduce
import numpy as np
import pandas as pd
import time
import os
from IPython.display import clear_output, display
import warnings
warnings.filterwarnings('ignore')

In [2]:
#change here master IPv4 address if needed
spark = SparkSession.builder.appName("AdvancedDB_Project")\
                            .master("spark://192.168.0.1:7077").getOrCreate()

23/01/12 13:08:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [14]:
def print_dataframe(df, n, title) :
    df = df.limit(n).toPandas()
    
    df.index = np.arange(1, len(df) + 1)
    df = df.style.set_table_styles([dict(selector='th', props=[('text-align', 'center'), ('padding', '10px')]),\
                                    dict(selector='td', props=[('text-align', 'center'), ('padding', '10px')]),\
                                    dict(selector="caption", props=[("text-align", "left"), ("font-size", "120%"), ("color", "#D35C5C")])])\
                .format(precision=3, thousands=",")\
                .set_caption(title)
    clear_output()
    display(df)

<h1 style="color:#4668BC"><u>Ερώτημα 1</u></h1>
<p style="text-align:justify">Εγκαταστήστε την πλατφόρμα εκτέλεσης Spark & HDFS (5%) και δημιουργήστε δύο RDD
και δύο DataFrames από τα αρχικά δεδομένα (taxi trips & zone lookups).</p>

In [4]:
df1 = spark.read.option("header", "true").format("parquet").load('hdfs://master:9000/Dataset/*.parquet')
df2 = spark.read.option("header", "true").format("csv").load('hdfs://master:9000/Dataset/taxi+_zone_lookup.csv')
clear_output()

In [5]:
df1 = df1.filter(to_date(df1["tpep_pickup_datetime"]).between("2022-01-01", "2022-06-30"))

In [6]:
rdd1 = df1.rdd
rdd2 = df2.rdd

In [7]:
df1_month = df1.withColumn("month", month(df1["tpep_pickup_datetime"]))

<h1 style="color:#4668BC"><u>Ερώτημα 2</u></h1>
<p style="text-align:justify">Εκτελέστε τα <code>Q1</code>, <code>Q2</code> χρησιμοποιώντας το DataFrame/SQL API. Θέλουμε τα αποτελέσματα 
και τους χρόνους εκτέλεσης του ερωτήματος με χρήση 1 και 2 workers (και όλες τις 
διαθέσιμες CPUs). Για να λάβετε σωστά τους χρόνους εκτέλεσης, φροντίστε να κάνετε
collect το αποτέλεσμα του κάθε query (ή γράψιμο στο hdfs-δίσκο) </p>
<p style="text-align:justify"><code>Q1:</code> Να βρεθεί η διαδρομή με το μεγαλύτερο φιλοδώρημα (tip) τον Μάρτιο και σημείο 
άφιξης το "Battery Park".</p>

In [8]:
start_time = time.time()

df_Q1 = df1.filter(to_date(df1["tpep_pickup_datetime"]).between("2022-03-01", "2022-03-31"))
df_Q1 = df_Q1.join(df2, df_Q1["DOLocationID"]==df2["LocationID"])
df_Q1 = df_Q1.filter(df_Q1["Zone"]=="Battery Park")

max_tip = df_Q1.agg(max("tip_amount").alias("max_tip")).first()["max_tip"]
df_Q1 = df_Q1.filter(df_Q1["tip_amount"] == max_tip)


result = df_Q1.collect()
elapsed_time = time.time() - start_time

print_dataframe(df_Q1, df_Q1.count(), "Query Q1")
print(f"Elapsed time: {elapsed_time:.2f} sec")

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,LocationID,Borough,Zone,service_zone
1,2,2022-03-17 12:27:47,2022-03-17 12:27:58,1.0,0.0,1.0,N,12,12,1,2.5,0.0,0.5,40.0,0.0,0.3,45.8,2.5,0.0,12,Manhattan,Battery Park,Yellow Zone


Elapsed time: 19.79 sec


<p style="text-align:justify"><code>Q2:</code> Να βρεθεί, για κάθε μήνα, η διαδρομή με το υψηλότερο ποσό στα διόδια. Αγνοήστε 
μηδενικά ποσά.</p>

In [9]:
start_time = time.time()

df_Q2 = df1_month.groupBy(df1_month["month"].alias("month_num")).agg(max("tolls_amount").alias("max_tolls_amount"))

df_Q2 = df_Q2.join(df1_month, (df_Q2["month_num"] == df1_month["month"]) & (df_Q2["max_tolls_amount"] == df1_month["tolls_amount"]))

df_Q2 = df_Q2.drop("tolls_amount", "month")

df_Q2 = df_Q2.orderBy("month_num")

month_mapping = {1:"January", 2:"February", 3:"March", 4:"April", 5:"May", 6:"June"}
month_mapping_conditions = [(df_Q2["month_num"] == i, month_mapping[i]) for i in range(1, 7)]
df_Q2 = df_Q2.withColumn("month_num", reduce(lambda acc, condition : when(condition[0], condition[1]).otherwise(acc), month_mapping_conditions, lit(None)))\
              .withColumnRenamed("month_num", "month")


result = df_Q2.collect()
elapsed_time = time.time() - start_time

print_dataframe(df_Q2, df_Q2.count(), "Query Q2")
print(f"Elapsed time: {elapsed_time:.2f} sec")

Unnamed: 0,month,max_tolls_amount,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,January,193.3,1,2022-01-22 11:39:07,2022-01-22 12:31:09,1.0,33.4,1.0,Y,70,265,4,88.0,0.0,0.5,0.0,0.3,282.1,0.0,0.0
2,February,95.0,1,2022-02-18 02:33:30,2022-02-18 02:35:28,1.0,1.3,1.0,N,265,265,1,3.0,0.5,0.5,19.85,0.3,119.15,0.0,0.0
3,March,235.7,1,2022-03-11 20:08:32,2022-03-11 20:09:45,1.0,0.0,1.0,N,265,265,1,2.5,1.0,0.5,48.0,0.3,288.0,0.0,0.0
4,April,911.87,1,2022-04-29 04:31:21,2022-04-29 04:32:30,2.0,0.0,1.0,N,249,249,3,3.0,3.0,0.5,0.0,0.3,918.67,2.5,0.0
5,May,813.75,1,2022-05-21 16:47:48,2022-05-21 17:05:47,1.0,2.4,3.0,N,239,246,3,31.5,0.0,0.0,0.0,0.3,845.55,0.0,0.0
6,June,800.09,1,2022-06-12 16:51:46,2022-06-12 17:56:48,9.0,22.0,1.0,N,142,132,2,67.5,2.5,0.5,0.0,0.3,870.89,2.5,0.0


Elapsed time: 30.45 sec


<h1 style="color:#4668BC"><u>Ερώτημα 3</u></h1>
<p style="text-align:justify">Εκτελέστε το <code>Q3</code> χρησιμοποιώντας το DataFrame/SQL API και το RDD API. Θέλουμε τα 
αποτελέσματα και τους χρόνους εκτέλεσης του ερωτήματος με χρήση 1 και 2 workers.</p>
<p style="text-align:justify"><code>Q3</code>: Να βρεθεί, ανά 15 ημέρες, ο μέσος όρος της απόστασης και του κόστους για όλες τις 
διαδρομές με σημείο αναχώρησης διαφορετικό από το σημείο άφιξης.</p>

In [10]:
start_time = time.time()

dates = sorted([("2022-0"+str(i)+"-01", "2022-0"+str(i)+"-15") for i in range(1,7)] + \
        [("2022-02-16", "2022-02-28")] + \
        [("2022-0"+str(i)+"-16", "2022-0"+str(i)+"-31") for i in range(1,7,2)] + \
        [("2022-0"+str(i)+"-16", "2022-0"+str(i)+"-30") for i in range(4,7,2)])

date_conditions = [(to_date(df1["tpep_pickup_datetime"]).between(dates[i][0], dates[i][1]), "[" + dates[i][0] + ", " +  dates[i][1] + "]") for i in range(12)]

df_Q3 = df1.withColumn("fortnight", reduce(lambda acc, condition : when(condition[0], condition[1]).otherwise(acc), date_conditions, lit(None)))

df_Q3 = df_Q3.filter(df_Q3["PULocationID"] != df_Q3["DOLocationID"])

df_Q3 = df_Q3.groupby(df_Q3["fortnight"]).agg(avg("trip_distance").alias("avg distance"), avg("total_amount").alias("avg charge"))

df_Q3 = df_Q3.orderBy("fortnight")


result = df_Q3.collect()
elapsed_time = time.time() - start_time

print_dataframe(df_Q3, df_Q3.count(), "Query Q3")
print(f"Elapsed time: {elapsed_time:.2f} sec")

Unnamed: 0,fortnight,avg distance,avg charge
1,"[2022-01-01, 2022-01-15]",5.576,19.904
2,"[2022-01-16, 2022-01-31]",5.098,19.149
3,"[2022-02-01, 2022-02-15]",6.249,19.492
4,"[2022-02-16, 2022-02-28]",5.849,20.188
5,"[2022-03-01, 2022-03-15]",6.48,20.652
6,"[2022-03-16, 2022-03-31]",5.557,21.121
7,"[2022-04-01, 2022-04-15]",5.679,21.516
8,"[2022-04-16, 2022-04-30]",5.8,21.428
9,"[2022-05-01, 2022-05-15]",6.25,21.922
10,"[2022-05-16, 2022-05-31]",7.907,22.772


Elapsed time: 8.33 sec


<p style="text-align:justify"><code>Q3 (RDD)</code>: Να βρεθεί, ανά 15 ημέρες, ο μέσος όρος της απόστασης και του κόστους για όλες τις 
διαδρομές με σημείο αναχώρησης διαφορετικό από το σημείο άφιξης.</p>

In [11]:
def check_date(row) :
  date = row.tpep_pickup_datetime.date().strftime("%Y-%m-%d")
  for i in range(12) :
    if date >= dates[i][0] and date <= dates[i][1] :
      return ("[" + dates[i][0] + ", " +  dates[i][1] + "]", (row.trip_distance, row.total_amount, 1))


start_time = time.time()

rdd_Q3 = rdd1.filter(lambda x : x.PULocationID != x.DOLocationID).map(check_date)\
              .reduceByKey(lambda a,b:(a[0]+b[0], a[1]+b[1], a[2]+b[2]))\
              .mapValues(lambda a : (a[0]/a[2], a[1]/a[2]))\
              .sortBy(lambda a : a[0])\
              .map(lambda a : (a[0], a[1][0], a[1][1]))


result = rdd_Q3.collect()
elapsed_time = time.time() - start_time

df_Q3_from_rdd = rdd_Q3.toDF(["fortnight", "avg distance", "avg charge"])
print_dataframe(df_Q3_from_rdd, df_Q3_from_rdd.count(), "Query Q3 (with RDD)")
print(f"Elapsed time: {elapsed_time:.2f} sec")

Unnamed: 0,fortnight,avg distance,avg charge
1,"[2022-01-01, 2022-01-15]",5.576,19.904
2,"[2022-01-16, 2022-01-31]",5.098,19.149
3,"[2022-02-01, 2022-02-15]",6.249,19.492
4,"[2022-02-16, 2022-02-28]",5.849,20.188
5,"[2022-03-01, 2022-03-15]",6.48,20.652
6,"[2022-03-16, 2022-03-31]",5.557,21.121
7,"[2022-04-01, 2022-04-15]",5.679,21.516
8,"[2022-04-16, 2022-04-30]",5.8,21.428
9,"[2022-05-01, 2022-05-15]",6.25,21.922
10,"[2022-05-16, 2022-05-31]",7.907,22.772


Elapsed time: 209.73 sec


<h1 style="color:#4668BC"><u>Ερώτημα 4</u></h1>
<p style="text-align:justify"> Εκτελέστε τα <code>Q4</code>, <code>Q5</code> χρησιμοποιώντας το DataFrame/SQL API. Θέλουμε τα αποτελέσματα και τους χρόνους εκτέλεσης του ερωτήματος με χρήση 1 και 2 workers.</p>
<p style="text-align:justify"><code>Q4</code>: Να βρεθούν οι τρεις μεγαλύτερες (top 3) ώρες αιχμής ανά ημέρα της εβδομάδος, 
εννοώντας τις ώρες (π.χ. 7-8πμ, 3-4μμ, κλπ) της ημέρας με τον μεγαλύτερο αριθμό 
επιβατών σε μια κούρσα ταξί. Ο υπολογισμός αφορά όλους τους μήνες.</p>

In [12]:
start_time = time.time()

hour_intervals = [(i, i+1) for i in range(24)]

hour_conditions = [((hour(df1["tpep_pickup_datetime"]) >= hour_intervals[i][0]) &
                    (hour(df1["tpep_pickup_datetime"]) < hour_intervals[i][1]), f"[{i}, {(i+1)%24})") for i in range(24)]

df_Q4 = df1.withColumn("day", dayofweek(df1["tpep_pickup_datetime"]))\
            .withColumn("hour_interval", reduce(lambda acc, condition : when(condition[0], condition[1]).otherwise(acc), hour_conditions, lit(None)))

df_Q4 = df_Q4.groupby(df_Q4["day"], df_Q4["hour_interval"]).agg(avg(df_Q4["passenger_count"]).alias("number_passengers/hour/day"))

window = Window.partitionBy("day").orderBy(desc("number_passengers/hour/day"))
df_Q4 = df_Q4.withColumn("rank", rank().over(window))

df_Q4 = df_Q4.filter(df_Q4["rank"] <= 3)

df_Q4 = df_Q4.orderBy([df_Q4["day"], df_Q4["rank"]])

day_mapping = {1:"Monday", 2:"Tuesday", 3:"Wednesday", 4:"Thursday", 5:"Friday", 6:"Saturday", 7:"Sunday"}
day_mapping_conditions = [(df_Q4["day"] == i, day_mapping[i]) for i in range(1, 8)]
df_Q4 = df_Q4.withColumn("day", reduce(lambda acc, condition : when(condition[0], condition[1]).otherwise(acc), day_mapping_conditions, lit(None)))\
              .drop(df_Q4["rank"])


result = df_Q4.collect()
elapsed_time = time.time() - start_time

print_dataframe(df_Q4, df_Q4.count(), "Query Q4")
print(f"Elapsed time: {elapsed_time:.2f} sec")

Unnamed: 0,day,hour_interval,number_passengers/hour/day
1,Monday,"[0, 1)",1.53
2,Monday,"[1, 2)",1.528
3,Monday,"[2, 3)",1.508
4,Tuesday,"[0, 1)",1.468
5,Tuesday,"[1, 2)",1.444
6,Tuesday,"[2, 3)",1.423
7,Wednesday,"[0, 1)",1.42
8,Wednesday,"[1, 2)",1.418
9,Wednesday,"[2, 3)",1.41
10,Thursday,"[1, 2)",1.409


Elapsed time: 11.19 sec


<p style="text-align:justify"><code>Q5</code>: Να βρεθούν οι κορυφαίες πέντε (top 5) ημέρες ανά μήνα στις οποίες οι κούρσες είχαν 
το μεγαλύτερο ποσοστό σε tip. Για παράδειγμα, εάν η κούρσα κόστισε 10\$ (fare_amount) και το tip ήταν 5$, το ποσοστό είναι 50%.</p>

In [13]:
start_time = time.time()

df_Q5 = df1_month.withColumn("% tip/fare", df1["tip_amount"]/df1["fare_amount"]*100)\
            .withColumn("date", to_date(df1["tpep_pickup_datetime"]))

df_Q5 = df_Q5.groupby([df_Q5["month"], df_Q5["date"]]).agg(max(df_Q5["% tip/fare"]).alias("max % tip/fare"))

window = Window.partitionBy("month").orderBy(desc("max % tip/fare"))
df_Q5 = df_Q5.withColumn("rank", rank().over(window))

df_Q5 = df_Q5.filter(df_Q5["rank"] <= 5)

df_Q5 = df_Q5.orderBy([df_Q5["month"], df_Q5["rank"]])

month_mapping = {1:"January", 2:"February", 3:"March", 4:"April", 5:"May", 6:"June"}
month_mapping_conditions = [(df_Q5["month"] == i, month_mapping[i]) for i in range(1, 7)]
df_Q5 = df_Q5.withColumn("month", reduce(lambda acc, condition : when(condition[0], condition[1]).otherwise(acc), month_mapping_conditions, lit(None)))\
              .drop(df_Q5["rank"])


result = df_Q5.collect()
elapsed_time = time.time() - start_time

print_dataframe(df_Q5, df_Q5.count(), "Query Q5")
print(f"Elapsed time: {elapsed_time:.2f} sec")

Unnamed: 0,month,date,max % tip/fare
1,January,2022-01-09,1688800.0
2,January,2022-01-31,1100000.0
3,January,2022-01-01,500000.0
4,January,2022-01-03,250000.0
5,January,2022-01-16,120000.0
6,February,2022-02-21,450000.0
7,February,2022-02-13,296900.0
8,February,2022-02-09,250000.0
9,February,2022-02-27,250000.0
10,February,2022-02-24,150000.0


Elapsed time: 8.56 sec
