# treeAggregate
Create a treeAggregate function that computes the maximum and minimum of an RDD in one pass.

In [2]:
## This command will override default Jupyter cell output style to prevent 'word-wrap' behavior for spark dataframes. 

# from IPython.core.display import HTML

# HTML("""<style>
#     .output-plaintext, .output-stream, .output{
#         white-space: pre !important;
#         font-family: Monaco; # Any monospaced font should work
#     }</style>""")

In [3]:
# Make the local Spark installation's pyspark package importable
# (presumably located via SPARK_HOME — confirm for your environment).
# This must run before the pyspark imports in the next cell.
import findspark
findspark.init()

In [4]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# getOrCreate() makes this cell safe to re-run. Calling SparkContext("local") a second
# time in the same kernel fails because only one SparkContext may be active at once.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local"))
# The session picks up the active SparkContext created above.
spark = SparkSession.builder.getOrCreate()

In [5]:
#spark.stop()

## Load the data

In [6]:
import os

# Use the platform's path separator. The original hard-coded "data\\" only works on Windows;
# os.sep is "\\" there, so behaviour on Windows is unchanged.
path = "data" + os.sep
testFile = path + "taxi-data-sorted-verysmall.csv"
# header='false': the file has no header row, so Spark names the columns _c0 ... _c16.
# inferSchema='true': numeric columns are typed (e.g. integer / double) instead of strings.
df = spark.read.format('csv').options(header='false', inferSchema='true', sep=",").load(testFile)

In [7]:
# Sanity check: number of rows loaded from the CSV.
df.count()

10000

In [8]:
# Preview the first 5 rows; truncate=True shortens long cell values (e.g. the md5 ids) with '...'.
df.show(n=5, truncate=True)

+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|                 _c0|                 _c1|                _c2|                _c3|_c4| _c5|       _c6|      _c7|       _c8|      _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|
+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|120|0.44|-73.956528|40.716976| -73.96244|40.715008| CSH| 3.5| 0.5| 0.5| 0.0| 0.0| 4.5|
|22D70BF00EEB0ADC8...|3FF2709163DE7036F...|2013-01-01 00:02:00|2013-01-01 00:02:00|  0| 0.0|       0.0|      0.0|       0.0|      0.0| CSH|27.0| 0.0| 0.5| 0.0| 0.0|27.5|
|0EC22AAF491A8BD91...|778C92B26AE78A9EB...|2013-01-01 00:01:00|2013-01-01 00:03:00|120|0.71|-73.973145|40.752827|-73.965897|40.760445| CSH| 4.0| 0.5| 

In [9]:
# Apply limit() before toPandas() so only 5 rows are collected to the driver,
# instead of converting the whole DataFrame to pandas just to display its head.
df.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16
0,07290D3599E7A0D62097A346EFCC1FB5,E7750A37CAB07D0DFF0AF7E3573AC141,2013-01-01 00:00:00,2013-01-01 00:02:00,120,0.44,-73.956528,40.716976,-73.96244,40.715008,CSH,3.5,0.5,0.5,0.0,0.0,4.5
1,22D70BF00EEB0ADC83BA8177BB861991,3FF2709163DE7036FCAA4E5A3324E4BF,2013-01-01 00:02:00,2013-01-01 00:02:00,0,0.0,0.0,0.0,0.0,0.0,CSH,27.0,0.0,0.5,0.0,0.0,27.5
2,0EC22AAF491A8BD91F279350C2B010FD,778C92B26AE78A9EBDF96B49C67E4007,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.71,-73.973145,40.752827,-73.965897,40.760445,CSH,4.0,0.5,0.5,0.0,0.0,5.0
3,1390FB380189DF6BBFDA4DC847CAD14F,BE317B986700F63C43438482792C8654,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.48,-74.004173,40.720947,-74.003838,40.726189,CSH,4.0,0.5,0.5,0.0,0.0,5.0
4,3B4129883A1D05BE89F2C929DE136281,7077F9FD5AD649AEACA4746B2537E3FA,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.61,-73.987373,40.724861,-73.983772,40.730995,CRD,4.0,0.5,0.5,0.0,0.0,5.0


In [10]:
# Column names are the defaults _c0 ... _c16 (header='false'); types come from inferSchema.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)



## Rename columns

