In [1]:
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
import typing
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

In [2]:
# Header line written at the top of the output CSV (one entry per output column).
HEADER_ROW = ','.join([
    'legal_entity',
    'counter_party',
    'tier',
    'max(rating by counterparty)',
    'sum(value where status = ARAP)',
    'sum(value where status = ACCR)',
])

In [3]:
class Dataset(typing.NamedTuple):
    """One invoice record from dataset1 plus the tier looked up from dataset2.

    Field order mirrors the column order of the joined CSV rows, which are
    converted positionally (with int/float parsing) into this type.
    """

    invoice_id: int
    legal_entity: str
    counter_party: str
    rating: int
    status: str  # invoice status; 'ARAP' and 'ACCR' are the values filtered on
    value: float
    tier: int  # last column of the counter party's row in dataset2


class OutDataset(typing.NamedTuple):
    """One aggregated output row.

    The string 'Total' in legal_entity / counter_party / tier marks a
    dimension that has been summed over.

    NOTE(review): tier and max_rating_by_cp are annotated ``int``, but the
    aggregation cells fill them with the placeholder strings 'Total' and
    'N/A'. The annotations therefore do not describe those rows; confirm
    before relying on them (e.g. for a schema-based coder).
    """

    legal_entity: str
    counter_party: str
    tier: int
    max_rating_by_cp: int
    sum_value_arap: float  # sum of `value` over ARAP invoices
    sum_value_accr: float  # sum of `value` over ACCR invoices

In [4]:
class SplitCsvContent(beam.DoFn):
    """Split one line of CSV text into a tuple of string fields.

    Uses the standard-library ``csv`` parser, so quoted fields containing
    commas stay intact; a plain ``str.split(',')`` would break them. A trailing
    carriage return is treated as a line terminator rather than kept in the
    last field. Blank lines are skipped instead of producing a one-field
    ``('',)`` tuple that would fail in later parsing.
    """

    def process(self, element):
        # Local import keeps the DoFn self-contained (no reliance on the
        # notebook's global namespace when the function is serialized).
        import csv

        fields = next(csv.reader([element]), None)
        if not fields:
            # Empty line: nothing to emit.
            return
        yield tuple(fields)

In [5]:
class CreateKVPairOfContentByIndexList(beam.DoFn):
    """Key each element by the values found at the given positions.

    Emits ``(key, element)``. ``key`` is a list holding ``element[i]`` for each
    ``i`` in ``index_list``, in that order. The element itself passes through
    unchanged so it can be regrouped with ``CoGroupByKey``.
    """

    def process(self, element, index_list):
        key = [element[position] for position in index_list]
        yield key, element

In [6]:
# Helpers that flatten a two-way CoGroupByKey result, x = (key, {'left': rows, 'right': rows}),
# into plain tuples. The LAST field of a row from the other side is the value carried across.


def _last_field_of_first_row(rows, default):
    """Return ``row[-1]`` for the first row of ``rows``, or ``default`` if there is none.

    Accepts any iterable (list, tuple, lazily grouped values), not only lists.
    """
    for row in rows:
        return row[-1]
    return default


def deconstruct_left_join(x, default=0.0):
    """Left-join view of one CoGroupByKey result.

    Each left row is emitted as ``tuple(row) + (v,)``. ``v`` is the last field of
    the FIRST right row, or ``default`` (0.0) when the right side is empty.
    Further right rows are ignored.
    """
    groups = x[1]
    right_value = _last_field_of_first_row(groups['right'], default)
    return [tuple(row) + (right_value,) for row in groups['left']]


def deconstruct_right_join(x, default=0.0):
    """Right-join view of one CoGroupByKey result.

    Each right row is emitted as ``tuple(row[:-1]) + (v, row[-1])``. ``v`` is the
    last field of the FIRST left row, or ``default`` (0.0) when the left side is
    empty.
    """
    groups = x[1]
    left_value = _last_field_of_first_row(groups['left'], default)
    return [tuple(row[:-1]) + (left_value, row[-1]) for row in groups['right']]


