# GroupBy

Single Key, Multiple Keys

#### Create a subset and group by, aggregating the row indexes
We want to maintain the original indices of the data coming in if we can.  

In [1]:
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

import os

# Configure some test data
test_file_path = os.path.join(os.getcwd(), 'assets', 'data', 'usbe_students.csv')
output_file_path = os.path.join(os.getcwd(), 'output_test_file.csv')


# Define a subset to work with
agg_set = ['FIRST_NAME', 'LAST_NAME']  # Can be any number of valid columns in dataset

## Via Pure CombineFn, Pipeline Operations

In [23]:
from utils.load import (
    make_schema_from_csv,
    make_csv_coder,
)
from assets.mapping import is_mapped

class CompositeKey(beam.DoFn):
    def __init__(self, agg_keys: list = []):
        self.agg_keys = agg_keys
        
    def process(self, row: dict):
        row['agg_key'] = {'-'.join([row[k] for k in self.agg_keys])}
        yield row
    

# Convert over to mapped column names
mapped_set = [is_mapped(col)[0] for col in agg_set]
    

with beam.Pipeline() as p:
    schema = make_schema_from_csv(test_file_path)
    csv_coder = make_csv_coder(schema)
    
    grouped = (
        p
        | 'ReadFromText' >> beam.io.ReadFromText(
            test_file_path,
            skip_header_lines=1
            )
        | 'ParseCSV' >> beam.Map(csv_coder)
        | 'Create Composite Key' >> beam.ParDo(CompositeKey(agg_keys=mapped_set))
        | 'GroupBy Keys' >> beam.GroupBy(lambda r: r['agg_key'])  # Now grouped by composite key
            # If there was a row-index column, then you can aggregate the columns via CombinePerKey
        | 'Print' >> beam.Map(print)
    )

