# treeAggregate
Create a treeAggregate function that will compute the maximum and minimum of an RDD in one pass.

In [11]:
! pip install pyspark



In [14]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one, otherwise start a local one.
# Going through getOrCreate() keeps this cell re-runnable: constructing
# SparkContext("local") a second time in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

In [13]:
# Restart Spark. Stopping the context on its own would leave every cell below
# without a usable `sc`/`spark` when the notebook is run top to bottom, so a
# fresh local context and session are created right after the stop.
sc.stop()
sc = SparkContext("local")
spark = SparkSession.builder.getOrCreate()

In [15]:
# Directory prefix of the input data; empty means the current working directory.
path = ""
testFile = path + "taxi-data-sorted-verysmall.csv"

# The file has no header row, so Spark assigns the column names (_c0, _c1, ...)
# and is asked to infer each column's type from the data.
csv_reader = (
    spark.read.format('csv')
    .option('header', 'false')
    .option('inferSchema', 'true')
    .option('sep', ",")
)
df = csv_reader.load(testFile)

# Preview the first five rows; long cell values are truncated.
df.show(5, truncate=True)

+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|                 _c0|                 _c1|                _c2|                _c3|_c4| _c5|       _c6|      _c7|       _c8|      _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|
+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|120|0.44|-73.956528|40.716976| -73.96244|40.715008| CSH| 3.5| 0.5| 0.5| 0.0| 0.0| 4.5|
|22D70BF00EEB0ADC8...|3FF2709163DE7036F...|2013-01-01 00:02:00|2013-01-01 00:02:00|  0| 0.0|       0.0|      0.0|       0.0|      0.0| CSH|27.0| 0.0| 0.5| 0.0| 0.0|27.5|
|0EC22AAF491A8BD91...|778C92B26AE78A9EB...|2013-01-01 00:01:00|2013-01-01 00:03:00|120|0.71|-73.973145|40.752827|-73.965897|40.760445| CSH| 4.0| 0.5| 

In [17]:
# Print each column's name, the type Spark inferred for it, and its nullability.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)



In [30]:
from pyspark.sql.functions import col, substring
# substring() positions are 1-based, not 0-based: position 1 is the first character.
# Slice the '_c2' timestamp string (laid out as 'YYYY-MM-DD HH:MM:SS') into new
# year, month, day, hour and minute columns of the taxi trip.
timestamp_parts = [
    ('year', 1, 4),
    ('month', 6, 2),
    ('day', 9, 2),
    ('hour', 12, 2),
    ('minute', 15, 2),
]
for part_name, start, length in timestamp_parts:
    df = df.withColumn(part_name, substring('_c2', start, length))
df.show(3)

+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+----+-----+---+----+---+------+
|                 _c0|                 _c1|                _c2|                _c3|_c4| _c5|       _c6|      _c7|       _c8|      _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|year|month|day|hour|min|minute|
+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+----+-----+---+----+---+------+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|120|0.44|-73.956528|40.716976| -73.96244|40.715008| CSH| 3.5| 0.5| 0.5| 0.0| 0.0| 4.5|2013|   01| 01|  00| 02|    00|
|22D70BF00EEB0ADC8...|3FF2709163DE7036F...|2013-01-01 00:02:00|2013-01-01 00:02:00|  0| 0.0|       0.0|      0.0|       0.0|      0.0| CSH|27.0| 0.0| 0.5| 0.0| 0.0|27.5|2013|   01| 01|  00| 02|   

In [32]:
# Find the number of trips for each value of the 'minute' column
# (the two-character minute field sliced out of the '_c2' timestamp).
# show() prints only the first 20 groups.
df.groupBy('minute').count().show()

+------+-----+
|minute|count|
+------+-----+
|    07|  165|
|    51|   44|
|    15|  231|
|    54|   10|
|    11|  221|
|    29|  267|
|    42|  149|
|    30|  247|
|    34|  237|
|    01|   45|
|    22|  253|
|    28|  293|
|    16|  236|
|    35|  226|
|    52|   33|
|    47|  113|
|    43|  165|
|    31|  243|
|    18|  233|
|    27|  277|
+------+-----+
only showing top 20 rows



