Implementation of the minimal MapReduce sliding-aggregation algorithm: https://dl.acm.org/doi/10.1145/2463676.2463719

https://www.cse.cuhk.edu.hk/~taoyf/paper/sigmod13-mr.pdf

Authors of the algorithm: Yufei Tao, Wenqing Lin, Xiaokui Xiao

Data: Yellow Taxi Trip Records (CSV) for January 2021, from https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page. For each record, I compute the average ride distance and the average passenger count over the last 1000 rides, ordered by drop-off time. The implementation is minimal and follows the algorithm from the paper. It uses the Spark RDD API from Python (PySpark).

In [None]:

# config

# Input CSV; it is read with a header row.
FILE = "yellow_tripdata_2021-01.csv"
# Only these columns are loaded from FILE.
COL_NAMES = ["tpep_dropoff_datetime", "passenger_count", "trip_distance"]
# Column whose values are averaged over each window.
AGGREGATE_FIELD = "passenger_count"
# Alternative aggregate field (the last notebook cell switches to it):
# AGGREGATE_FIELD = "trip_distance"
# Output column that receives each record's window average.
AGGREGATE_COLUMN_NAME = "aggregate_result"
# Records are ordered by this column before windows are formed.
SORT_FIELD = "tpep_dropoff_datetime"
t = 8  # number of machines (also the number of RDD partitions used)
l = 1000  # length of window: the record itself plus the l - 1 records ranked before it

In [3]:

# globals

import random
from math import ceil
from math import log
from operator import add

from pyspark.sql import SparkSession

# Value written into every column of the padding rows created by add_dummies.
DUMMY = 'DUMMY'
# Prefixes that tag shuffled messages so receivers can tell them apart;
# the sending machine's index is appended to the prefix.
SUM_FROM_OTHER_PREFIX = 'WHOLE_SUM_FROM_'
OBJECTS_FROM_OTHER_PREFIX = 'OBJECTS_FROM_'
RANK_SUM_PREFIX = 'BIG_PREFIX_'
# Key under which each object's 1-based global rank is stored.
RANK_KEY = 'rank'
spark = SparkSession.builder.getOrCreate()
# Records per machine (n / t); set by prepare_rdd.
m = 0
# Total record count including padding rows; set by prepare_rdd / add_dummies.
n = 0



In [4]:

# utils

def get_key_value(x):
    """Return the sort key of record ``x`` (its ``SORT_FIELD`` entry)."""
    sort_key = x[SORT_FIELD]
    return sort_key


def get_agg_value(x):
    """Return the ``AGGREGATE_FIELD`` value of record ``x`` as a float.

    Values that cannot be converted contribute 0.0 to every window: a missing
    CSV cell (``None`` -> TypeError) or the ``DUMMY`` padding text / any other
    non-numeric text (-> ValueError).  Other errors, such as a misconfigured
    field name on a dict record (KeyError), are deliberately NOT swallowed, so
    a bad configuration fails loudly instead of silently averaging zeros.
    """
    try:
        return float(x[AGGREGATE_FIELD])
    except (TypeError, ValueError):
        return 0.0


def is_dummy(x):
    """Tell whether ``x`` is a padding record, i.e. its sort key equals ``DUMMY``."""
    key_of_x = get_key_value(x)
    return key_of_x == DUMMY


def decision(row, probability):
    """Return True with the given probability; dummy rows are never selected.

    No random number is drawn for dummy rows.
    """
    return (not is_dummy(row)) and random.random() < probability


def compute_rho(m, n, t):
    """Return the TeraSort sampling probability ``(1 / m) * ln(n * t)``.

    ``m`` is the number of records per machine, ``n`` the total number of
    records and ``t`` the number of machines.
    """
    reciprocal = 1 / m
    return reciprocal * log(n * t)


def key(x):
    """Return the first element (the key) of a ``(key, value)`` pair."""
    key_index = 0
    return x[key_index]


def value(x):
    """Return the second element (the value) of a ``(key, value)`` pair."""
    value_index = 1
    return x[value_index]


def rank(o):
    """Return the global rank stored on object ``o`` under ``RANK_KEY``."""
    stored_rank = o[RANK_KEY]
    return stored_rank