In [11]:
# New names for _c0 ... _c16, in file order. toDF() renames positionally,
# so this list must have exactly one entry per column.
colum_names = [
    # trip identity and timing
    "medallion", "hack_license", 'pickup_datetime', 'dropoff_datetime',
    'trip_time', 'trip_distance',
    # pickup / dropoff coordinates
    'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
    # payment details
    'payment_type', 'fare_amount', 'surcharge', 'mta_tax',
    'tip_amount', 'tolls_amount', 'total_amount',
]
df = df.toDF(*colum_names)
df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- trip_time: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



## Drop Columns

In [12]:
# The coordinate columns are not used by any of the aggregations below, so drop them.
drop_colum_names = ['pickup_longitude', 'pickup_latitude',
                    'dropoff_longitude', 'dropoff_latitude']
df = df.drop(*drop_colum_names)
df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- trip_time: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



# Add new columns

In [13]:
from pyspark.sql.functions import col, substring
# pickup_datetime is a 'yyyy-MM-dd HH:mm:ss' string. substring() uses 1-based positions,
# so e.g. the month starts at position 6. Each new column is a string slice,
# which keeps leading zeros ('01', '07').
df = (
    df
    .withColumn('year', substring('pickup_datetime', 1, 4))
    .withColumn('month', substring('pickup_datetime', 6, 2))
    .withColumn('day', substring('pickup_datetime', 9, 2))
    .withColumn('hour', substring('pickup_datetime', 12, 2))
    .withColumn('minute', substring('pickup_datetime', 15, 2))
)
df.show(5, truncate=False, vertical=False)

+--------------------------------+--------------------------------+-------------------+-------------------+---------+-------------+------------+-----------+---------+-------+----------+------------+------------+----+-----+---+----+------+
|medallion                       |hack_license                    |pickup_datetime    |dropoff_datetime   |trip_time|trip_distance|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|year|month|day|hour|minute|
+--------------------------------+--------------------------------+-------------------+-------------------+---------+-------------+------------+-----------+---------+-------+----------+------------+------------+----+-----+---+----+------+
|07290D3599E7A0D62097A346EFCC1FB5|E7750A37CAB07D0DFF0AF7E3573AC141|2013-01-01 00:00:00|2013-01-01 00:02:00|120      |0.44         |CSH         |3.5        |0.5      |0.5    |0.0       |0.0         |4.5         |2013|01   |01 |00  |00    |
|22D70BF00EEB0ADC83BA8177BB861991|3FF2709163

In [14]:
# Collects every row to the driver as a pandas DataFrame; the notebook display
# is truncated (see the '...' row in the output).
df.toPandas()

Unnamed: 0,medallion,hack_license,pickup_datetime,dropoff_datetime,trip_time,trip_distance,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount,year,month,day,hour,minute
0,07290D3599E7A0D62097A346EFCC1FB5,E7750A37CAB07D0DFF0AF7E3573AC141,2013-01-01 00:00:00,2013-01-01 00:02:00,120,0.44,CSH,3.5,0.5,0.5,0.0,0.0,4.5,2013,01,01,00,00
1,22D70BF00EEB0ADC83BA8177BB861991,3FF2709163DE7036FCAA4E5A3324E4BF,2013-01-01 00:02:00,2013-01-01 00:02:00,0,0.00,CSH,27.0,0.0,0.5,0.0,0.0,27.5,2013,01,01,00,02
2,0EC22AAF491A8BD91F279350C2B010FD,778C92B26AE78A9EBDF96B49C67E4007,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.71,CSH,4.0,0.5,0.5,0.0,0.0,5.0,2013,01,01,00,01
3,1390FB380189DF6BBFDA4DC847CAD14F,BE317B986700F63C43438482792C8654,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.48,CSH,4.0,0.5,0.5,0.0,0.0,5.0,2013,01,01,00,01
4,3B4129883A1D05BE89F2C929DE136281,7077F9FD5AD649AEACA4746B2537E3FA,2013-01-01 00:01:00,2013-01-01 00:03:00,120,0.61,CRD,4.0,0.5,0.5,0.0,0.0,5.0,2013,01,01,00,01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,F476E33AE955F276113772AB44540488,B7C3EDB805D5074CFFBA649A98CBCC3D,2013-01-01 00:44:00,2013-01-01 00:56:00,720,2.22,CSH,10.0,0.5,0.5,0.0,0.0,11.0,2013,01,01,00,44
9996,F6AC5ED8F624EC43788EA84D9979379C,241AB3536C3494045D967E758844F5B6,2013-01-01 00:49:00,2013-01-01 00:56:00,420,0.75,CRD,6.0,0.5,0.5,1.3,0.0,8.3,2013,01,01,00,49
9997,F865789AEA73DF9A5CDECE38BC5BB3B5,2223D379E6348801232994F06477548E,2013-01-01 00:41:00,2013-01-01 00:56:00,900,3.64,CRD,14.0,0.5,0.5,1.0,0.0,16.0,2013,01,01,00,41
9998,F920D6FCA90502CC780348B1AC364596,DB93A24BCE932FD908322C103C821A39,2013-01-01 00:42:00,2013-01-01 00:56:00,840,1.43,CSH,10.0,0.5,0.5,0.0,0.0,11.0,2013,01,01,00,42


