<a href="https://colab.research.google.com/github/dangro/gcp-playground/blob/master/beam/Simple_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Getting started with Apache Beam

In this notebook I create a very simple Apache Beam pipeline that reads a text file from Google Cloud Storage (GCS) and writes its output back to GCS.  
The pipeline does not execute on Google Cloud Platform; it uses a local runner instead.

For a sufficiently large data set or a complex pipeline, we would want to take advantage of Cloud Dataflow instead.

To get started you will need a GCP project and a GCS bucket.  
Please upload a text file to the GCS bucket to use in this pipeline.

### Install dependencies

Because the pipeline reads and writes data in Google Cloud Storage, we need to install the ```apache-beam[gcp]``` package.

In [0]:
!pip install apache-beam[gcp]

### Configure the ```gcloud``` CLI
If you haven't already, create a project in Google Cloud Platform.

In the cell below replace:
1. ```GCP_PROJECT_NAME``` with the name of your project.
2. ```GCP_BUCKET_NAME``` with the name of the bucket to read from and write data to in this pipeline.

We will use these values later on to fetch the output of the pipeline.

In [0]:
%env PROJECT_NAME=GCP_PROJECT_NAME
%env BUCKET_NAME=GCP_BUCKET_NAME

In [0]:
!gcloud config set project $PROJECT_NAME

In [0]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from google.colab import auth

Authenticate the notebook so that it can access Google Cloud Storage.

In [0]:
auth.authenticate_user()

### Set the name of the GCS bucket

If you haven't already, create a bucket in Google Cloud Storage. This bucket may be a regional bucket.

In the cell below replace:
1. ```GCP_BUCKET_NAME``` with the name of your bucket. This will be used by the pipeline to read and write data.
2. ```INPUT_FILE_NAME``` with the name of the file you uploaded to the bucket.
3. Optionally, change the value of ```SPLIT_ON_SEQUENCE``` to the sequence of characters on which to split the input.

In [0]:
BUCKET_NAME = "GCP_BUCKET_NAME"
INPUT_FILE_IN_BUCKET = "INPUT_FILE_NAME"
SPLIT_ON_SEQUENCE = " "
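
Forgetting to replace one of the placeholders above is an easy mistake, and the error only surfaces later when the pipeline tries to read from GCS. A minimal sketch of a guard, assuming a hypothetical helper named `check_setting` that is not part of this notebook:

```python
# Placeholder strings used in this notebook's configuration cells.
PLACEHOLDERS = {"GCP_PROJECT_NAME", "GCP_BUCKET_NAME", "INPUT_FILE_NAME"}


def check_setting(name, value):
    """Hypothetical helper: returns `value`, or raises if it is empty
    or still one of the template placeholders."""
    if not value or value in PLACEHOLDERS:
        raise ValueError(f"{name} has not been set; got {value!r}")
    return value


# Example with a made-up bucket name.
print(check_setting("BUCKET_NAME", "my-bucket"))  # my-bucket
```

Calling `check_setting("BUCKET_NAME", BUCKET_NAME)` before building the pipeline would fail fast with a readable message.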

## Helper functions to create a pipeline

In [0]:
def make_gcs_path(bucket, file):
  """Helper function that returns a GCS path"""
  return "gs://" + bucket + "/" + file

In [0]:
class SplitString(beam.DoFn):
  """Splits input on the character(s) defined in SPLIT_ON_SEQUENCE"""
  def process(self, element):
    yield element.split(SPLIT_ON_SEQUENCE)
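
Every value that a ```DoFn``` yields from ```process``` becomes one element of the output PCollection. ```SplitString``` yields the whole list once, so each input line turns into a single list element. The plain-Python sketch below (no Beam required) contrasts that with yielding each token separately; both function names are hypothetical and exist only for illustration.

```python
SPLIT_ON_SEQUENCE = " "


def split_as_one_element(line):
    """Mirrors SplitString.process: yields the whole token list once."""
    yield line.split(SPLIT_ON_SEQUENCE)


def split_into_tokens(line):
    """Alternative: yields every token as its own element."""
    for token in line.split(SPLIT_ON_SEQUENCE):
        yield token


line = "to be or not"
per_line = list(split_as_one_element(line))
per_token = list(split_into_tokens(line))

print(per_line)   # [['to', 'be', 'or', 'not']]
print(per_token)  # ['to', 'be', 'or', 'not']
```

Which shape is preferable depends on what the next transform expects: one record per line, or one record per token.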

In [0]:
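def make_pipeline(input_path, output_prefix):
  """Returns an Apache Beam pipeline that reads, splits, and writes text"""
  pipeline = beam.Pipeline()
  # Transforms are attached to `pipeline` as they are applied,
  # so the resulting PCollection does not need to be kept.
  (pipeline
      | 'read' >> ReadFromText(input_path)       # one element per input line
      | 'split' >> beam.ParDo(SplitString())     # one list of tokens per line
      | 'write' >> WriteToText(output_prefix))   # output files start with this prefix
  return pipeline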

The function ```make_pipeline``` creates a Beam pipeline that loads data from GCS, applies a transform to the data, and finally writes the output back to GCS.
1. [ReadFromText](https://beam.apache.org/releases/pydoc/2.11.0/apache_beam.io.textio.html?highlight=readfromtext#apache_beam.io.textio.ReadFromText): Reads the input file line by line. The result is a PCollection, that is, an object that references the data that will be processed by the pipeline.
2. [ParDo](https://beam.apache.org/releases/pydoc/2.11.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo): ParDo is a transform that applies a user-defined function to every element of a PCollection. It accepts objects that extend DoFn, such as ```SplitString```.
3. [WriteToText](https://beam.apache.org/releases/pydoc/2.11.0/apache_beam.io.textio.html?highlight=readfromtext#apache_beam.io.textio.WriteToText): As the pipeline finishes processing data, the output is written to one or more files.

In [0]:
def run_pipeline(argv=None):
  """Sets up and runs an Apache Beam Pipeline"""
  INPUT_DATA = make_gcs_path(BUCKET_NAME, INPUT_FILE_IN_BUCKET)
  OUTPUT_DATA = make_gcs_path(BUCKET_NAME, "output")
  
  pipeline = make_pipeline(INPUT_DATA, OUTPUT_DATA)
  result = pipeline.run()
  result.wait_until_finish()

## Run the pipeline

In [0]:
run_pipeline()

## Explore the pipeline output
Now that the pipeline has run and finished, we can download its output files from the bucket.

In [0]:
!gsutil cp gs://$BUCKET_NAME/output-* .

In [0]:
!cat output-*
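
If you would rather inspect the downloaded files from Python than with ```cat```, something like the following sketch would do. The helper name `read_output_lines` is an assumption of mine; it only relies on the files having been copied into the working directory with names matching ```output-*```.

```python
import glob


def read_output_lines(pattern="output-*"):
    """Hypothetical helper: returns every line from the files matching
    `pattern`, reading the files in sorted name order."""
    lines = []
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as f:
            lines.extend(f.read().splitlines())
    return lines


# Show the first few lines of the pipeline output, if any files are present.
for line in read_output_lines()[:5]:
    print(line)
```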