# Command to launch the Dataflow job:

python apache_beam_batch_job_gcs_to_bq.py \
        --input 'gs://test-project-dcp/sample_data/customer_purchasing_behaviors.csv' \
        --output_table 'lexical-cider-440507-u0:test_dcp.test_table' \
        --project 'lexical-cider-440507-u0' \
        --temp_location 'gs://test-project-dcp/temp' \
        --staging_location 'gs://test-project-dcp/temp' \
        --region 'us-central1' \
        --runner 'DataflowRunner'


In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import argparse

class convert_dict(beam.DoFn):
    """Turn one parsed CSV row into a dict ready for ``WriteToBigQuery``.

    ``element`` is a sequence of string fields in the source CSV's column
    order: user_id, age, annual_income, purchase_amount, loyalty_score,
    region, purchase_frequency.

    Numeric fields are converted to the Python type matching the column type
    in the pipeline's BigQuery schema (INTEGER -> int, FLOAT -> float). The
    load therefore does not depend on BigQuery coercing strings. Text fields
    are passed through unchanged.
    """

    # (BigQuery column name, converter) in CSV column order.
    # A converter of None means "keep the raw string".
    _COLUMNS = (
        ('user_id', int),
        ('age', int),
        ('annual_income', float),
        ('purchase_amount', float),
        ('loyalty_score', float),
        ('region', None),
        ('purchase_frequency', int),
    )

    def process(self, element):
        """Yield a single column-name -> value dict built from ``element``.

        Raises:
            ValueError: if ``element`` has fewer fields than expected, or a
                numeric field is not a valid int/float literal.
        """
        expected = len(self._COLUMNS)
        if len(element) < expected:
            raise ValueError(
                'Expected at least %d CSV fields, got %d: %r'
                % (expected, len(element), element))

        row = {}
        # zip() stops after the last known column, so extra trailing fields are ignored.
        for (name, cast), raw in zip(self._COLUMNS, element):
            if cast is None:
                row[name] = raw  # text column: keep exactly as read
                continue
            text = raw.strip()
            # An empty numeric field becomes None instead of crashing int()/float().
            row[name] = cast(text) if text else None
        yield row

def _parse_csv_line(line):
    """Split one line of CSV text into a list of string fields.

    Uses the standard-library ``csv`` parser so that quoted fields containing
    commas stay intact. A plain ``str.split(',')`` would break them apart and
    shift every following column.
    """
    # Local import: functions defined in the launching script may run on
    # remote workers that do not see this file's module-level imports
    # (see Beam's ``save_main_session`` option).
    import csv

    return next(csv.reader([line]))


def run(argv=None):
    """Build and run the GCS CSV -> BigQuery batch pipeline.

    The pipeline proceeds in four steps:

    1. Read the CSV at ``--input``, skipping its header line.
    2. Parse each line into fields.
    3. Keep only rows whose age (second field) is greater than ``--min_age``.
    4. Convert rows to dicts with ``convert_dict`` and write them to
       ``--output_table``. The table is created if needed and its existing
       contents are replaced.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) means
            ``sys.argv[1:]``. Flags not defined here are forwarded to Beam as
            pipeline options (e.g. ``--runner``, ``--project``, ``--region``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', default='gs://test-project-dcp/sample_data/customer_purchasing_behaviors.csv', help='Input file to process.')
    parser.add_argument('--output_table', dest='output_table', default='lexical-cider-440507-u0:test_dcp.test_table', help='output table to process.')
    parser.add_argument('--min_age', dest='min_age', type=int, default=20, help='Keep only rows whose age is strictly greater than this value.')

    known_args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)
    min_age = known_args.min_age

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        data = (
            p
            | 'ReadFromGCS' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
            | 'split_map' >> beam.Map(_parse_csv_line)
            # Field 1 is age; int() also tolerates surrounding whitespace.
            | 'filter_values' >> beam.Filter(lambda fields: int(fields[1]) > min_age)
            | 'convert_dict' >> beam.ParDo(convert_dict())  # rows -> BigQuery dicts
        )

        # Write the parsed data to BigQuery
        data | 'WriteToBigQuery' >> WriteToBigQuery(
            known_args.output_table,  # BigQuery table name
            schema='user_id:INTEGER,age:INTEGER,annual_income:FLOAT,purchase_amount:FLOAT,loyalty_score:FLOAT,region:STRING,purchase_frequency:INTEGER',
            # Create the table with the explicit schema above if it does not exist.
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            # Replace the table's existing rows on every run.
            # NOTE(review): use WRITE_APPEND instead if appending is intended.
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        )

if __name__ == '__main__':
    # Script entry point: parse the command-line flags and run the pipeline.
    run()
