In [41]:
class Table:
    def __init__(self, name, columns):
        self.name = name
        self.columns = columns
        self.rows = [] # array of dictionaries

    def add_row(self, row):
        if isinstance(row, dict):
            self.rows.append(row)
        elif isinstance(row, list):
            self.rows.append(dict(zip(self.columns, row)))
        else:
            raise TypeError("Invalid row type %s" % type(row))

    def __getitem__(self, key):
        if isinstance(key, str):
            return [row[key] for row in self.rows]
        elif isinstance(key, int):
            return self.rows[key]
        else:
            raise TypeError("Invalid key type %s" % type(key))

    def num_of_rows(self):
        return len(self.rows)

    def __str__(self):
        header = "Table(%s, %s)" % (self.name, self.columns)
        rows = "\n".join(["\t" + str(row) for row in self.rows])
        return header + "\n" + rows

In [42]:
t = Table('t1', ['a', 'b', 'c'])
t.add_row([1, 2, 3])
t.add_row([4, 5, 6])

print(t['a'] == [1, 4])
print(t[0] == {'a': 1, 'b': 2, 'c': 3})

t.rows

True
True


[{'a': 1, 'b': 2, 'c': 3}, {'a': 4, 'b': 5, 'c': 6}]

In [43]:
class PU:
    def __init__(self):
        self.hosted_tables = []

    def add_table(self, table):
        if table.name not in [t.name for t in self.hosted_tables]:
            self.hosted_tables.append(table)
        else:
            raise ValueError("Table %s already exists" % table.name)

    def __getitem__(self, key):
        if isinstance(key, str):
            for table in self.hosted_tables:
                if table.name == key:
                    return table
            raise KeyError("No table named %s" % key)
        else:
            raise TypeError("Invalid key type %s" % type(key))

class Prpd:
    def __init__(self, n_pus, hash):
        self.pus = [PU() for _ in range(n_pus)]
        self.tables = []
        self.hash = hash

    def create_table(self, name, columns):
        self.tables.append(Table(name, columns))

    def __getitem__(self, key):
        if isinstance(key, str):
            for table in self.tables:
                if table.name == key:
                    return table
            raise KeyError("No table named %s" % key)
        else:
            raise TypeError("Invalid key type %s" % type(key))

    def redistribute_table(self, table_name, key):
        table = self[table_name]
        # create table on every PU
        for pu in self.pus:
            pu.add_table(Table(table.name, table.columns))

        for row in table.rows:
            idx_pu = self.hash(row[key])
            self.pus[idx_pu][table_name].add_row(row)

    def join_redistribute(self, left_table_name, left_key, right_table_name, right_key):
        left_table = self[left_table_name]
        right_table = self[right_table_name]
        # create table on every PU
        for pu in self.pus:
            pu.add_table(Table(left_table.name + "_redis", left_table.columns))
            pu.add_table(Table(right_table.name + "_redis", right_table.columns))
            pu.add_table(Table("join_result", left_table.columns + right_table.columns))

        # redistribute tables
        for pu in self.pus:
            for row in pu[left_table_name].rows:
                idx_pu = self.hash(row[left_key])
                pu[left_table_name + "_redis"].add_row(row)

            for row in pu[right_table_name].rows:
                idx_pu = self.hash(row[right_key])
                pu[right_table_name + "_redis"].add_row(row)

        # join redistributions
        for pu in self.pus:
            left_table_redis = pu[left_table_name + "_redis"]
            right_table_redis = pu[right_table_name + "_redis"]
            for left_row in left_table_redis.rows:
                for right_row in right_table_redis.rows:
                    if left_row[left_key] == right_row[right_key]:
                        # merge rows
                        pu["join_result"].add_row({**left_row, **right_row})

        end_result = Table("join_result", left_table.columns + right_table.columns)
        for pu in self.pus:
            end_result.rows += pu["join_result"].rows
        return end_result

In [48]:
prpd = Prpd(3, lambda x: x % 3)
prpd.create_table('R', ['x', 'a'])
prpd['R'].add_row([3, 1])
prpd['R'].add_row([6, 2])
prpd['R'].add_row([9, 7])
prpd['R'].add_row([12, 1])
prpd['R'].add_row([1, 2])
prpd['R'].add_row([4, 1])
prpd['R'].add_row([7, 3])
prpd['R'].add_row([7, 1])
prpd['R'].add_row([2, 4])
prpd['R'].add_row([5, 1])
prpd['R'].add_row([5, 6])
prpd['R'].add_row([8, 1])
prpd.redistribute_table('R', 'x')

prpd.create_table('S', ['y', 'b'])
prpd['S'].add_row([2, 2])
prpd['S'].add_row([8, 2])
prpd['S'].add_row([8, 3])
prpd['S'].add_row([11, 6])
prpd['S'].add_row([1, 1])
prpd['S'].add_row([4, 2])
prpd['S'].add_row([4, 8])
prpd['S'].add_row([10, 2])
prpd['S'].add_row([0, 5])
prpd['S'].add_row([0, 1])
prpd['S'].add_row([3, 2])
prpd['S'].add_row([6, 2])
prpd.redistribute_table('S', 'y')

In [49]:
print("node 0:")
print(prpd.pus[0]['R'])
print(prpd.pus[0]['S'])
print("node 1:")
print(prpd.pus[1]['R'])
print(prpd.pus[1]['S'])
print("node 2:")
print(prpd.pus[2]['R'])
print(prpd.pus[2]['S'])

node 0:
Table(R, ['x', 'a'])
	{'x': 3, 'a': 1}
	{'x': 6, 'a': 2}
	{'x': 9, 'a': 7}
	{'x': 12, 'a': 1}
Table(S, ['y', 'b'])
	{'y': 0, 'b': 5}
	{'y': 0, 'b': 1}
	{'y': 3, 'b': 2}
	{'y': 6, 'b': 2}
node 1:
Table(R, ['x', 'a'])
	{'x': 1, 'a': 2}
	{'x': 4, 'a': 1}
	{'x': 7, 'a': 3}
	{'x': 7, 'a': 1}
Table(S, ['y', 'b'])
	{'y': 1, 'b': 1}
	{'y': 4, 'b': 2}
	{'y': 4, 'b': 8}
	{'y': 10, 'b': 2}
node 2:
Table(R, ['x', 'a'])
	{'x': 2, 'a': 4}
	{'x': 5, 'a': 1}
	{'x': 5, 'a': 6}
	{'x': 8, 'a': 1}
Table(S, ['y', 'b'])
	{'y': 2, 'b': 2}
	{'y': 8, 'b': 2}
	{'y': 8, 'b': 3}
	{'y': 11, 'b': 6}


In [50]:
print(prpd.join_redistribute('R', 'a', 'S', 'b'))

Table(join_result, ['x', 'a', 'y', 'b'])
	{'x': 3, 'a': 1, 'y': 0, 'b': 1}
	{'x': 6, 'a': 2, 'y': 3, 'b': 2}
	{'x': 6, 'a': 2, 'y': 6, 'b': 2}
	{'x': 12, 'a': 1, 'y': 0, 'b': 1}
	{'x': 1, 'a': 2, 'y': 4, 'b': 2}
	{'x': 1, 'a': 2, 'y': 10, 'b': 2}
	{'x': 4, 'a': 1, 'y': 1, 'b': 1}
	{'x': 7, 'a': 1, 'y': 1, 'b': 1}
	{'x': 5, 'a': 6, 'y': 11, 'b': 6}