def deconstruct_outer_join(x, default=0.0):
    """Full-outer-join view: left-join rows followed by right-join rows.

    A key matched on both sides is produced by both halves (identical tuples
    when each side has a single row), so callers deduplicate, e.g. with
    ``beam.Distinct()``. Both groups are materialized first because each one is
    read by both halves; a one-shot iterable would otherwise be empty on the
    second pass.
    """
    groups = x[1]
    materialized = (x[0], {'left': list(groups['left']), 'right': list(groups['right'])})
    return (deconstruct_left_join(materialized, default)
            + deconstruct_right_join(materialized, default))

In [7]:
class DeconstructJoin(beam.PTransform):
    """Turn two-way ``CoGroupByKey`` output into flat full-outer-join rows.

    Every grouped record is expanded with ``deconstruct_outer_join``. Rows
    matched on both sides can be emitted by both halves of that join as
    identical tuples, so ``Distinct`` collapses the copies.
    """

    def expand(self, pcoll):
        flattened = pcoll | beam.FlatMap(deconstruct_outer_join)
        return flattened | beam.Distinct()

In [8]:
class GroupByCalculateSumValueAndCreateIndexByKV(beam.PTransform):
    """Sum a numeric field per group, then key each aggregate for ``CoGroupByKey``.

    Groups schema'd rows by the fields in ``column_list`` and sums
    ``value_field`` into ``output_name``. Each aggregated row is then keyed by
    its first ``len(column_list)`` fields, i.e. the grouping columns, via
    ``CreateKVPairOfContentByIndexList``.

    Args:
        column_list: grouping field name(s); a single string is also accepted.
        output_name: name of the summed output field (default ``'value_sum'``).
        value_field: field to sum (default ``'value'``).
    """

    def __init__(self, column_list, output_name='value_sum', value_field='value'):
        super().__init__()
        if isinstance(column_list, str):
            column_list = [column_list]
        self.column_list = list(column_list)
        self.output_name = output_name
        self.value_field = value_field

    def expand(self, pcoll):
        # GroupBy takes each key field as its own positional argument. Passing
        # the list as a single argument makes Beam treat it as one (invalid)
        # field expression.
        grouped = pcoll | beam.GroupBy(*self.column_list).aggregate_field(
            self.value_field, sum, self.output_name)
        return grouped | beam.ParDo(
            CreateKVPairOfContentByIndexList(), range(len(self.column_list)))

In [9]:
# Single pipeline shared by every cell below; the `ib.collect` previews run on its InteractiveRunner.
p = beam.Pipeline(runner=InteractiveRunner())

In [10]:
# Resolve differences between CR, CRLF and LF line endings in the input CSVs.


def _normalize_line_endings(path):
    """Rewrite ``path`` in place so every line ends with a single LF.

    CRLF is collapsed before lone CR is converted, so Windows line endings do
    not turn into blank lines. ``newline=''`` disables Python's own newline
    translation, so the raw endings are seen on read and written back
    verbatim. The file is only rewritten when something actually changed.
    """
    with open(path, 'r', newline='') as src:
        text = src.read()
    # str.replace returns a new string; the result must be kept.
    normalized = text.replace('\r\n', '\n').replace('\r', '\n')
    if normalized != text:
        with open(path, 'w', newline='') as dst:
            dst.write(normalized)


for csv_path in ('dataset1.csv', 'dataset2.csv'):
    _normalize_line_endings(csv_path)

In [11]:
# Bring in data: read both CSV inputs line by line, dropping each file's header row.
read_dataset1 = beam.io.ReadFromText('dataset1.csv', skip_header_lines=1)
read_dataset2 = beam.io.ReadFromText('dataset2.csv', skip_header_lines=1)
dataset1 = p | "Read1" >> read_dataset1
dataset2 = p | "Read2" >> read_dataset2
ib.collect(dataset1)