class RankRangeSum:
    """Range sums over a run of values whose ranks are consecutive.

    The k-th value of ``array`` (0-based) is taken to have rank
    ``start_rank + k``; ``array`` may be any iterable of numbers.
    """

    def __init__(self, array, start_rank):
        self.start_rank = start_rank
        # prefix_sum[k] is the sum of the first k values, so prefix_sum[0] == 0.
        self.prefix_sum = [0]
        for x in array:
            self.prefix_sum.append(x + self.prefix_sum[-1])

    def get_sum_from_rank_range(self, i, j):
        """Return the sum of the stored values with rank in ``[i, j]`` as a float.

        Ranks outside the stored run contribute nothing, so a range that
        overlaps the run only partially (or not at all) is clipped; an empty
        intersection yields 0.0.  Both ends are clamped: without the clamps,
        a range ending before the run would index ``prefix_sum`` with a
        negative number, and one starting after the run would go past its end.
        """
        count = len(self.prefix_sum) - 1
        lo = min(max(i - self.start_rank, 0), count)
        hi = min(max(j + 1 - self.start_rank, 0), count)
        if hi <= lo:
            return 0.0
        return float(self.prefix_sum[hi] - self.prefix_sum[lo])

In [5]:

# data prepare + sampling for terasort

def add_dummies(df):
    """Pad ``df`` with DUMMY rows so that its row count is a multiple of ``t``.

    Every column of a padding row holds ``DUMMY``.  Expects the global ``n``
    to hold ``df``'s current row count, and updates it to the padded count.

    Returns:
        ``df`` unchanged if no padding is needed, else ``df`` unioned with the
        padding rows.
    """
    global n
    missing = (-n) % t  # rows needed to reach the next multiple of t (0 if none)
    if missing:
        dummy_rows = [[DUMMY] * len(df.columns) for _ in range(missing)]
        # Reuse df's schema so the padding rows carry df's column names/types
        # instead of inferred "_1", "_2", ... names.
        df = df.union(spark.createDataFrame(dummy_rows, schema=df.schema))
        # Exactly `missing` rows were appended, so there is no need to run
        # another full count over the input.
        n += missing
    return df


def prepare_rdd():
    """Load ``FILE`` and return it as an RDD of Rows split into ``t`` partitions.

    Only ``COL_NAMES`` are kept, and the data is padded with DUMMY rows up to
    a multiple of ``t``.  Sets the globals ``n`` (padded record count) and
    ``m`` (records per machine).

    Raises:
        ValueError: if ``FILE`` contains no records.  With no records ``m``
            would be 0, and the pipeline would otherwise fail later with a
            ZeroDivisionError.
    """
    global m, n
    df = spark.read.options(sep=",", header=True).csv(FILE).select(COL_NAMES)
    n = df.count()
    if n == 0:
        raise ValueError("no records found in " + FILE)
    df = add_dummies(df)
    rdd = df.rdd.repartition(t)
    # add_dummies makes n a multiple of t, so integer division is exact.
    m = n // t
    return rdd


def sample(rdd, rho):
    """Collect a Bernoulli sample of ``rdd`` on the driver, sorted by sort key.

    Each non-dummy row is kept independently with probability ``rho``.
    """
    picked = rdd.filter(lambda row: decision(row, rho)).collect()
    picked.sort(key=get_key_value)
    return picked