In [15]:
# fare_category = fare_amount cast to int. The cast drops the fractional part
# (3.5 -> 3 in the first row below), grouping fares into integer-width buckets.
df = df.withColumn('fare_category', df.fare_amount.cast("int"))
df.show(n=5)

+--------------------+--------------------+-------------------+-------------------+---------+-------------+------------+-----------+---------+-------+----------+------------+------------+----+-----+---+----+------+-------------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time|trip_distance|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|year|month|day|hour|minute|fare_category|
+--------------------+--------------------+-------------------+-------------------+---------+-------------+------------+-----------+---------+-------+----------+------------+------------+----+-----+---+----+------+-------------+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|      120|         0.44|         CSH|        3.5|      0.5|    0.5|       0.0|         0.0|         4.5|2013|   01| 01|  00|    00|            3|
|22D70BF00EEB0ADC8...|3FF2709163DE7036F...|2013-01-01 00:02:00|2013-01-01 00:02:00| 

In [16]:
# Trips per fare_category; groups are not returned in sorted order.
df.groupBy('fare_category').count().show(n=10)

+-------------+-----+
|fare_category|count|
+-------------+-----+
|           31|   13|
|           53|    2|
|           34|   12|
|           28|   38|
|           27|   46|
|           26|   51|
|           44|    2|
|           12|  416|
|           22|  118|
|          259|    1|
+-------------+-----+
only showing top 10 rows



In [17]:
# Number of trips that started in each minute of the hour (groups come back unordered).
df.groupBy('minute').count().show(n=5)

+------+-----+
|minute|count|
+------+-----+
|    07|  165|
|    51|   44|
|    15|  231|
|    54|   10|
|    11|  221|
+------+-----+
only showing top 5 rows



In [18]:
# Convert the per-minute counts to an RDD of plain (minute, count) tuples,
# the element shape that seqOp reads as y[0], y[1].
r = df.groupBy('minute').count().rdd.map(lambda row: tuple(row))
r.take(5)

[('07', 165), ('51', 44), ('15', 231), ('54', 10), ('11', 221)]

In [19]:
# l = [1,2,3,4,5]

# s=1
# for i in l:
#     s = s*i
# s

# [s1,s2, s3, ...]

# final_sum = finanl_sum +si

# Custom treeAggregate functions

In [20]:
# example of function input (x = ("k1",-10000,"k2",1000000) , y = ('07', 165))
def seqOp(x,y):
    """Fold one RDD element into a partition's running (max, min) aggregate.

    Used as the ``seqOp`` argument of ``RDD.treeAggregate``.

    Args:
        x: current aggregate ``[max_key, max_val, min_key, min_val]``.
        y: one RDD element ``(key, value)``.

    Returns:
        A new list ``[max_key, max_val, min_key, min_val]``; ``x`` is not modified.
        On ties, the element ``y`` replaces both the maximum and the minimum.
    """
    max_key, max_val = x[0], x[1]
    min_key, min_val = x[2], x[3]
    key, value = y[0], y[1]
    # Keep the current maximum only if it is strictly larger than the new value.
    if not (max_val > value):
        max_key, max_val = key, value
    # Keep the current minimum only if it is strictly smaller than the new value.
    if not (min_val < value):
        min_key, min_val = key, value
    return [max_key, max_val, min_key, min_val]
  
