In [110]:
import apache_beam as beam
from apache_beam import CoGroupByKey, FlatMap
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [111]:
# Build the pipeline on the interactive runner so that ib.collect / ib.show
# in the following cells can materialize PCollections inline.
pipeline = beam.Pipeline(runner=InteractiveRunner())

In [112]:
# P records, keyed by k1; the value is the (k2, k3, v) tuple that match() compares.
p_input = (
    pipeline
    | "P" >> beam.Create([
        {'k1': 'a', 'k2': 'aa', 'k3': 'aaa', 'v': 1},
        {'k1': 'b', 'k2': 'bb', 'k3': 'bbb', 'v': 2},
        {'k1': 'c', 'k2': 'cc', 'k3': 'ccc', 'v': 3},
    ])
    | beam.Map(lambda d: (d['k1'], (d['k2'], d['k3'], d['v'])))
)
ib.collect(p_input)

Unnamed: 0,0,1
0,a,"(aa, aaa, 1)"
1,b,"(bb, bbb, 2)"
2,c,"(cc, ccc, 3)"


In [113]:
# Cardinality of S is much higher than P.
# S records, keyed by K1; the value is the (K2, K3, V) tuple that match() compares.
# Rows marked "<--" are the ones the join is expected to pick.
s_input = (
    pipeline
    | "S" >> beam.Create([
        {'K1': 'b', 'K2': 'bb', 'K3': 'bbb', 'V': 10},  # k1, k2, k3 match K1, K2, K3  <--
        {'K1': 'b', 'K2': 'bb', 'K3': '-', 'V': 20},    # only k1, k2 match K1, K2
        {'K1': 'c', 'K2': 'cc', 'K3': '-', 'V': 30},    # only k1, k2 match, older V
        {'K1': 'c', 'K2': 'cc', 'K3': '-', 'V': 40},    # only k1, k2 match, newer V  <--
        {'K1': 'd', 'K2': 'dd', 'K3': 'ddd', 'V': 50},  # no match
    ])
    | beam.Map(lambda d: (d['K1'], (d['K2'], d['K3'], d['V'])))
)
ib.collect(s_input)

Unnamed: 0,0,1
0,b,"(bb, bbb, 10)"
1,b,"(bb, -, 20)"
2,c,"(cc, -, 30)"
3,c,"(cc, -, 40)"
4,d,"(dd, ddd, 50)"


In [114]:
# Group both keyed inputs on their shared key (k1 / K1). Each resulting element
# is (key, {"p": <iterable of P values>, "s": <iterable of S values>}).
cogbk = dict(p=p_input, s=s_input) | CoGroupByKey()
ib.show(cogbk)

In [115]:
def match(p, s_list):
    """Find the S value(s) that best match one P value sharing the same k1/K1.

    Args:
        p: a ``(k2, k3, v)`` tuple from the P side.
        s_list: an iterable of ``(K2, K3, V)`` tuples from the S side with the
            same key as ``p``. It is iterated at most twice.

    Returns:
        ``(list_of_s, "exact")`` when at least one S value agrees with ``p`` on
        both k2 and k3. Otherwise ``(s, "partial")`` with the single S value
        that agrees on k2 only and has the largest V, i.e. the newest one.
        Otherwise ``None``.
    """
    exact = [s for s in s_list if s[0] == p[0] and s[1] == p[1]]
    if exact:
        return exact, "exact"

    partial = [s for s in s_list if s[0] == p[0]]
    if partial:
        # "Newest" is decided by V (index 2). Keying on K3 (index 1) ties on
        # '-' for every partial candidate and max() would return the first,
        # i.e. the oldest, one.
        return max(partial, key=lambda s: s[2]), "partial"

    return None


In [116]:
def inner_join(element):
    """Turn one CoGroupByKey element into one output dict per P value.

    ``element`` is ``(key, grouped)`` where ``grouped["p"]`` holds the P values
    and ``grouped["s"]`` the S values for that key. Each P value produces
    ``{"key": key, "p": p, "s": match(p, s_values)}``.

    NOTE(review): P values with no matching S value are still emitted, with
    ``"s"`` set to ``None`` (what ``match`` returns when nothing matches). This
    therefore behaves like a left join from P rather than an inner join. Keys
    that only appear in S produce no rows.
    """
    key, grouped = element[0], element[1]
    p_values = grouped['p']
    s_values = grouped['s']
    rows = []
    for p in p_values:
        rows.append({"key": key, "p": p, "s": match(p, s_values)})
    return rows


# Expand every grouped key into one row per P value (see inner_join).
flattened = (
    cogbk
    | FlatMap(inner_join)
)
ib.show(flattened)