({'Genora-Zegarra'}, [{'birth_date_pool': '6/9/1256', 'FIRST_ENTERED_US': '07-MAR-16 12.00.00.000000000 AM', 'first_name_pool': 'Genora', 'gender_pool': 'f', 'last_name_pool': 'Zegarra', 'middle_name_pool': 'Rhino', 'ssid_pool': '0x218b2ea30x43165d46', 'student_id_pool': '666165', 'id': '46394', 'agg_key': {'Genora-Zegarra'}}])
({'Laramee-Vela Garcia'}, [{'birth_date_pool': '9/5/1287', 'FIRST_ENTERED_US': '13-AUG-12 12.00.00.000000000 AM', 'first_name_pool': 'Laramee', 'gender_pool': 'm', 'last_name_pool': 'Vela Garcia', 'middle_name_pool': 'Legacy-Martin', 'ssid_pool': '0x2156f0e80x42ade1d0', 'student_id_pool': '976990', 'id': '35263', 'agg_key': {'Laramee-Vela Garcia'}}])
({'Zelma-Garlitz'}, [{'birth_date_pool': '3/11/1507', 'FIRST_ENTERED_US': '06-OCT-16 12.00.00.000000000 AM', 'first_name_pool': 'Zelma', 'gender_pool': 'n', 'last_name_pool': 'Garlitz', 'middle_name_pool': 'Katempa', 'ssid_pool': '0x2981558d0x5302ab1a', 'student_id_pool': '225275', 'id': '148149', 'agg_key': {'Zelma

({'Mauna-Sackley'}, [{'birth_date_pool': '7/10/1336', 'FIRST_ENTERED_US': '25-AUG-02 12.00.00.000000000 AM', 'first_name_pool': 'Mauna', 'gender_pool': 'm', 'last_name_pool': 'Sackley', 'middle_name_pool': 'Heather Lynn', 'ssid_pool': '0x387b2c100x70f65820', 'student_id_pool': '538263', 'id': '101840', 'agg_key': {'Mauna-Sackley'}}])
({'Haylen-Reyes-Galdamez'}, [{'birth_date_pool': '6/13/1346', 'FIRST_ENTERED_US': '15-DEC-00 12.00.00.000000000 AM', 'first_name_pool': 'Haylen', 'gender_pool': 'n', 'last_name_pool': 'Reyes-Galdamez', 'middle_name_pool': 'Vue', 'ssid_pool': '0x3ad923500x75b246a0', 'student_id_pool': '394590', 'id': '49827', 'agg_key': {'Haylen-Reyes-Galdamez'}}])
({'Gideon-Guthrie'}, [{'birth_date_pool': '6/27/2086', 'FIRST_ENTERED_US': '04-SEP-11 12.00.00.000000000 AM', 'first_name_pool': 'Gideon', 'gender_pool': 'm', 'last_name_pool': 'Guthrie', 'middle_name_pool': 'Emilliano King', 'ssid_pool': '0x215f7f630x42befec6', 'student_id_pool': '357423', 'id': '23424', 'agg_ke

({'Dodger-Murshid'}, [{'birth_date_pool': '9/14/2867', 'FIRST_ENTERED_US': '09-FEB-06 12.00.00.000000000 AM', 'first_name_pool': 'Dodger', 'gender_pool': 'f', 'last_name_pool': 'Murshid', 'middle_name_pool': 'Ava May', 'ssid_pool': '0x28bba73e0x51774e7c', 'student_id_pool': '689910', 'id': '27019', 'agg_key': {'Dodger-Murshid'}}])
({'Clarence-Luciano'}, [{'birth_date_pool': '7/2/1352', 'FIRST_ENTERED_US': '25-JUL-94 12.00.00.000000000 AM', 'first_name_pool': 'Clarence', 'gender_pool': 'm', 'last_name_pool': 'Luciano', 'middle_name_pool': 'Monica Astorga', 'ssid_pool': '0x1025743a0x204ae874', 'student_id_pool': '290042', 'id': '66830', 'agg_key': {'Clarence-Luciano'}}])
({'Vilate-Huseman'}, [{'birth_date_pool': '7/24/2146', 'FIRST_ENTERED_US': '19-AUG-04 12.00.00.000000000 AM', 'first_name_pool': 'Vilate', 'gender_pool': 'n', 'last_name_pool': 'Huseman', 'middle_name_pool': 'Maria Del Carmen', 'ssid_pool': '0x1160b56a0x22c16ad4', 'student_id_pool': '372643', 'id': '81986', 'agg_key': {'

({'Dallas-Koskie'}, [{'birth_date_pool': '4/7/1874', 'FIRST_ENTERED_US': '28-FEB-06 12.00.00.000000000 AM', 'first_name_pool': 'Dallas', 'gender_pool': 'm', 'last_name_pool': 'Koskie', 'middle_name_pool': 'Kemalia Kaopahele Analani Jean', 'ssid_pool': '0x24aa14400x49542880', 'student_id_pool': '881559', 'id': '63545', 'agg_key': {'Dallas-Koskie'}}])
({'Jaklynn-Bergholtz'}, [{'birth_date_pool': '7/13/1359', 'FIRST_ENTERED_US': '15-NOV-00 12.00.00.000000000 AM', 'first_name_pool': 'Jaklynn', 'gender_pool': 'f', 'last_name_pool': 'Bergholtz', 'middle_name_pool': "Joel Fa'amalosi", 'ssid_pool': '0x26075b490x4c0eb692', 'student_id_pool': '783406', 'id': '25167', 'agg_key': {'Jaklynn-Bergholtz'}}])
({'Coultyn-Vanekelenburg'}, [{'birth_date_pool': '6/18/2243', 'FIRST_ENTERED_US': '26-JAN-09 12.00.00.000000000 AM', 'first_name_pool': 'Coultyn', 'gender_pool': 'n', 'last_name_pool': 'Vanekelenburg', 'middle_name_pool': 'Norlin', 'ssid_pool': '0x28338d2a0x50671a54', 'student_id_pool': '280007004

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



## Via DataFrames API
Beam DataFrames offer more table-like behaviors.  Some functionality hasn't been implemented yet (apply won't work)

In [27]:
# Build the pipeline
with beam.Pipeline() as p:
    # Use the runner to load data and put it into a dataframe (there are pcollection to frame commands as well)
    df = p | read_csv(test_file_path)
    
    aggregated = df[agg_set]\
        .groupby(agg_set)\
        .count()
#         .apply(lambda x: x.index.to_list())
    
    # Output the frame directly (can go to csv or BigQuery)
    aggregated.to_csv(output_file_path)  # Needs more kwargs to be useful, but output is possible already

    # Convert back to PCollection for more stuff if needed - in general, don't do this. Very slow.
#     agg_pc = to_pcollection(aggregated)  