# example of function input (x = ('14',217, '49',70 ) , y = ('13',220, '50',70 ))
def combOp(x,y):
    """Merge two partial (max, min) aggregates produced by seqOp.

    Used as the ``combOp`` argument of ``RDD.treeAggregate``.

    Args:
        x, y: aggregates ``[max_key, max_val, min_key, min_val]``.

    Returns:
        A new list ``[max_key, max_val, min_key, min_val]``. The max pair comes from
        whichever side has the strictly larger max value, and the min pair from whichever
        side has the strictly smaller min value; ties go to ``y``.
    """
    max_source = x if x[1] > y[1] else y
    min_source = x if x[3] < y[3] else y
    return [max_source[0], max_source[1], min_source[2], min_source[3]]

### Testing the aggregate functions

In [21]:
# Each case: (aggregate, element, expected [max_key, max_val, min_key, min_val]).
seq_op_cases = [
    (["k1",-1000000,"k2",1000000], ("k3",5), ['k3', 5, 'k3', 5]),  # first element replaces both sentinels
    (["k1",20,"k2",10], ("k3",5), ['k1', 20, 'k3', 5]),            # element is the new minimum
    (["k1",20,"k2",10], ("k3",25), ['k3', 25, 'k2', 10]),          # element is the new maximum
]
for aggregate, element, expected in seq_op_cases:
    result = seqOp(aggregate, element)
    print(result)
    # Fail loudly instead of relying on the reader to eyeball the printed output.
    assert result == expected, f"seqOp({aggregate}, {element}) returned {result}, expected {expected}"

['k3', 5, 'k3', 5]
['k1', 20, 'k3', 5]
['k3', 25, 'k2', 10]


In [22]:
# Each case: (left aggregate, right aggregate, expected merged aggregate).
comb_op_cases = [
    (["k1",20,"k2",10], ["k3",25,"k4",15], ['k3', 25, 'k2', 10]),  # max from the right, min from the left
    (["k1",20,"k2",10], ["k3",25,"k4",5], ['k3', 25, 'k4', 5]),    # both max and min from the right
]
for left, right, expected in comb_op_cases:
    result = combOp(left, right)
    print(result)
    assert result == expected, f"combOp({left}, {right}) returned {result}, expected {expected}"

['k3', 25, 'k2', 10]
['k3', 25, 'k4', 5]


### Create dataset for aggregation

In [23]:
# Convert to an RDD of (minute, number_of_rides) tuples: the input for the first treeAggregate example.
r = df.groupBy('minute').count().rdd.map(lambda row: tuple(row))
r.take(10)

[('07', 165),
 ('51', 44),
 ('15', 231),
 ('54', 10),
 ('11', 221),
 ('29', 267),
 ('42', 149),
 ('30', 247),
 ('34', 237),
 ('01', 45)]

In [24]:
# treeAggregate(zeroValue, seqOp, combOp)
# Result = [max_key, max_val, min_key, min_val]
# The zero value must be an identity for combOp: its max must lose to every real value
# and so must its min. -inf / +inf guarantee that for any numeric count. The previous
# -1000000 / 1000000 sentinels would be reported as the minimum ('k2', 1000000)
# whenever every count exceeded 1,000,000. For an empty RDD the None keys make the
# "no data" result obvious instead of returning made-up keys.
agg_zero_val = [None, float("-inf"), None, float("inf")]

r.treeAggregate(agg_zero_val, seqOp, combOp)

['28', 293, '56', 1]

In [25]:
# Most and least frequent fare_category, found in a single pass over (fare_category, count) pairs.
r1 = df.groupBy('fare_category').count().rdd.map(lambda row: tuple(row))
r1.treeAggregate(agg_zero_val, seqOp, combOp)

[6, 1066, 66, 1]

In [26]:
# medallion: an md5sum of the taxi (vehicle) identifier, i.e. the Taxi ID.
# Finds the taxi with the most and the fewest trips. When several taxis tie,
# the reported key depends on element/partition order (ties go to the later element).
r2 = df.groupBy('medallion').count().rdd.map(lambda row: tuple(row))
r2.treeAggregate(agg_zero_val, seqOp, combOp)


['696321779D687411F2E5DF6991E9D474', 7, '6D9C2E4EAC8F6A5C7D3102177BC42C03', 1]

In [27]:
# hack_license: an md5sum of the taxi license identifier, i.e. the Driver ID.
# Finds the driver with the most and the fewest trips (ties resolved by element order).
r2 = df.groupBy('hack_license').count().rdd.map(lambda row: tuple(row))
r2.treeAggregate(agg_zero_val, seqOp, combOp)


