<a href="https://colab.research.google.com/github/imjbmkz/sp102-data-products-supplements/blob/main/codes/week_2/Data_Processing_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Installing needed packages
The cell below installs the `apache-beam` package with its GCP extras, which is used to build and run Beam pipelines. It is commented out; uncomment it if the package is not already installed.

In [None]:
# %%bash
# pip install apache-beam[gcp]

On Google Cloud Platform (GCP), importing the `ReadFromText` and `WriteToText` functions from the `apache-beam` package can result in an error. The code below installs a pinned version of `pyparsing` to work around it.

In [None]:
# pip install pyparsing==2.4.2

When using Google Colab, the session has to be authenticated before we can start running pipelines in GCP.

In [None]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated.')

Authenticated.


In [None]:
project_id = 'your-project-id'
!gcloud config set project {project_id}

Updated property [core/project].


#### Importing packages
The cell below imports the necessary packages and functions.

In [None]:
import apache_beam as beam
from apache_beam.io.textio import ReadFromText
from apache_beam.io.textio import WriteToText

In [None]:
# Replace these parameters with your own values.
staging_location = 'gs://your-bucket-name/staging'
temp_location = 'gs://your-bucket-name/temp'
job_name = 'dataflow-crypto'
project_id = 'your-project-id'
source_bucket = 'your-bucket-name'
target_bucket = 'your-bucket-name'
region = 'us-central1'

In [None]:
def run(project, source_bucket, target_bucket):
    import csv
    options = {
        'staging_location': staging_location,
        'temp_location': temp_location,
        'job_name': job_name,
        'project': project,
        'max_num_workers': 24,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'runner': 'DataflowRunner',
        'region': region,
    }
    options = beam.pipeline.PipelineOptions(flags=[], **options)
    
    crypto_dataset = 'gs://{}/crypto-markets.csv'.format(source_bucket)
    processed_ds = 'gs://{}/transformed-crypto-bitcoin'.format(target_bucket)

    pipeline = beam.Pipeline(options=options)

    # 0:slug, 3:date, 5:open, 6:high, 7:low, 8:close
    rows = (
        pipeline |
            'Read from bucket' >> ReadFromText(crypto_dataset) |
            'Tokenize as csv columns' >> beam.Map(lambda line: next(csv.reader([line]))) |
            'Select columns' >> beam.Map(lambda fields: (fields[0], fields[3], fields[5], fields[6], fields[7], fields[8])) |
            'Filter bitcoin rows' >> beam.Filter(lambda row: row[0] == 'bitcoin')
        )
        
    # Each row is a single 6-tuple, so beam.Map passes it to the lambda as one argument.
    combined = (
        rows |
            'Format as csv line' >> beam.Map(lambda row: ','.join(row)) |
            'Write to bucket' >> WriteToText(
                file_path_prefix=processed_ds,
                file_name_suffix=".csv", num_shards=2,
                shard_name_template="-SS-of-NN",
                header='slug,date,open,high,low,close')
        )
    pipeline.run()
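
The per-line steps in `run` (tokenize, select columns, filter, format) can be checked locally before submitting a Dataflow job. The sketch below is a minimal stand-in that uses plain Python functions instead of Beam transforms. The sample lines and the helper names (`tokenize`, `select_columns`, `is_bitcoin`, `to_csv_line`, `process`) are assumptions made for illustration; only the column positions come from the comment in `run`.

```python
import csv

# Hypothetical sample lines. The column layout (0:slug, 3:date, 5:open,
# 6:high, 7:low, 8:close) follows the comment in run(); the values are made up.
SAMPLE_LINES = [
    'slug,symbol,name,date,ranknow,open,high,low,close,volume',
    'bitcoin,BTC,Bitcoin,2013-04-28,1,135.3,135.98,132.1,134.21,0',
    'ethereum,ETH,Ethereum,2015-08-07,2,2.83,3.54,2.52,2.77,164329',
    'bitcoin,BTC,"Bitcoin, quoted",2013-04-29,1,134.44,147.49,134.0,144.54,0',
]

def tokenize(line):
    """Parse one CSV line into a list of fields, honoring quoted commas."""
    return next(csv.reader([line]))

def select_columns(fields):
    """Keep slug, date, open, high, low and close as a 6-tuple."""
    return tuple(fields[i] for i in (0, 3, 5, 6, 7, 8))

def is_bitcoin(row):
    """Keep only rows whose slug is 'bitcoin' (this also drops the header row)."""
    return row[0] == 'bitcoin'

def to_csv_line(row):
    """Join the selected fields back into one comma-separated line."""
    return ','.join(row)

def process(lines):
    """Apply the steps in the same order as the pipeline."""
    rows = (select_columns(tokenize(line)) for line in lines)
    return [to_csv_line(row) for row in rows if is_bitcoin(row)]

for out in process(SAMPLE_LINES):
    print(out)
```

Using `csv.reader` rather than `line.split(',')` matters for the last sample line, where a quoted field contains a comma. A plain split would shift every later column by one position.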

In [None]:
if __name__ == '__main__':
    print('Run pipeline on the cloud')
    run(project=project_id, source_bucket=source_bucket, target_bucket=target_bucket)
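
Once the job finishes, the output should appear in the target bucket as two sharded files. The sketch below estimates their names from the `WriteToText` arguments used in `run`. It assumes that Beam replaces each run of `S` in `shard_name_template` with the zero-padded shard index and each run of `N` with the zero-padded shard count. The helpers `expand_shard_template` and `expected_outputs` are hypothetical and are not part of Beam.

```python
import re

def expand_shard_template(template, shard_num, num_shards):
    """Approximate the shard naming: runs of 'S' become the zero-padded shard
    index, runs of 'N' the zero-padded shard count (assumed behavior)."""
    out = re.sub(r'S+', lambda m: str(shard_num).zfill(len(m.group())), template)
    return re.sub(r'N+', lambda m: str(num_shards).zfill(len(m.group())), out)

def expected_outputs(prefix, template, suffix, num_shards):
    """List the file names as prefix + expanded template + suffix."""
    return [prefix + expand_shard_template(template, i, num_shards) + suffix
            for i in range(num_shards)]

# Same values as in run(), with the placeholder bucket name.
names = expected_outputs(
    'gs://your-bucket-name/transformed-crypto-bitcoin', '-SS-of-NN', '.csv', 2)
for name in names:
    print(name)
```

Under that assumption, the two files end in `-00-of-02.csv` and `-01-of-02.csv`.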