In [1]:
import os, datetime
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText

In [2]:
# Sanity check: stripping backslashes the way NameFormatFn does leaves only 'Hello'.
str.replace("\\Hello", "\\", "")

'Hello'

In [3]:
class NameFormatFn(beam.DoFn):
    """Standardize the 'Candidate' name of a Runs record for the name join.

    The name is upper-cased and stripped of double quotes, backslashes,
    generational suffixes (JR, SR, II, III) and periods. It is then written
    in 'LAST, FIRST' form:

    * a name that contains a comma keeps its first two tokens
      ('SMITH, JOHN A' -> 'SMITH, JOHN');
    * otherwise the last token is moved to the front
      ('JOHN A SMITH' -> 'SMITH, JOHN');
    * a single-token name is kept as is, and an empty/NULL name becomes ''.

    The input element is not modified. A shallow copy carrying the new
    'Candidate' value is emitted instead.
    """

    def process(self, element):
        # Imported locally so the DoFn does not depend on notebook globals
        # when it is pickled and executed by a runner.
        import re

        # Beam requires that input elements are never mutated, so work on a copy.
        record = dict(element)
        # A missing/NULL name is treated as empty instead of failing on .upper().
        name = (record.get('Candidate') or '').upper()
        name = name.replace('"', '').replace('\\', '')
        # Remove generational suffixes as whole words. Each may be preceded by
        # a comma and followed by a period. III is tried before II so it
        # cannot leave a stray 'I' behind. The word boundary keeps names that
        # merely start with these letters (e.g. 'SRINIVASAN', 'JRUE') intact.
        name = re.sub(r',?\s+(?:JR|SR|III|II)\b\.?', '', name)
        name = name.replace('.', '')

        if ',' in name:
            # Guarantee whitespace after every comma so 'SMITH,JOHN' still
            # splits into two tokens; the comma stays on the last-name token.
            # NOTE(review): a multi-word last name such as 'VAN HOLLEN, CHRIS'
            # keeps only its first two tokens ('VAN HOLLEN,') - confirm this
            # matches the Candidates table naming.
            tokens = name.replace(',', ', ').split()
            record['Candidate'] = ' '.join(tokens[:2])
        else:
            tokens = name.split()
            if len(tokens) >= 2:
                record['Candidate'] = tokens[-1] + ', ' + tokens[0]
            else:
                record['Candidate'] = tokens[0] if tokens else ''

        return [record]

In [4]:
class RunsNamePreGroupFn(beam.DoFn):
    """Key a standardized Runs record by its 'Candidate' name for CoGroupByKey.

    Emits a single (name, rest) pair, where rest is a copy of the record
    without the 'Candidate' field. The input element is left untouched.
    """

    def process(self, element):
        # Copy before popping: Beam requires that input elements not be mutated.
        record = dict(element)
        name = record.pop('Candidate')

        return [(name, record)]
    
class CandNamePreGroupFn(beam.DoFn):
    """Key a Candidates row by its 'Name' column for CoGroupByKey.

    Emits a single (name, rest) pair, where rest is a copy of the row without
    the 'Name' field (with the 'SELECT Name, ID' query used in this notebook,
    that leaves only the 'ID' field). The input element is left untouched.
    """

    def process(self, element):
        # Copy before popping: Beam requires that input elements not be mutated.
        record = dict(element)
        name = record.pop('Name')

        return [(name, record)]

In [5]:
class RemoveEmptyRunsFn(beam.DoFn):
    """Drop joined groups that are missing either side of the name join.

    Expects (name, groups), where groups holds 'Runs' and 'Cand' entries as
    produced by the CoGroupByKey in this notebook. The element is passed
    through unchanged only when both entries are non-empty; otherwise nothing
    is emitted.
    """

    def process(self, element):
        _, groups = element
        # Keep the name only if it matched at least one Runs row AND one candidate.
        if len(groups['Runs']) and len(groups['Cand']):
            return [element]
        return None