['00B7691D86D96AEBD21DD9E138F90840', 10, 'D8B109DC861AA892745CC4CFF78D98E3', 1]

# Simulation of the Aggregate operation in Spark
We will take the data from the RDD and use it as the input to a simulation of Spark's aggregate operation.

The simulation will be done using the following steps:

- We will split the data into partitions.
- For each partition, we will simulate what happens on a worker node: the partition's elements are folded into a partial aggregate with `seqOp`.
- On the master (driver) node, we will combine all partition sub-aggregates with `combOp` into the final aggregate, which is the result of the aggregate function.

In [54]:
# Simulation input: (hack_license, trip_count) pairs, the same data as the Driver ID example above.
r = df.groupBy('hack_license').count().rdd.map(lambda row: tuple(row))
r.take(10)

[('130328475AD7427AFDE50A846CA08B22', 1),
 ('D4F2AE0988ECB2E421AAC0C876483801', 3),
 ('DD97899ACAC51EF3188A659DB1F4EDBB', 5),
 ('A7C47E60941315A0E1B18190584F1B8F', 4),
 ('9911D66A4A796752DAA9929262692322', 3),
 ('88CB7A1006DB184386777ACF070430A9', 4),
 ('069B5562096AF76848A613F23073B4BA', 2),
 ('28A7C858D9231A3EC2C90820A26083DC', 2),
 ('A7EE9AEDB7325F55F14F2D2448170D56', 2),
 ('4B6EFCBC110DB539E9ECCD320DB55ADC', 3)]

In [55]:
# Bring every pair to the driver so the simulation below can run in plain Python.
data = r.collect()
record_count = len(data)
print(record_count)

4628


### Partition the data
We will create partitions based on a predefined partition_size parameter.

In [56]:
# Partition the data
partition_size = 1000
# Slice `data` into consecutive chunks of at most partition_size elements;
# only the last chunk can be shorter.
partitions = [data[start:start + partition_size]
              for start in range(0, len(data), partition_size)]
# Number of partitions = ceil(len(data) / partition_size)
print(len(partitions))

5


### Worker nodes simulation
We are using a for loop to simulate each worker node.

The `partitions_rez` list stores the result from each partition; the master node then uses this list to combine the partial aggregates.

In [57]:
# The list for storing the partitions' aggregates.
partitions_rez = []
for partition in partitions:
    # This code would be executed on each worker node.
    # Start from a fresh copy of the zero value so partitions never share one accumulator
    # object. PySpark allows seqOp to modify and return its first argument; with a shared
    # object, such a seqOp would carry one partition's state into the next.
    part_rez = list(agg_zero_val)
    for element in partition:
        part_rez = seqOp(part_rez, element)  # update the aggregate with each element
    # Store the final partition aggregate in the list (for our simulation).
    # In Spark, at this point, part_rez would be sent to the master (driver) node.
    partitions_rez.append(part_rez)

In [58]:
# Print each partition's index and its [max_key, max_val, min_key, min_val] aggregate.
for idx in range(len(partitions_rez)):
    print(idx, partitions_rez[idx])

0 ['529E7364F15A734DFB3F44376DC78267', 6, '3E8783B29ABF15A4F5B475D928D2A7BD', 1]
1 ['14C2ED390669165F2D3B5CE427A91027', 7, '2FBF9DAD51F548A6F1CE63165CC30BA7', 1]
2 ['DCD6A3DA3488EF99AAC46FE0EF41449B', 5, 'A8B92DC57A3DF887FF41DA5EBA763A97', 1]
3 ['00B7691D86D96AEBD21DD9E138F90840', 10, '956BC98C042F35EB171FF7FB1A6721B9', 1]
4 ['35A32952035266841AED93F2EFCEEB9D', 6, 'D8B109DC861AA892745CC4CFF78D98E3', 1]


### Master node simulation
The following code is executed on the master (driver) node.

This code creates the final aggregate from all partitions aggregates.

In [59]:
# Master (driver) side: start from the zero value and merge in every partition aggregate.
final_rez = agg_zero_val
for worker_rez in partitions_rez:
    # In Spark, the driver receives each worker's aggregate and folds it in with combOp().
    final_rez = combOp(final_rez, worker_rez)
print(final_rez)

['00B7691D86D96AEBD21DD9E138F90840', 10, 'D8B109DC861AA892745CC4CFF78D98E3', 1]