Unnamed: 0,0
0,"1,L1,C1,1,ARAP,10"
1,"2,L2,C2,2,ARAP,20"
2,"3,L3,C3,4,ACCR,30"
3,"4,L1,C4,6,ARAP,40"
4,"5,L2,C5,4,ACCR,50"
5,"6,L3,C6,6,ACCR,60"
6,"7,L1,C1,2,ARAP,10"
7,"8,L2,C2,3,ACCR,40"
8,"9,L3,C3,3,ACCR,80"
9,"10,L1,C4,5,ACCR,100"


In [12]:
# Split csv content out and key both sides by counter party:
# dataset1 rows on column 2, dataset2 (tier lookup) rows on column 0.
data_split = (
    dataset1
    | "SplitCsv1" >> beam.ParDo(SplitCsvContent())
    | "ConvertToKVPair1" >> beam.ParDo(CreateKVPairOfContentByIndexList(), [2])
)
data_tier_split = (
    dataset2
    | "SplitCsv2" >> beam.ParDo(SplitCsvContent())
    | "ConvertToKVPair2" >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)

In [13]:
# Get tier: left-join every invoice row to the last column of its counter party's dataset2 row.
tier_join_inputs = {'left': data_split, 'right': data_tier_split}
combined_data_struct = (
    tier_join_inputs
    | beam.CoGroupByKey()
    | beam.FlatMap(deconstruct_left_join)
)
ib.collect(combined_data_struct)

Unnamed: 0,0,1,2,3,4,5,6
0,1,L1,C1,1,ARAP,10,1
1,7,L1,C1,2,ARAP,10,1
2,13,L1,C1,3,ARAP,20,1
3,2,L2,C2,2,ARAP,20,2
4,8,L2,C2,3,ACCR,40,2
5,3,L3,C3,4,ACCR,30,3
6,9,L3,C3,3,ACCR,80,3
7,14,L2,C3,2,ACCR,52,3
8,15,L3,C3,4,ACCR,35,3
9,16,L1,C3,6,ARAP,5,3


In [14]:
# Convert to schema: parse the joined string columns into typed Dataset records.
data_schema = combined_data_struct | beam.Map(
    lambda row: Dataset(
        invoice_id=int(row[0]),
        legal_entity=row[1],
        counter_party=row[2],
        rating=int(row[3]),
        status=row[4],
        value=float(row[5]),
        tier=int(row[6]),
    )
).with_output_types(Dataset)
ib.collect(data_schema)

Unnamed: 0,invoice_id,legal_entity,counter_party,rating,status,value,tier
0,1,L1,C1,1,ARAP,10.0,1
1,7,L1,C1,2,ARAP,10.0,1
2,13,L1,C1,3,ARAP,20.0,1
3,2,L2,C2,2,ARAP,20.0,2
4,8,L2,C2,3,ACCR,40.0,2
5,3,L3,C3,4,ACCR,30.0,3
6,9,L3,C3,3,ACCR,80.0,3
7,14,L2,C3,2,ACCR,52.0,3
8,15,L3,C3,4,ACCR,35.0,3
9,16,L1,C3,6,ARAP,5.0,3


In [15]:
# Get max rating by cp: one (counter_party, max_rating) row per counter party.
max_rating_by_cp = data_schema | "MaxRatingByCP" >> (
    beam.GroupBy('counter_party')
    .aggregate_field('rating', max, 'max_rating')
)
ib.collect(max_rating_by_cp)

Unnamed: 0,counter_party,max_rating
0,C1,3
1,C2,3
2,C3,6
3,C4,6
4,C5,6
5,C6,6


In [16]:
# Split into ARAP and ACCR invoices on the `status` field.
arap_dataset = data_schema | 'SplitArap' >> beam.Filter(lambda row: row.status == 'ARAP')
accr_dataset = data_schema | 'SplitAccr' >> beam.Filter(lambda row: row.status == 'ACCR')
ib.collect(accr_dataset)

