In [6]:
!pip install --quiet apache-beam
!pip install --quiet apache_beam[dataframe]

import apache_beam as beam
from apache_beam.dataframe.io import read_csv

####### Questions & Assumptions #######
# Same as in the Pandas file.
# The only difference: here the max rating is assumed to be computed per ['legal_entity', 'counterparty', 'tier'].
# Although it is overkill for such a small dataset, parallelism is assumed to be mandatory, so it is maintained throughout.

# Create the pipeline object
with beam.Pipeline() as p:
    # Everything below uses the Beam DataFrame API: each frame/series is a
    # deferred object describing work for the pipeline, not in-memory data.
    # NOTE: both `to_csv` sinks below are commented out, so as written the
    # pipeline writes no output files; uncomment them to produce the CSVs.

    # Columns that identify one output row; every aggregation groups by these.
    group_keys = ['legal_entity', 'counterparty', 'tier']

    # Read the two CSV files as deferred DataFrames.
    ds1 = p | "Read dataset1" >> read_csv('../dataset1.csv')
    ds2 = p | "Read dataset2" >> read_csv('../dataset2.csv')

    # Left-join dataset1 with dataset2 on 'counter_party'. The key is set as
    # the index of ds2 first so the join can use right_index=True. Converting
    # ds1/ds2 to in-memory pandas DataFrames would also work, but would give
    # up the parallelism provided by Beam.
    df_joined = ds1.merge(
        ds2.set_index('counter_party'),
        left_on='counter_party',
        right_index=True,
        how='left',
    ).drop(columns=['invoice_id']).rename(columns={'counter_party': 'counterparty'})

    # Max rating per group, as a named deferred Series.
    df_maxRatingByLegalEntityCounterpartyTier = (
        df_joined.groupby(group_keys)
        .rating.max()
        .rename('max(rating by counterparty)')
    )

    # Sum of `value` per group, restricted to rows whose status is ARAP.
    df_sum_value_where_status_ARAP = (
        df_joined[df_joined['status'] == 'ARAP']
        .groupby(group_keys)
        .value.sum()
        .rename('sum(value where status=ARAP)')
    )

    # Sum of `value` per group, restricted to rows whose status is ACCR.
    df_sum_value_where_status_ACCR = (
        df_joined[df_joined['status'] == 'ACCR']
        .groupby(group_keys)
        .value.sum()
        .rename('sum(value where status=ACCR)')
    )

    # Number of joined rows per group, named 'Count'.
    df_count = df_joined.groupby(group_keys).size().rename('Count')

    # Saving the per-group record count as CSV, uncomment below
    #df_count.to_csv('total_record.csv')

    # Assemble the requested output layout: start from the max-rating series
    # and left-join the two status sums on the shared group index. Groups
    # with no ARAP (or ACCR) rows get NaN in the corresponding sum column.
    ds_joined = df_maxRatingByLegalEntityCounterpartyTier.to_frame().merge(
        df_sum_value_where_status_ARAP,
        left_index=True,
        right_index=True,
        how='left',
    ).merge(
        df_sum_value_where_status_ACCR,
        left_index=True,
        right_index=True,
        how='left',
    )

    # Replace the NaNs left by the joins with 0. fillna with a scalar is
    # element-wise, so it is applied directly; wrapping it in
    # groupby(...).apply(...) would add a needless regrouping and, per Beam's
    # groupby.apply notes, can leave duplicated index levels in the result.
    df_final = ds_joined.fillna(0)

    # Saving to csv, uncomment below
    #df_final.to_csv('output_file.csv')
    