### Input File Reading

In [2]:
import apache_beam as beam 

with beam.Pipeline() as pipeline:

    data = (
        pipeline
        | 'Read CSV file' >> beam.io.ReadFromText('gs://cloud-samples-data/bigquery/sample-transactions/transactions.csv')
    )

    data | 'Print data' >> beam.Map(print)

 Traceback for above exception (most recent call last):
  File "/Users/Shehryar/Documents/Virgin-Media-O2-Data-Engineer-Tech-Test/venv/lib/python3.9/site-packages/apache_beam/utils/retry.py", line 298, in wrapper
    return fun(*args, **kwargs)
  File "/Users/Shehryar/Documents/Virgin-Media-O2-Data-Engineer-Tech-Test/venv/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py", line 175, in _get_credentials_with_retrys
    credentials, _ = google.auth.default(
  File "/Users/Shehryar/Documents/Virgin-Media-O2-Data-Engineer-Tech-Test/venv/lib/python3.9/site-packages/google/auth/_default.py", line 691, in default
    raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)

 Traceback for above exception (most recent call last):
  File "/Users/Shehryar/Documents/Virgin-Media-O2-Data-Engineer-Tech-Test/venv/lib/python3.9/site-packages/apache_beam/utils/retry.py", line 298, in wrapper
    return fun(*args, **kwargs)
  File "/Users/Shehryar/Documents/Virgin-Media-O2-Da

timestamp,origin,destination,transaction_amount
2009-01-09 02:54:25 UTC,wallet00000e719adfeaa64b5a,wallet00001866cb7e0f09a890,1021101.99
2017-01-01 04:22:23 UTC,wallet00000e719adfeaa64b5a,wallet00001e494c12b3083634,19.95
2017-03-18 14:09:16 UTC,wallet00001866cb7e0f09a890,wallet00001e494c12b3083634,2102.22
2017-03-18 14:10:44 UTC,wallet00001866cb7e0f09a890,wallet00000e719adfeaa64b5a,1.00030
2017-08-31 17:00:09 UTC,wallet00001e494c12b3083634,wallet00005f83196ec58e4ffe,13700000023.08
2018-02-27 16:04:11 UTC,wallet00005f83196ec58e4ffe,wallet00001866cb7e0f09a890,129.12


### Filter Transactions with Amount Greater than 20

In [3]:
import apache_beam as beam 

class FilterTransactionsGreaterThan20(beam.DoFn):
    def process(self, element):
        transaction_amount = float(element.split(',')[3])  # Transaction amount is the 4th column in csv 
        if transaction_amount > 20:
            yield element

with beam.Pipeline() as pipeline:
    data = (
        pipeline
        | 'Read CSV file' >> beam.io.ReadFromText('gs://cloud-samples-data/bigquery/sample-transactions/transactions.csv', skip_header_lines=1)
        | 'Filter transactions' >> beam.ParDo(FilterTransactionsGreaterThan20())
    )

    data | 'Print filtered data' >> beam.Map(print)


2009-01-09 02:54:25 UTC,wallet00000e719adfeaa64b5a,wallet00001866cb7e0f09a890,1021101.99
2017-03-18 14:09:16 UTC,wallet00001866cb7e0f09a890,wallet00001e494c12b3083634,2102.22
2017-08-31 17:00:09 UTC,wallet00001e494c12b3083634,wallet00005f83196ec58e4ffe,13700000023.08
2018-02-27 16:04:11 UTC,wallet00005f83196ec58e4ffe,wallet00001866cb7e0f09a890,129.12


### Exclude Transactions before 2010
#### I opted not to exclude amounts less than 20 since the instructions were vague, and including them provided no data for aggregation.



In [4]:
import apache_beam as beam 
from datetime import datetime

class CheckTimestampYear(beam.DoFn):
    def process(self, element):
        timestamp_str = element.split(',')[0]  # Timestamp is the 1st column
        timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S %Z')
        if timestamp.year >= 2010:
            yield element


with beam.Pipeline() as pipeline:
    data = (
        pipeline
        | 'Read CSV file' >> beam.io.ReadFromText('gs://cloud-samples-data/bigquery/sample-transactions/transactions.csv',  skip_header_lines=1)
        | 'Filter transactions for year' >> beam.ParDo(CheckTimestampYear())
    )

    
    data | 'Print data' >> beam.Map(print)


2017-01-01 04:22:23 UTC,wallet00000e719adfeaa64b5a,wallet00001e494c12b3083634,19.95
2017-03-18 14:09:16 UTC,wallet00001866cb7e0f09a890,wallet00001e494c12b3083634,2102.22
2017-03-18 14:10:44 UTC,wallet00001866cb7e0f09a890,wallet00000e719adfeaa64b5a,1.00030
2017-08-31 17:00:09 UTC,wallet00001e494c12b3083634,wallet00005f83196ec58e4ffe,13700000023.08
2018-02-27 16:04:11 UTC,wallet00005f83196ec58e4ffe,wallet00001866cb7e0f09a890,129.12


### Aggregate Total by Date


In [5]:
import apache_beam as beam 
from datetime import datetime

class CheckTimestampYear(beam.DoFn):
    def process(self, element):
        timestamp_str = element.split(',')[0]  
        timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S %Z')
        if timestamp.year >= 2010:
            yield element

class ExtractDateAmount(beam.DoFn):
    def process(self, element):
        timestamp_str, _, _, amount_str = element.split(',')
        timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S %Z')
        amount = float(amount_str)
        date = timestamp.strftime('%Y-%m-%d')
        yield (date, amount)

with beam.Pipeline() as pipeline:
    data = (
        pipeline
        | 'Read CSV file' >> beam.io.ReadFromText('gs://cloud-samples-data/bigquery/sample-transactions/transactions.csv',  skip_header_lines=1)
        | 'Filter transactions for year' >> beam.ParDo(CheckTimestampYear())
        | 'Extract date and amount' >> beam.ParDo(ExtractDateAmount())
        | 'Sum amounts by date' >> beam.CombinePerKey(sum)  #Group By  
    )


    data | 'Print data' >> beam.Map(print)


('2017-01-01', 19.95)
('2017-03-18', 2103.2203)
('2017-08-31', 13700000023.08)
('2018-02-27', 129.12)


### Save Output to 'output' Folder


In [2]:
import apache_beam as beam 
from datetime import datetime


class CheckTimestampYear(beam.DoFn):
    def process(self, element):
        timestamp_str = element.split(',')[0]  
        timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S %Z')
        if timestamp.year >= 2010:
            yield element

class ExtractDateAmount(beam.DoFn):
    def process(self, element):
        timestamp_str, _, _, amount_str = element.split(',')
        timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S %Z')
        amount = float(amount_str)
        date = timestamp.strftime('%Y-%m-%d')
        yield (date, amount)

class FormatOutput(beam.DoFn):
    def process(self, element):
        date, total_amount = element
        yield f'{date}, {total_amount}'

with beam.Pipeline() as pipeline:
    data = (
        pipeline
        | 'Read CSV file' >> beam.io.ReadFromText('/Users/Shehryar/Downloads/transactions.csv',  skip_header_lines=1)
        | 'Filter transactions for year' >> beam.ParDo(CheckTimestampYear())
        | 'Extract date and amount' >> beam.ParDo(ExtractDateAmount())
        | 'Sum amounts by date' >> beam.CombinePerKey(sum)  #GROUP BY 
        | 'Format output' >> beam.ParDo(FormatOutput())
        | 'Write to JSONL file' >> beam.io.WriteToText('output/results2', file_name_suffix='.jsonl.gz', compression_type='gzip', header='date, total_amount')
    )