Unnamed: 0,invoice_id,legal_entity,counter_party,rating,status,value,tier
0,8,L2,C2,3,ACCR,40.0,2
1,3,L3,C3,4,ACCR,30.0,3
2,9,L3,C3,3,ACCR,80.0,3
3,14,L2,C3,2,ACCR,52.0,3
4,15,L3,C3,4,ACCR,35.0,3
5,10,L1,C4,5,ACCR,100.0,4
6,5,L2,C5,4,ACCR,50.0,5
7,17,L2,C5,3,ACCR,65.0,5
8,6,L3,C6,6,ACCR,60.0,6


In [17]:
# Retrieve LE totals: ARAP and ACCR value sums per legal entity, full-outer-joined
# so an entity missing on one side still appears (that side contributes 0.0).
le_group_by_arap = (
    arap_dataset
    | "GroupByLeArap" >> beam.GroupBy('legal_entity').aggregate_field('value', sum, 'value_sum_arap')
    | 'CreateKVPairLeArap' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)
le_group_by_accr = (
    accr_dataset
    | "GroupByLeAccr" >> beam.GroupBy('legal_entity').aggregate_field('value', sum, 'value_sum_accr')
    | 'CreateKVPairLeAccr' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)
le_joined = {'left': le_group_by_arap, 'right': le_group_by_accr} | "MergeLe" >> beam.CoGroupByKey()
# Explicit labels: an unlabeled DeconstructJoin() defaults to the label 'DeconstructJoin',
# and Beam rejects a second transform with the same label in one pipeline.
# NOTE(review): OutDataset annotates tier/max_rating_by_cp as int, but these rows hold 'Total'/'N/A'.
le_group_by = (
    le_joined
    | "DeconstructLe" >> DeconstructJoin()
    | "ToOutDatasetLe" >> beam.Map(lambda x: OutDataset(
        legal_entity=x[0],
        counter_party='Total',
        tier='Total',
        max_rating_by_cp='N/A',
        sum_value_arap=x[1],
        sum_value_accr=x[2],
    )).with_output_types(OutDataset)
)
ib.collect(le_group_by)

Unnamed: 0,legal_entity,counter_party,tier,max_rating_by_cp,sum_value_arap,sum_value_accr
0,L2,Total,Total,,1020.0,207.0
1,L3,Total,Total,,145.0,205.0
2,L1,Total,Total,,85.0,100.0


In [18]:
# Retrieve LE, CP totals: ARAP and ACCR value sums per (legal entity, counter party),
# full-outer-joined so a pair missing on one side still appears (that side contributes 0.0).
le_cp_group_by_arap = (
    arap_dataset
    | "GroupByLeCpArap" >> beam.GroupBy("legal_entity", "counter_party").aggregate_field('value', sum, 'value_sum_arap')
    | 'CreateKVPairLeCpArap' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0, 1])
)
le_cp_group_by_accr = (
    accr_dataset
    | "GroupByLeCpAccr" >> beam.GroupBy("legal_entity", "counter_party").aggregate_field('value', sum, 'value_sum_accr')
    | 'CreateKVPairLeCpAccr' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0, 1])
)
le_cp_joined = {'left': le_cp_group_by_arap, 'right': le_cp_group_by_accr} | "MergeLeCp" >> beam.CoGroupByKey()
# Explicit labels: an unlabeled DeconstructJoin() defaults to the label 'DeconstructJoin',
# and Beam rejects a second transform with the same label in one pipeline.
# NOTE(review): OutDataset annotates tier/max_rating_by_cp as int, but these rows hold 'Total'/'N/A'.
le_cp_group_by = (
    le_cp_joined
    | "DeconstructLeCp" >> DeconstructJoin()
    | "ToOutDatasetLeCp" >> beam.Map(lambda x: OutDataset(
        legal_entity=x[0],
        counter_party=x[1],
        tier='Total',
        max_rating_by_cp='N/A',
        sum_value_arap=x[2],
        sum_value_accr=x[3],
    )).with_output_types(OutDataset)
)
ib.collect(le_cp_group_by)