def get_boundaries_from_sample(sample, num_machines=None):
    """Pick the ``num_machines - 1`` TeraSort boundary objects from a sorted sample.

    Boundary ``i`` (1-based) is the ``i * ceil(len(sample) / num_machines)``-th
    smallest sample element.  When the sample is too small for that position
    to exist, the last sample element is used.  The resulting repeated
    boundaries simply leave some machines empty, whereas indexing past the end
    would raise IndexError.

    Args:
        sample: sample elements sorted by sort key.
        num_machines: number of machines; defaults to the global ``t``.

    Returns:
        List of boundary elements in non-decreasing order; empty if ``sample``
        is empty.
    """
    if num_machines is None:
        num_machines = t
    if not sample:
        return []
    step = -(-len(sample) // num_machines)  # ceil(len / num_machines) in integers
    last = len(sample) - 1
    return [sample[min(i * step - 1, last)] for i in range(1, num_machines)]

In [6]:

# tera sort

def tera_sort(rdd):
    """Distribute rows over ``t`` machines by sort key (TeraSort).

    Boundaries are chosen from a random sample.  A row goes to the first
    machine whose boundary key is >= the row's key, or to the last machine if
    no such boundary exists.

    Returns:
        RDD of ``(machine_index, rows)`` where ``rows`` is a list sorted by
        sort key.  Machines that receive no rows are absent.
    """
    from bisect import bisect_left

    rho = compute_rho(m, n, t)
    boundaries = get_boundaries_from_sample(sample(rdd, rho))
    # In the paper the boundaries are sent to machines directly (like
    # RANK_SUM_PREFIX messages in do_rank).  Only their keys are needed here.
    boundary_keys = spark.sparkContext.broadcast([get_key_value(b) for b in boundaries])

    def machine_for(x):
        # The boundary keys are non-decreasing, so bisect_left finds the first
        # boundary with key >= x's key.  That is the same machine a linear
        # scan picks, and len(boundaries) when x is beyond every boundary.
        return bisect_left(boundary_keys.value, get_key_value(x))

    # groupByKey builds each machine's list once.  reduceByKey(add) on
    # singleton lists would copy the growing list on every merge, i.e.
    # quadratically in the rows per machine.
    # The broadcast is deliberately not unpersisted here: the returned RDD is
    # lazy and still reads it when it is eventually evaluated.
    return (rdd.keyBy(machine_for)
            .groupByKey()
            .map(lambda kv: (kv[0], sorted(kv[1], key=get_key_value))))

In [7]:

# ranking and perfect balance

def do_rank(sorted_rdd):
    """Assign every row its 1-based global rank.

    Each machine sends its row count to all higher-indexed machines.  A
    machine's rows are then ranked after the total count received, in the
    order they appear in its (already sorted) list.

    Args:
        sorted_rdd: RDD of ``(machine_index, list of Rows)`` as produced by
            tera_sort.

    Returns:
        RDD of ``(machine_index, list of dicts)``; each dict is the Row's
        ``asDict()`` plus ``RANK_KEY``.  A machine that only received counts
        maps to an empty list.
    """
    # Explicit message tags: the original code told messages apart by looking
    # at a Row's first column, which depends on the data (and breaks on null
    # values).
    own_rows_tag = 'OWN_ROWS'

    def mapper(v):
        machine, rows = v
        messages = [(machine, (own_rows_tag, rows))]
        messages.extend((receiver, (RANK_SUM_PREFIX + str(machine), len(rows)))
                        for receiver in range(machine + 1, t))
        return messages

    def compute_rank(x):
        machine, messages = x
        rows_before = 0  # rows held by all lower-indexed machines
        rows = []
        for tag, payload in messages:
            if tag == own_rows_tag:
                rows = payload
            else:
                rows_before += payload
        ranked = []
        for idx, row in enumerate(rows):
            obj = row.asDict()
            obj[RANK_KEY] = rows_before + idx + 1
            ranked.append(obj)
        return machine, ranked

    return sorted_rdd.flatMap(mapper).groupByKey().map(compute_rank)


def perfect_balance(ranked_rdd):
    """Redistribute ranked objects so that machine i holds ranks ``i*m + 1 .. (i+1)*m``.

    Args:
        ranked_rdd: RDD of ``(machine_index, list of ranked dict objects)``.

    Returns:
        RDD of ``(machine_index, objects)`` with ``objects`` sorted by rank.
    """
    def target_machine(obj):
        # Equal to ceil(rank / m) - 1 for positive ranks, without float division.
        return int((rank(obj) - 1) // m)

    # Sort by rank, not by sort key.  Records with equal SORT_FIELD values
    # (e.g. identical drop-off timestamps) would otherwise come out of the
    # shuffle in arbitrary order.  That breaks the rank-contiguous layout the
    # window computation indexes into.
    # groupByKey avoids the quadratic list copying of reduceByKey(add).
    return (ranked_rdd.flatMap(value)
            .keyBy(target_machine)
            .groupByKey()
            .mapValues(lambda objs: sorted(objs, key=rank)))

In [8]:

# sliding aggregation

def sliding_agregation(balanced_rdd):
    """Attach to every object the average of AGGREGATE_FIELD over its window.

    The window of the object with rank r covers ranks ``max(1, r - l + 1) .. r``,
    so the window sum is divided by ``min(r, l)``.  Machine i is expected to
    hold ranks ``i*m + 1 .. (i+1)*m``.  It receives from lower machines:

    * each lower machine's total (``SUM_FROM_OTHER_PREFIX``), used for
      machines that lie completely inside a window;
    * the full object lists (``OBJECTS_FROM_OTHER_PREFIX``) of the at most two
      machines on which its windows can start.

    Args:
        balanced_rdd: RDD of ``(machine_index, list of ranked dict objects)``.

    Returns:
        RDD of the non-dummy objects, each with ``AGGREGATE_COLUMN_NAME`` set.
    """

    def send_objects_to_other_machines(v):
        sender, objects = v
        sender_tag = str(sender)
        # Every later machine gets this machine's total.
        messages = [(receiver, [(SUM_FROM_OTHER_PREFIX + sender_tag, sum(map(get_agg_value, objects)))])
                    for receiver in range(sender + 1, t)]
        # With l - 1 = q*m + s (0 <= s < m), windows ending on machine j start
        # on machine j - q or j - q - 1.  Hence machine i ships its objects to
        # i + q and i + q + 1; for l <= m (q == 0) that is just i + 1, since
        # machine i already holds its own objects.
        if l <= m:
            receivers = [sender + 1]
        else:
            first = int(sender + (l - 1) // m)
            receivers = [first, first + 1]
        messages.extend((receiver, [(OBJECTS_FROM_OTHER_PREFIX + sender_tag, objects)])
                        for receiver in receivers if receiver < t)
        # The machine's own objects, flattened into the reduced list as bare dicts.
        messages.append(v)
        return messages

    def compute_windows(x):
        machine, messages = x
        # Own objects arrive as bare dicts; everything else is a (tag, payload)
        # tuple.  Re-sort by rank because RankRangeSum assumes rank-contiguous order.
        own_objects = sorted((el for el in messages if type(el) is dict), key=rank)
        if not own_objects:
            return []
        tagged = [el for el in messages if type(el) is not dict]
        objects_from_others = {
            int(tag[len(OBJECTS_FROM_OTHER_PREFIX):]): sorted(payload, key=rank)
            for tag, payload in tagged
            if tag.startswith(OBJECTS_FROM_OTHER_PREFIX) and payload
        }
        whole_sums = {
            int(tag[len(SUM_FROM_OTHER_PREFIX):]): payload
            for tag, payload in tagged
            if tag.startswith(SUM_FROM_OTHER_PREFIX)
        }

        own_prefix = RankRangeSum(map(get_agg_value, own_objects), rank(own_objects[0]))
        others_prefix = {
            sender: RankRangeSum(map(get_agg_value, objs), rank(objs[0]))
            for sender, objs in objects_from_others.items()
        }

        for o in own_objects:
            to_rank = rank(o)
            from_rank = to_rank - l + 1
            # Machine holding the window's first rank; negative when the
            # window is truncated at rank 1.
            alpha = ceil(from_rank / m) - 1
            window_sum = 0
            if alpha != machine:
                if alpha >= 0:
                    # A missing entry means that machine sent no objects, so it
                    # contributes nothing.
                    partial = others_prefix.get(alpha)
                    if partial is not None:
                        window_sum += partial.get_sum_from_rank_range(from_rank, to_rank)
                # Machines strictly between alpha and this one lie fully inside the window.
                window_sum += sum(total for sender, total in whole_sums.items()
                                  if alpha < sender < machine)
            window_sum += own_prefix.get_sum_from_rank_range(from_rank, to_rank)
            o[AGGREGATE_COLUMN_NAME] = window_sum / min(to_rank, l)

        return [o for o in own_objects if not is_dummy(o)]

    return balanced_rdd.flatMap(send_objects_to_other_machines).reduceByKey(add).flatMap(compute_windows)


In [9]:
%spark.pyspark

# main

def execute_sliding_window(show_rows=1200):
    """Run the whole pipeline and display the aggregated records.

    Steps: load, TeraSort, rank, perfect balance, sliding aggregation.  The
    result is collected to the driver and turned into a DataFrame.

    Args:
        show_rows: number of rows passed to ``DataFrame.show`` (default 1200).

    Returns:
        The result DataFrame, so callers can inspect it further.  Returning it
        is backward compatible; previously None was returned.
    """
    rdd = prepare_rdd()
    sorted_rdd = tera_sort(rdd)
    ranked_rdd = do_rank(sorted_rdd)
    balanced_rdd = perfect_balance(ranked_rdd)
    aggregated_rdd = sliding_agregation(balanced_rdd)
    result = aggregated_rdd.collect()
    df = spark.createDataFrame(result)
    df.show(show_rows)
    return df


In [10]:
# Run the pipeline with AGGREGATE_FIELD as set in the config cell.
execute_sliding_window()

In [11]:
# Re-run the pipeline, this time averaging trip distance. get_agg_value reads
# the module-level AGGREGATE_FIELD, which is rebound here before the run.
AGGREGATE_FIELD = "trip_distance"
execute_sliding_window()