# Big Data Pipelines with Apache Beam on Google Cloud Dataflow
## by Karis Bisong (Cloud Big Data Engineer)

Table of contents:

- [Create a bucket on GCS](#bucket-gcs)
- [Ephemeral GCE instance for data staging](#ephmeral-gce)
- [Download from data source to ephemeral GCE](#data-download)
- [Transfer data from transient GCE to Google Cloud Storage (GCS)](#transfer-data)
- [Exploratory Data Analysis](#eda)
- [Create BigQuery Dataset](#bq-dataset)
- [Big Data Pipeline with Apache Beam](#apache-beam)

![Pipeline Overview.](big-data.png)

<a name="#bucket-gcs"></a>

# Create a bucket on GCS

In [None]:
# imports
from google.cloud import storage

In [None]:
# parameters
bucket_name = 'kbisong-blessings'

In [None]:
def create_bucket_class_location(bucket_name):
    """Create a new bucket in a specific location with a storage class."""
    storage_client = storage.Client()
    
    bucket = storage_client.bucket(bucket_name)
    bucket.storage_class = "STANDARD"
    new_bucket = storage_client.create_bucket(bucket, location="us")
    
    print(
        "Created bucket {} in {} with storage class {}".format(
            new_bucket.name, new_bucket.location, new_bucket.storage_class
        )
    )
    return new_bucket

In [None]:
create_bucket_class_location(bucket_name)

Created bucket kbisong-blessings in US with storage class STANDARD


<Bucket: kbisong-blessings>

<a name="#ephmeral-gce"></a>

# Ephemeral GCE instance for data staging

* Run the code in this section in a shell.
* In a notebook cell on Colab, you may uncomment the code and run it as is.
* In a Jupyter notebook running on a Cloud VM or locally, run the code in your local terminal instead.

## Get Application Default Credentials (ADC) to authenticate the host VM

In [None]:
%%bash
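# run the code below in a terminal
# authenticate the gcloud CLI
gcloud auth login
# set up Application Default Credentials for the Python client libraries
gcloud auth application-default login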

## Create GCE instance

In [None]:
%%bash
# create GCE instance      
INSTANCE_NAME="kbisong-data-staging"
ZONE=us-central1-a

gcloud compute instances create $INSTANCE_NAME \
     --image-project='deeplearning-platform-release' \
     --image-family='tf2-2-0-cu100' \
     --machine-type=e2-medium \
     --scopes='https://www.googleapis.com/auth/cloud-platform' \
     --zone=$ZONE \
     --tags=http-server \
     --boot-disk-size='300GB'

NAME                  ZONE           MACHINE_TYPE  PREEMPTIBLE  INTERNAL_IP  EXTERNAL_IP    STATUS
kbisong-data-staging  us-central1-a  e2-medium                  10.128.0.2   35.223.16.145  RUNNING


Created [https://www.googleapis.com/compute/v1/projects/qualified-root-293322/zones/us-central1-a/instances/kbisong-data-staging].
 - Disk size: '300 GB' is larger than image size: '50 GB'. You might need to resize the root repartition manually if the operating system does not support automatic resizing. See https://cloud.google.com/compute/docs/disks/add-persistent-disk#resize_pd for details.



<a name="#data-download"></a>

# Download from data source to ephemeral GCE

* The dataset is the PAMAP2 Physical Activity Monitoring Data Set from the UCI Machine Learning repository.
* The dataset is available at: [https://archive.ics.uci.edu/ml/datasets/pamap2+physical+activity+monitoring](https://archive.ics.uci.edu/ml/datasets/pamap2+physical+activity+monitoring)

### ssh to VM from the terminal
* **Note:** Replace the `zone`, `instance-name`, and `project` values with your own.

In [None]:
# ssh to VM from the terminal.
gcloud beta compute ssh --zone "us-central1-a" "kbisong-data-staging" --project "qualified-root-293322"

### Download data from UCI repository to GCE

In [None]:
# use curl to download the Zip file from the UCI repository to the ephemeral Compute Engine instance
curl https://archive.ics.uci.edu/ml/machine-learning-databases/00231/PAMAP2_Dataset.zip --output PAMAP2_Dataset.zip

<a name="#transfer-data"></a>

# Transfer data from transient GCE to Google Cloud Storage (GCS)

In [None]:
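# extract the downloaded archive to create the PAMAP2_Dataset directory
# (assumes the unzip utility is available on the VM)
unzip PAMAP2_Dataset.zip
# run the gsutil command on the terminal to transfer from the VM to GCS
gsutil cp -r PAMAP2_Dataset gs://kbisong-blessings/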
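
The `gsutil cp -r` command copies the `PAMAP2_Dataset` directory, which only exists once the downloaded Zip archive has been extracted. A minimal sketch of that extraction with Python's standard `zipfile` module follows. `extract_archive` is a hypothetical helper, not part of the original notebook, and it assumes the archive unpacks into a top-level `PAMAP2_Dataset` folder, as the GCS paths used later suggest.

```python
import zipfile


def extract_archive(zip_path, dest_dir='.'):
    """Extract a Zip archive into dest_dir and return the member names."""
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        return archive.namelist()


# Example (run on the VM once the download has completed):
# extract_archive('PAMAP2_Dataset.zip')
```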

### Clean-up staging GCE


In [None]:
%%bash
# delete GCE instance
INSTANCE_NAME="kbisong-data-staging"
ZONE=us-central1-a

gcloud compute instances delete $INSTANCE_NAME --zone=$ZONE

<a name="#eda"></a>

# Exploratory Data Analysis

In [None]:
# imports
import random
import pandas as pd

In [None]:
def print_full(x):
    pd.set_option('display.max_rows', len(x))
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 2000)
    pd.set_option('display.float_format', '{:20,.2f}'.format)
    pd.set_option('display.max_colwidth', None)
    print(x)
    pd.reset_option('display.max_rows')
    pd.reset_option('display.max_columns')
    pd.reset_option('display.width')
    pd.reset_option('display.float_format')
    pd.reset_option('display.max_colwidth')

### Load a sample .dat file

In [None]:
# Parameters
gcs_path = 'gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject106.dat'

In [None]:
# create column names
imu_hand_list = ['_'.join(('imu_hand',str(i))) for i in range(4,21)]
imu_chest_list = ['_'.join(('imu_chest',str(i))) for i in range(21,38)]
imu_ankle_list = ['_'.join(('imu_ankle',str(i))) for i in range(38,55)]

In [None]:
columns = ['timestamp','activity_id','heart_rate']

In [None]:
columns.extend(imu_hand_list)
columns.extend(imu_chest_list)
columns.extend(imu_ankle_list)

In [None]:
len(columns)

54
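
The count of 54 follows from 3 leading fields plus 17 columns for each of the three IMU sensors. As a sketch, the same names can be produced by one function. `build_columns` is a hypothetical helper introduced here for illustration only.

```python
def build_columns():
    """Return the column names: 3 leading fields plus 17 per IMU sensor."""
    columns = ['timestamp', 'activity_id', 'heart_rate']
    # (sensor prefix, first column number); each sensor spans 17 columns
    sensors = [('imu_hand', 4), ('imu_chest', 21), ('imu_ankle', 38)]
    for prefix, start in sensors:
        columns.extend(f'{prefix}_{i}' for i in range(start, start + 17))
    return columns


columns = build_columns()
print(len(columns))             # 54
print(columns[3], columns[-1])  # imu_hand_4 imu_ankle_54
```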

In [None]:
# load the data
data = pd.read_csv(gcs_path, sep=' ', names=columns)

In [None]:
data.head()

| | timestamp | activity_id | heart_rate | imu_hand_4 | imu_hand_5 | imu_hand_6 | imu_hand_7 | imu_hand_8 | imu_hand_9 | imu_hand_10 | ... | imu_ankle_45 | imu_ankle_46 | imu_ankle_47 | imu_ankle_48 | imu_ankle_49 | imu_ankle_50 | imu_ankle_51 | imu_ankle_52 | imu_ankle_53 | imu_ankle_54 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5.93 | 0 | NaN | 28.25 | -7.60142 | -0.363243 | 6.39334 | -7.57372 | -0.300108 | 6.42 | ... | -0.077609 | 0.011736 | 0.00191 | -46.5229 | 3.20015 | 44.7834 | 0.02378 | -0.728393 | 0.35274 | -0.5869 |
| 1 | 5.94 | 0 | NaN | 28.25 | -7.5623 | -0.363199 | 6.43241 | -7.65022 | -0.435476 | 6.42038 | ... | -0.046013 | 0.01243 | 0.01933 | -46.3904 | 3.1948 | 45.2093 | 0.023821 | -0.728416 | 0.353048 | -0.586685 |
| 2 | 5.95 | 0 | NaN | 28.25 | -7.60276 | -0.363578 | 6.35483 | -7.63464 | -0.405356 | 6.4505 | ... | -0.124982 | -0.007319 | 0.034692 | -46.3885 | 2.80679 | 44.6425 | 0.024607 | -0.727628 | 0.354943 | -0.586488 |
| 3 | 5.96 | 0 | NaN | 28.25 | -7.51568 | -0.247661 | 6.66364 | -7.63401 | -0.390201 | 6.51086 | ... | -0.108404 | -0.023163 | 0.05113 | -46.7838 | 3.47065 | 44.7821 | 0.024698 | -0.727041 | 0.356218 | -0.586439 |
| 4 | 5.97 | 0 | NaN | 28.25 | -7.56791 | -0.213119 | 6.23938 | -7.63351 | -0.344884 | 6.52587 | ... | -0.047585 | -0.010232 | 0.022264 | -46.5084 | 2.81487 | 45.3512 | 0.024759 | -0.726497 | 0.357154 | -0.586541 |


### Data summaries

In [None]:
# data summaries
data.describe()

| | timestamp | activity_id | heart_rate | imu_hand_4 | imu_hand_5 | imu_hand_6 | imu_hand_7 | imu_hand_8 | imu_hand_9 | imu_hand_10 | ... | imu_ankle_45 | imu_ankle_46 | imu_ankle_47 | imu_ankle_48 | imu_ankle_49 | imu_ankle_50 | imu_ankle_51 | imu_ankle_52 | imu_ankle_53 | imu_ankle_54 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 129963.0 | 129963.0 | 11856.0 | 129897.0 | 129897.0 | 129897.0 | 129897.0 | 129897.0 | 129897.0 | 129897.0 | ... | 129765.0 | 129765.0 | 129765.0 | 129765.0 | 129765.0 | 129765.0 | 129765.0 | 129765.0 | 129765.0 | 129765.0 |
| mean | 655.74 | 11.968668 | 84.04698 | 31.87451 | -1.916314 | 3.728037 | 5.690505 | -1.872209 | 3.752566 | 5.864998 | ... | 0.020447 | 0.000141 | 0.004171 | -28.199348 | 7.193297 | 5.634536 | 0.4241375 | 0.183533 | 0.481809 | 0.113022 |
| std | 375.172309 | 6.193361 | 11.64302 | 1.281535 | 4.87608 | 3.525859 | 3.941986 | 4.87937 | 3.525739 | 3.90346 | ... | 0.592971 | 0.228879 | 0.634977 | 11.806593 | 16.044235 | 32.510547 | 0.2388896 | 0.482827 | 0.313118 | 0.39151 |
| min | 5.93 | 0.0 | 65.0 | 28.25 | -51.8632 | -33.5827 | -19.134 | -41.4278 | -25.9358 | -16.45 | ... | -8.55947 | -2.84529 | -5.50342 | -80.6268 | -140.46 | -103.905 | 9.3504e-07 | -0.856342 | -0.588061 | -0.714273 |
| 25% | 330.835 | 10.0 | 73.0 | 31.0625 | -5.48968 | 1.24058 | 2.7879 | -5.43406 | 1.23 | 3.00172 | ... | -0.02336 | -0.030204 | -0.021337 | -36.1465 | 2.20981 | -24.6485 | 0.191998 | 0.033228 | 0.196681 | -0.051621 |
| 50% | 655.74 | 10.0 | 86.0 | 32.4375 | -0.802599 | 3.06101 | 6.39486 | -0.800262 | 3.12713 | 6.61682 | ... | 0.006917 | -0.004946 | 0.001437 | -28.064 | 10.0369 | -12.3997 | 0.539658 | 0.14185 | 0.682339 | 0.031029 |
| 75% | 980.645 | 18.0 | 94.0 | 32.875 | 0.05618 | 6.25546 | 9.25581 | 0.145979 | 6.27176 | 9.37531 | ... | 0.035525 | 0.020922 | 0.022447 | -26.2196 | 17.238 | 38.6323 | 0.639795 | 0.689745 | 0.753183 | 0.553119 |
| max | 1305.55 | 19.0 | 109.0 | 33.25 | 22.0245 | 48.9305 | 75.4632 | 21.4059 | 49.3324 | 52.6572 | ... | 9.55586 | 3.32042 | 7.31084 | 22.271 | 67.6981 | 104.7 | 0.80217 | 0.847623 | 0.866597 | 0.716415 |


In [None]:
# check for missing values
len(data) - data.count()

timestamp            0
activity_id          0
heart_rate      118107
imu_hand_4          66
imu_hand_5          66
imu_hand_6          66
imu_hand_7          66
imu_hand_8          66
imu_hand_9          66
imu_hand_10         66
imu_hand_11         66
imu_hand_12         66
imu_hand_13         66
imu_hand_14         66
imu_hand_15         66
imu_hand_16         66
imu_hand_17         66
imu_hand_18         66
imu_hand_19         66
imu_hand_20         66
imu_chest_21        80
imu_chest_22        80
imu_chest_23        80
imu_chest_24        80
imu_chest_25        80
imu_chest_26        80
imu_chest_27        80
imu_chest_28        80
imu_chest_29        80
imu_chest_30        80
imu_chest_31        80
imu_chest_32        80
imu_chest_33        80
imu_chest_34        80
imu_chest_35        80
imu_chest_36        80
imu_chest_37        80
imu_ankle_38       198
imu_ankle_39       198
imu_ankle_40       198
imu_ankle_41       198
imu_ankle_42       198
imu_ankle_43       198
imu_ankle_4

<a name="#bq-dataset"></a>

# Big Data Pipeline with Apache Beam


This section migrates the dataset from Google Cloud Storage (GCS) to BigQuery. The original dataset at https://archive.ics.uci.edu/ml/datasets/pamap2+physical+activity+monitoring contains one data file per subject per session (`Protocol` or `Optional`).

The `Protocol` session contains 9 data files, while `Optional` has 5. The goal of the pipeline is to merge the data files of each session into a single BigQuery table.

The output on BigQuery consists of 2 tables (`Protocol` and `Optional`), each holding the data of its session.

In [None]:
from google.cloud import bigquery

In [None]:
# parameters
table_name='subject106'
dataset='PAMAP'
project='qualified-root-293322'
source_input = 'gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject106.dat'

In [None]:
# BQ schema field
df_schema = [bigquery.SchemaField('timestamp', 'STRING'), bigquery.SchemaField('activity_id', 'STRING'),
 bigquery.SchemaField('heart_rate','STRING'), bigquery.SchemaField('imu_hand_4','STRING'),
 bigquery.SchemaField('imu_hand_5','STRING'), bigquery.SchemaField('imu_hand_6','STRING'),
 bigquery.SchemaField('imu_hand_7','STRING'), bigquery.SchemaField('imu_hand_8','STRING'),
 bigquery.SchemaField('imu_hand_9','STRING'), bigquery.SchemaField('imu_hand_10','STRING'),
 bigquery.SchemaField('imu_hand_11','STRING'), bigquery.SchemaField('imu_hand_12','STRING'),
 bigquery.SchemaField('imu_hand_13','STRING'), bigquery.SchemaField('imu_hand_14','STRING'),
 bigquery.SchemaField('imu_hand_15','STRING'), bigquery.SchemaField('imu_hand_16','STRING'),
 bigquery.SchemaField('imu_hand_17','STRING'), bigquery.SchemaField('imu_hand_18','STRING'),
 bigquery.SchemaField('imu_hand_19','STRING'), bigquery.SchemaField('imu_hand_20','STRING'),
 bigquery.SchemaField('imu_chest_21','STRING'), bigquery.SchemaField('imu_chest_22','STRING'),
 bigquery.SchemaField('imu_chest_23','STRING'), bigquery.SchemaField('imu_chest_24','STRING'),
 bigquery.SchemaField('imu_chest_25','STRING'), bigquery.SchemaField('imu_chest_26','STRING'),
 bigquery.SchemaField('imu_chest_27','STRING'), bigquery.SchemaField('imu_chest_28','STRING'),
 bigquery.SchemaField('imu_chest_29','STRING'), bigquery.SchemaField('imu_chest_30','STRING'),
 bigquery.SchemaField('imu_chest_31','STRING'), bigquery.SchemaField('imu_chest_32','STRING'),
 bigquery.SchemaField('imu_chest_33','STRING'), bigquery.SchemaField('imu_chest_34','STRING'),
 bigquery.SchemaField('imu_chest_35','STRING'), bigquery.SchemaField('imu_chest_36','STRING'),
 bigquery.SchemaField('imu_chest_37','STRING'), bigquery.SchemaField('imu_ankle_38','STRING'),
 bigquery.SchemaField('imu_ankle_39','STRING'), bigquery.SchemaField('imu_ankle_40','STRING'),
 bigquery.SchemaField('imu_ankle_41','STRING'), bigquery.SchemaField('imu_ankle_42','STRING'),
 bigquery.SchemaField('imu_ankle_43','STRING'), bigquery.SchemaField('imu_ankle_44','STRING'),
 bigquery.SchemaField('imu_ankle_45','STRING'), bigquery.SchemaField('imu_ankle_46','STRING'),
 bigquery.SchemaField('imu_ankle_47','STRING'), bigquery.SchemaField('imu_ankle_48','STRING'),
 bigquery.SchemaField('imu_ankle_49','STRING'), bigquery.SchemaField('imu_ankle_50','STRING'),
 bigquery.SchemaField('imu_ankle_51','STRING'), bigquery.SchemaField('imu_ankle_52','STRING'),
 bigquery.SchemaField('imu_ankle_53','STRING'), bigquery.SchemaField('imu_ankle_54','STRING')]

In [None]:
def create_bq_table(table_id: str, project: str, dataset: str, schema: list):
    """Create the BigQuery table `project.dataset.table_id` with the given schema."""
    # construct a BigQuery client object
    client = bigquery.Client()
    table = bigquery.Table('.'.join((project, dataset, table_id)), schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(
        'Created table {}.{}.{}'.format(project, dataset, table_id)
    )

In [None]:
# create BQ table
create_bq_table(table_name, project, dataset, df_schema)

Created table qualified-root-293322.PAMAP.subject106
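
All 54 fields in `df_schema` share the type `STRING`, so the schema can be derived from the column names instead of being typed out. The sketch below builds each field as a plain dict in the name/type/mode form that BigQuery uses for JSON schemas. `build_schema` is a hypothetical helper, and the `NULLABLE` mode is an assumption chosen because the data contains missing values.

```python
def build_schema(column_names, field_type='STRING'):
    """Return one schema entry per column, all sharing the same type."""
    return [
        {'name': name, 'type': field_type, 'mode': 'NULLABLE'}
        for name in column_names
    ]


# shortened column list for illustration; the notebook uses all 54 names
schema = build_schema(['timestamp', 'activity_id', 'heart_rate'])
print(schema[0])
# {'name': 'timestamp', 'type': 'STRING', 'mode': 'NULLABLE'}
```

If `SchemaField` objects are needed, as in `create_bq_table`, each dict could likely be converted with `bigquery.SchemaField.from_api_repr`.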


In [None]:
def dataflow_runner(bq_table: str, data_source: str):    
    # imports (only the modules the pipeline actually uses)
    import re
    import logging
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io.gcp.bigquery import BigQueryDisposition

    class DataIngestion:
        # this method parses one space-separated input line and converts it into a BigQuery-savable dictionary
        def parse_method(self, string_input):
            values = re.split(' ', re.sub('\r\n', '', re.sub(u'"', '', string_input)))
            row = dict(
                zip(('timestamp', 'activity_id', 'heart_rate', 'imu_hand_4', 'imu_hand_5', 'imu_hand_6', 'imu_hand_7', 'imu_hand_8', 'imu_hand_9',
                     'imu_hand_10', 'imu_hand_11', 'imu_hand_12', 'imu_hand_13', 'imu_hand_14', 'imu_hand_15', 'imu_hand_16', 'imu_hand_17',
                     'imu_hand_18', 'imu_hand_19', 'imu_hand_20', 'imu_chest_21', 'imu_chest_22', 'imu_chest_23', 'imu_chest_24', 'imu_chest_25',
                     'imu_chest_26', 'imu_chest_27', 'imu_chest_28', 'imu_chest_29', 'imu_chest_30', 'imu_chest_31', 'imu_chest_32', 'imu_chest_33',
                     'imu_chest_34', 'imu_chest_35', 'imu_chest_36', 'imu_chest_37', 'imu_ankle_38', 'imu_ankle_39', 'imu_ankle_40', 'imu_ankle_41',
                     'imu_ankle_42', 'imu_ankle_43', 'imu_ankle_44', 'imu_ankle_45', 'imu_ankle_46', 'imu_ankle_47', 'imu_ankle_48', 'imu_ankle_49',
                     'imu_ankle_50', 'imu_ankle_51', 'imu_ankle_52', 'imu_ankle_53', 'imu_ankle_54'),
                    values))
            return row

    def run(argv=None, save_main_session=True):
        """Main entry point; defines and runs the pipeline."""

        # parameters
        table = bq_table
        dataset='PAMAP'
        project='qualified-root-293322'
        source = data_source

        options = {
            'temp_location': 'gs://kbisong-blessings/temp',
            'job_name': 'kbisong-big-data',
            'project': 'qualified-root-293322',
            'region': 'us-central1',
            'max_num_workers': 24,
            'teardown_policy': 'TEARDOWN_ALWAYS',
            'no_save_main_session': True,
            'runner': 'DataflowRunner'
        }

        data_ingestion = DataIngestion()

        pipeline_options = PipelineOptions(flags=[], **options)
        with beam.Pipeline(options=pipeline_options) as p:
            lines = (
                p
                | 'ReadFile' >> beam.io.ReadFromText(source)
                | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
            )
            result = (
                lines
                | 'WriteBigQuery' >> beam.io.WriteToBigQuery(table=table, dataset=dataset, project=project,
                                                            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=BigQueryDisposition.WRITE_APPEND,
                                                            schema='SCHEMA_AUTODETECT')
            )

    # run the pipeline each time dataflow_runner is called
    logging.getLogger().setLevel(logging.INFO)
    run()
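
The parsing step can be tried locally, without Beam or Dataflow, on a single line of text. The sketch below mirrors `parse_method` on a shortened, hypothetical column tuple. The field-count check is an addition that is not in the original code, and the sample line is made up for illustration.

```python
import re

# hypothetical, shortened column tuple; the pipeline itself uses 54 names
COLUMNS = ('timestamp', 'activity_id', 'heart_rate', 'imu_hand_4')


def parse_line(line, columns=COLUMNS):
    """Split one space-separated record into a {column: value} dict."""
    # strip double quotes and line-ending characters before splitting
    cleaned = re.sub(r'[\r\n]', '', line.replace('"', ''))
    values = cleaned.split(' ')
    if len(values) != len(columns):
        raise ValueError(f'expected {len(columns)} fields, got {len(values)}')
    return dict(zip(columns, values))


row = parse_line('5.93 0 NaN 28.25\r\n')
print(row)
# {'timestamp': '5.93', 'activity_id': '0', 'heart_rate': 'NaN', 'imu_hand_4': '28.25'}
```

All values stay strings, which matches the all-`STRING` table schema used in this notebook.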

In [None]:
optional_source = ['gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject106.dat',
         'gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject101.dat',
         'gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject109.dat',
         'gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject108.dat',
         'gs://kbisong-blessings/PAMAP2_Dataset/Optional/subject105.dat']

protocol_source = ['gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject101.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject102.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject103.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject104.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject105.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject106.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject107.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject108.dat',
                  'gs://kbisong-blessings/PAMAP2_Dataset/Protocol/subject109.dat']

table = {'Optional':  optional_source, 'Protocol': protocol_source}
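
The source lists above follow a fixed path pattern, so they can also be generated from the subject IDs. A minimal sketch, where `session_sources` and `generated_table` are hypothetical names and the bucket and subject IDs are the ones already used in this notebook:

```python
def session_sources(bucket, session, subject_ids):
    """Return the GCS paths of the subject files for one session."""
    return [
        f'gs://{bucket}/PAMAP2_Dataset/{session}/subject{sid}.dat'
        for sid in subject_ids
    ]


generated_table = {
    'Optional': session_sources('kbisong-blessings', 'Optional',
                                [106, 101, 109, 108, 105]),
    'Protocol': session_sources('kbisong-blessings', 'Protocol',
                                range(101, 110)),
}
print(len(generated_table['Optional']), len(generated_table['Protocol']))  # 5 9
```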

In [None]:
for key, value in table.items():
    for source in value:
        dataflow_runner(key, source)

INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/opt/conda/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmp6mvmtkpb', 'apache-beam==2.24.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/opt/conda/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmp6mvmtkpb', 'apache-beam==2.24.0', '--no-deps', '--only-binary', ':all:', '--python-version', '37', '--implementation', 'cp', '--abi', 'cp37m', '--platform', 'manylinux1_x86_64']
INFO:apache_beam.runners.portability.stager:Staging binary distribution of the SDK from PyPI: apache_beam-2.24.0-cp37-cp37m-manylinux1_x86_64.whl
INFO:root:Using Python SDK docker image: apach

## End