Unnamed: 0,legal_entity,counter_party,tier,max_rating_by_cp,sum_value_arap,sum_value_accr
0,L2,C2,Total,,20.0,40.0
1,L3,C3,Total,,0.0,145.0
2,L2,C3,Total,,0.0,52.0
3,L1,C4,Total,,40.0,100.0
4,L2,C5,Total,,1000.0,115.0
5,L3,C6,Total,,145.0,60.0
6,L1,C1,Total,,40.0,0.0
7,L1,C3,Total,,5.0,0.0


In [19]:
# Retrieve CP rows: ARAP and ACCR value sums per counter party (across all legal entities),
# full-outer-joined so a counter party missing on one side still appears (that side contributes 0.0).
cp_group_by_arap = (
    arap_dataset
    | "GroupByCpArap" >> beam.GroupBy('counter_party').aggregate_field('value', sum, 'value_sum_arap')
    | 'CreateKVPairCpArap' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)
cp_group_by_accr = (
    accr_dataset
    | "GroupByCpAccr" >> beam.GroupBy('counter_party').aggregate_field('value', sum, 'value_sum_accr')
    | 'CreateKVPairCpAccr' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)
cp_joined = {'left': cp_group_by_arap, 'right': cp_group_by_accr} | "MergeCp" >> beam.CoGroupByKey()
# Explicit labels: an unlabeled DeconstructJoin() defaults to the label 'DeconstructJoin',
# and Beam rejects a second transform with the same label in one pipeline.
# NOTE(review): OutDataset annotates tier/max_rating_by_cp as int, but these rows hold 'Total'/'N/A'.
cp_group_by = (
    cp_joined
    | "DeconstructCp" >> DeconstructJoin()
    | "ToOutDatasetCp" >> beam.Map(lambda x: OutDataset(
        legal_entity='Total',
        counter_party=x[0],
        tier='Total',
        max_rating_by_cp='N/A',
        sum_value_arap=x[1],
        sum_value_accr=x[2],
    )).with_output_types(OutDataset)
)
ib.collect(cp_group_by)

Unnamed: 0,legal_entity,counter_party,tier,max_rating_by_cp,sum_value_arap,sum_value_accr
0,Total,C1,Total,,40.0,0.0
1,Total,C2,Total,,20.0,40.0
2,Total,C3,Total,,5.0,197.0
3,Total,C4,Total,,40.0,100.0
4,Total,C5,Total,,1000.0,115.0
5,Total,C6,Total,,145.0,60.0


In [20]:
# Retrieve TI rows: ARAP and ACCR value sums per tier, full-outer-joined
# so a tier missing on one side still appears (that side contributes 0.0).
ti_group_by_arap = (
    arap_dataset
    | "GroupByTiArap" >> beam.GroupBy('tier').aggregate_field('value', sum, 'value_sum_arap')
    | 'CreateKVPairTiArap' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)
ti_group_by_accr = (
    accr_dataset
    | "GroupByTiAccr" >> beam.GroupBy('tier').aggregate_field('value', sum, 'value_sum_accr')
    | 'CreateKVPairTiAccr' >> beam.ParDo(CreateKVPairOfContentByIndexList(), [0])
)
ti_joined = {'left': ti_group_by_arap, 'right': ti_group_by_accr} | "MergeTi" >> beam.CoGroupByKey()
# Explicit labels: an unlabeled DeconstructJoin() defaults to the label 'DeconstructJoin',
# and Beam rejects a second transform with the same label in one pipeline.
# NOTE(review): OutDataset annotates max_rating_by_cp as int, but these rows hold 'N/A'.
ti_group_by = (
    ti_joined
    | "DeconstructTi" >> DeconstructJoin()
    | "ToOutDatasetTi" >> beam.Map(lambda x: OutDataset(
        legal_entity='Total',
        counter_party='Total',
        tier=x[0],
        max_rating_by_cp='N/A',
        sum_value_arap=x[1],
        sum_value_accr=x[2],
    )).with_output_types(OutDataset)
)
ib.collect(ti_group_by)