In [12]:
class CreatePreFinalRecordsFn(beam.DoFn):
    """Expand one joined (name, {'Runs': [...], 'Cand': [...]}) group into records.

    Every Runs row is paired with every candidate row that shares the name.
    Each pair produces a dict with the keys, in this order:

    * 'Election_ID'     - copied from the Runs row;
    * 'Candidate_Votes' - copied from the Runs row;
    * 'Candidate_Label' - first 4 characters of Election_ID + candidate 'ID';
    * 'Runs_ID'         - Candidate_Label + the rest of Election_ID.

    Pairs whose Runs_ID was already emitted for this group are skipped, so
    each Runs_ID appears at most once per name (the first pair wins).
    """

    def process(self, element):
        _, record_dict = element  # the join key (name) is not needed here
        record_lst = []
        seen_runs_ids = set()  # set membership instead of a never-filled list

        for run in record_dict['Runs']:
            for cand in record_dict['Cand']:
                election_id = run['Election_ID']
                # NOTE(review): assumes the first 4 characters of Election_ID
                # form a prefix (e.g. a year) shared with Candidate_Label - confirm.
                candidate_label = election_id[:4] + cand['ID']
                runs_id = candidate_label + election_id[4:]

                if runs_id in seen_runs_ids:
                    continue
                # Record the key so later duplicates are skipped (the original
                # computed this value but never stored it).
                seen_runs_ids.add(runs_id)
                record_lst.append({
                    'Election_ID': election_id,
                    'Candidate_Votes': run['Candidate_Votes'],
                    'Candidate_Label': candidate_label,
                    'Runs_ID': runs_id,
                })

        return record_lst

In [7]:
# GCP project that owns the BigQuery datasets queried below. BigQuery sources
# need a project even when the pipeline runs locally, so it is passed in
# through the pipeline options.
PROJECT_ID = 'sound-cider-252823'

options = dict(project=PROJECT_ID)
opts = PipelineOptions(flags=[], **options)

In [13]:
with beam.Pipeline('DirectRunner', options=opts) as p:

    # Read both inputs from BigQuery.
    # NOTE: no LIMIT is applied, so the entire Runs table is read. Add a
    # LIMIT clause to the query below if only a sample is needed.
    query_results = p | 'Read from BigQuery for txt' >> beam.io.Read(beam.io.BigQuerySource(query = 'SELECT * FROM MIT_modeled.Runs'))
    cand_query_results = p | 'Read Candidates_Beam_DF_Jupyter from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query = 'SELECT Name, ID from fec_modeled.Candidates_Beam_DF_Jupyter'))

    #write PCollection to log file
    #query_results | 'Write to input.txt' >> WriteToText('input.txt')

    #cand_query_results | 'Write cand input to txt' >> WriteToText('cand_input.txt')

    # Standardize the Runs candidate names (NameFormatFn) so they can be joined
    # against the Candidates table.
    new_pcoll = query_results | 'Perform name standardization' >> beam.ParDo(NameFormatFn())

    # Key Runs rows by the standardized 'Candidate' name.
    runs_name_pcoll = new_pcoll | 'Runs pregroup processing' >> beam.ParDo(RunsNamePreGroupFn())

    #runs_name_pcoll | 'Write runs pregroup process results to txt' >> WriteToText('runs_name_pregroup_results.txt')

    # Key Candidates rows by their 'Name' column.
    cand_name_pcoll = cand_query_results | 'Cand pregroup processing' >> beam.ParDo(CandNamePreGroupFn())

    #cand_name_pcoll | 'Write cand pregroup process results to txt' >> WriteToText('cand_name_pregroup_results.txt')

    # Join both sides on the name: each output is (name, {'Runs': ..., 'Cand': ...}).
    group_pcoll = {'Runs':runs_name_pcoll,'Cand':cand_name_pcoll} | 'Grouping' >> beam.CoGroupByKey()

    group_pcoll | "Write grouped results to txt" >> WriteToText('grouped_results.txt')

    # Keep only names that matched on both sides of the join.
    cleaned_group_pcoll = group_pcoll | "Remove empty runs record" >> beam.ParDo(RemoveEmptyRunsFn())

    cleaned_group_pcoll | "Write cleaned group to txt" >> WriteToText('cleaned_grouped_results.txt')

    # Expand each joined group into per-election candidate records.
    pre_final_pcoll = cleaned_group_pcoll | "Create near final records" >> beam.ParDo(CreatePreFinalRecordsFn())

    pre_final_pcoll | "Write pre final records to txt" >> WriteToText('pre_final_results.txt')


    

#     qualified_table_name = PROJECT_ID + ':MIT_modeled.Runs_Beam_Jupyter'
#     #Transaction_Type:STRING, Date:DATE, Amount:INTEGER, Payee_ID:STRING, Contribution_ID:STRING, Year:INTEGER, Committee_Label:STRING, Candidate_Label:STRING
#     # change the column "Date" into type DATE
#     table_schema = 'Candidate:STRING, Candidate_Votes:INTEGER, Election_ID:STRING'
    
#     # write the output results as a table in BigQuery
#     new_pcoll | 'Write to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(qualified_table_name,
#                                                     schema=table_schema,  
#                                                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
#                                                     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))