In [33]:
# Count the trips per 'minute' value, then drop down to the RDD API:
# every Row is turned into a plain (minute, count) tuple and the RDD is
# cached so later actions can reuse it without recomputing the grouping.
minute_counts = df.groupBy('minute').count()
r = minute_counts.rdd.map(tuple).cache()

# Pull up to 100 of the (minute, count) pairs back to the driver for inspection.
r.take(100)

[('07', 165),
 ('51', 44),
 ('15', 231),
 ('54', 10),
 ('11', 221),
 ('29', 267),
 ('42', 149),
 ('30', 247),
 ('34', 237),
 ('01', 45),
 ('22', 253),
 ('28', 293),
 ('16', 236),
 ('35', 226),
 ('52', 33),
 ('47', 113),
 ('43', 165),
 ('31', 243),
 ('18', 233),
 ('27', 277),
 ('00', 43),
 ('17', 274),
 ('26', 274),
 ('09', 178),
 ('46', 128),
 ('05', 129),
 ('19', 258),
 ('23', 280),
 ('41', 195),
 ('55', 2),
 ('08', 175),
 ('03', 77),
 ('38', 229),
 ('40', 196),
 ('25', 256),
 ('02', 56),
 ('44', 156),
 ('53', 20),
 ('33', 267),
 ('06', 147),
 ('48', 110),
 ('24', 239),
 ('32', 229),
 ('20', 260),
 ('56', 1),
 ('36', 223),
 ('10', 202),
 ('37', 190),
 ('49', 70),
 ('39', 209),
 ('12', 223),
 ('04', 89),
 ('13', 220),
 ('14', 217),
 ('21', 250),
 ('50', 70),
 ('45', 170)]

In [39]:
# example of function input (x = (0,-10000,1000000,1000000) , y = ('07', 165))
def seqOp(x,y):
    """Fold one (key, value) record into the running max/min accumulator.

    Args:
        x: accumulator laid out as (max_key, max_val, min_key, min_val).
        y: a single (key, value) record.

    Returns:
        A new four-element list in the same layout as ``x``.

    Both comparisons are strict, so when the record's value equals the stored
    maximum (or minimum) the record replaces the stored entry.
    """
    value = y[1]
    # Keep the stored maximum only while it is strictly larger than the record.
    max_part = [x[0], x[1]] if x[1] > value else [y[0], value]
    # Keep the stored minimum only while it is strictly smaller than the record.
    min_part = [x[2], x[3]] if x[3] < value else [y[0], value]
    return max_part + min_part
  
# example of function input (x = ('14',217, '49',70 ) , y = ('13',220, '50',70 ))
def combOp(x,y):
    """Merge two partial max/min accumulators into one.

    Args:
        x: accumulator laid out as (max_key, max_val, min_key, min_val).
        y: accumulator with the same layout.

    Returns:
        A new four-element list holding the larger of the two maxima and the
        smaller of the two minima, each with its key.

    Both comparisons are strict, so on equal values the entry from ``y`` wins.
    """
    # Pick which side supplies the maximum pair and which the minimum pair.
    max_source = x if x[1] > y[1] else y
    min_source = x if x[3] < y[3] else y
    return [max_source[0], max_source[1], min_source[2], min_source[3]]

In [40]:
# treeAggregate(zeroValue, seqOp, combOp)
# Result = (max_key, max_val, min_key, min_val)
#
# The zero value must lose every comparison against real data. -inf / +inf
# do that for any count, whereas fixed sentinels such as -10000 / 1000000
# give a wrong minimum as soon as every count is 1,000,000 or more.
# The key slots start as None and are overwritten by the first record seen;
# on an empty RDD the zero value itself is returned.
r.treeAggregate([None, float('-inf'), None, float('inf')], seqOp, combOp)

['28', 293, '56', 1]