Unnamed: 0,legal_entity,counter_party,tier,max_rating_by_cp,sum_value_arap,sum_value_accr
0,Total,Total,2,,20.0,40.0
1,Total,Total,3,,5.0,197.0
2,Total,Total,4,,40.0,100.0
3,Total,Total,5,,1000.0,115.0
4,Total,Total,6,,145.0,60.0
5,Total,Total,1,,40.0,0.0


In [21]:
# Consolidate data: union the four aggregation levels into one PCollection of OutDataset rows.
aggregate_levels = (le_group_by, le_cp_group_by, cp_group_by, ti_group_by)
consolidated_data = aggregate_levels | beam.Flatten()
ib.collect(consolidated_data)

Unnamed: 0,legal_entity,counter_party,tier,max_rating_by_cp,sum_value_arap,sum_value_accr
0,L2,Total,Total,,1020.0,207.0
1,L3,Total,Total,,145.0,205.0
2,L1,Total,Total,,85.0,100.0
3,Total,Total,2,,20.0,40.0
4,Total,Total,3,,5.0,197.0
5,Total,Total,4,,40.0,100.0
6,Total,Total,5,,1000.0,115.0
7,Total,Total,6,,145.0,60.0
8,Total,Total,1,,40.0,0.0
9,L2,C2,Total,,20.0,40.0


In [22]:
# Group by CP to get max rating by CP, then render every row as one CSV line.
consolidated_data_group_by_cp = consolidated_data | beam.GroupBy('counter_party')


def _attach_max_rating(cogrouped):
    """Append the counter party's max rating (or 'N/A') to each consolidated row.

    ``cogrouped`` is ``(counter_party, {'left': [grouped rows, ...], 'right': [max_rating, ...]})``.
    The max_rating_by_cp rows are two-field (counter_party, max_rating) records,
    so CoGroupByKey treats them as key/value pairs. Every left group is
    flattened, so a counter party that has a rating but no consolidated rows
    yields nothing instead of raising IndexError. A missing rating (e.g. the
    'Total' counter party) becomes 'N/A'.
    """
    groups = cogrouped[1]
    max_rating = next(iter(groups['right']), 'N/A')
    return [tuple(row) + (max_rating,) for group in groups['left'] for row in group]


def _format_out_row(row):
    """Render legal_entity, counter_party, tier, max rating, ARAP sum, ACCR sum as CSV.

    Field 6 (the attached max rating) is written in place of field 3, which
    holds the 'N/A' placeholder on the aggregated rows.
    """
    return ','.join(str(row[i]) for i in (0, 1, 2, 6, 4, 5))


out_data = (
    {'left': consolidated_data_group_by_cp, 'right': max_rating_by_cp}
    | "GetMaxRating" >> beam.CoGroupByKey()
    | beam.FlatMap(_attach_max_rating)
    | beam.Map(_format_out_row)
)
ib.collect(out_data)

Unnamed: 0,0
0,"L1,C1,Total,3,40.0,0.0"
1,"Total,C1,Total,3,40.0,0.0"
2,"L2,C2,Total,3,20.0,40.0"
3,"Total,C2,Total,3,20.0,40.0"
4,"L3,C3,Total,6,0.0,145.0"
5,"L2,C3,Total,6,0.0,52.0"
6,"L1,C3,Total,6,5.0,0.0"
7,"Total,C3,Total,6,5.0,197.0"
8,"L1,C4,Total,6,40.0,100.0"
9,"Total,C4,Total,6,40.0,100.0"


In [23]:
# Output to file: write out_data under the 'out_beam' prefix with a '.csv' suffix,
# with HEADER_ROW as the header line, then run the pipeline and block until it finishes.
out_data | beam.io.WriteToText('out_beam', file_name_suffix='.csv', header=HEADER_ROW)
pipeline_result = p.run()
pipeline_result.wait_until_finish()