### Imports for sections I - III

In [72]:
import io
import os
import re
import random
import requests
from zipfile import ZipFile
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from google.cloud import storage

# I.  Introduction to Staging with Google Cloud

Buckets are the basic containers that hold your data. Everything that you store in Cloud Storage must be contained in a bucket. You can use buckets to organize your data and control access to your data.

## I.a The Google Cloud Storage Client

The `storage.Client` object uses your default project. Alternatively, you can specify a project in the `Client` constructor. Run the following to create a client with your default project:

In [26]:
client = storage.Client()

Let's create a bucket with a globally unique name.  For more information about naming buckets, see [Bucket name requirements](https://cloud.google.com/storage/docs/naming#requirements).

In [None]:
bucket_name = "end-to-end-analysis-demo-i"  # Must be globally unique!
bucket = client.create_bucket(bucket_name)
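
Because bucket names share one global namespace, a fixed name like the one above will fail for anyone else who runs the cell. One way around this, sketched here under the assumption that a random suffix is acceptable for a demo, is to build the name from a prefix plus a short UUID fragment. The helper name `unique_bucket_name` and the 8-character suffix are illustrative choices, not part of the client library, and the regular expression checks only a simplified subset of the naming rules.

```python
import re
import uuid


def unique_bucket_name(prefix: str, suffix_len: int = 8) -> str:
    """Return `prefix` plus a random lowercase hex suffix (hypothetical helper)."""
    suffix = uuid.uuid4().hex[:suffix_len]
    return f"{prefix}-{suffix}"


candidate = unique_bucket_name("end-to-end-analysis-demo")
# Simplified check: lowercase letters, digits and dashes, 3 to 63 characters.
assert re.fullmatch(r"[a-z0-9][a-z0-9-]{1,61}[a-z0-9]", candidate)
print(candidate)
```

The resulting string could then be passed to `client.create_bucket` in place of the hard-coded name.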

## I.b Upload a local file to a bucket

Objects are the individual pieces of data that you store in Cloud Storage. Objects are referred to as `blobs` in the Python client library. There is no limit on the number of objects that you can create in a bucket.  An object's name is treated as a piece of object metadata in Cloud Storage.

The client library provides a variety of upload methods, depending on the type and nature of your data.  Let's upload some bytes into a file that sits in GCS.  You can also upload directly from a local file.

In [4]:
msg = b'Hello world!'
blob_name = 'hello-world.txt'
blob = bucket.blob(blob_name)
blob.upload_from_string(msg)

## I.c Download a blob to a local directory

The client library's `download_to_filename` method writes a `blob` directly to a local file in a single call, with no need to create a file-like object or a byte stream yourself.  Other download methods offer more granular control when your use case needs it.

In [8]:
blob = bucket.get_blob(blob_name)
blob.download_to_filename(blob_name)

## I.d Deleting a blob from Cloud Storage

Let's clean up the blob we made.  Since we're going to check only for .txt files, we need to remove this .txt file to make sure this test file doesn't interfere with our workflow.

In [30]:
blob = bucket.get_blob(blob_name)
blob.delete() # And that's it.

# II. The Dataset: SSA (USA) Baby Names 

* https://www.ssa.gov/oact/babynames/limits.html
* https://www.ssa.gov/oact/babynames/background.html

To provide popular names and maintain an acceptable performance level on our servers, we provide only the top 1000 names through our forms. However, we provide almost all names for researchers interested in naming trends.

To safeguard privacy, we exclude from these files certain names that would indicate, or would allow the ability to determine, names with fewer than 5 occurrences in any geographic area. We provide these data on both a national and state-specific basis, in two separate collections of files, each zipped into a single file. The format of the data in the three file collections is described in a "readme" file contained in the respective zip files.

* National data: https://www.ssa.gov/oact/babynames/names.zip
* State-specific data: https://www.ssa.gov/oact/babynames/state/namesbystate.zip
* Territory-specific data: https://www.ssa.gov/oact/babynames/territory/namesbyterritory.zip


## II.a  Prestaging

Google Cloud's Storage client is easy and accessible.  Before we stage our files directly from the SSA's website, we'll collect and pre-process all of the information we need (such as URLs, file names, and folders).  

One of the best parts of the client library is that once a bucket has been represented as a Python object, we can easily reuse it throughout the rest of our workflow.  

In [20]:
usa_names_client = storage.Client()
usa_names_url = 'https://www.ssa.gov/oact/babynames/names.zip'
usa_names_path, usa_names_file = os.path.split(usa_names_url)
usa_names_bucket = usa_names_client.bucket('end-to-end-analysis-demo-i')
usa_names_folder = 'usa_names'

print('URL:', usa_names_url)
print('URL path:', usa_names_path)
print('File:', usa_names_file)
print('Bucket:', usa_names_bucket.name)
print('Folder:', usa_names_folder)

URL: https://www.ssa.gov/oact/babynames/names.zip
URL path: https://www.ssa.gov/oact/babynames
File: names.zip
Bucket: end-to-end-analysis-demo-i
Folder: usa_names
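
`os.path.split` works on this URL, but `os.path` functions follow the conventions of the local operating system. A slightly more portable sketch, using only the standard library, parses the URL with `urllib.parse` and builds object names with `posixpath`, since Cloud Storage object names conventionally use forward slashes as the folder delimiter. The variable names mirror the cell above; `example_member` is a made-up file name for illustration.

```python
import posixpath
from urllib.parse import urlsplit

usa_names_url = 'https://www.ssa.gov/oact/babynames/names.zip'
usa_names_folder = 'usa_names'

# Take the last path component of the URL as the archive file name.
url_path = urlsplit(usa_names_url).path
usa_names_file = posixpath.basename(url_path)

# Join with '/' regardless of the local operating system.
example_member = 'yob1880.txt'  # hypothetical archive member
object_name = posixpath.join(usa_names_folder, example_member)

print('File:', usa_names_file)
print('Object name:', object_name)
```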


## II.b  Staging

With our information ready for staging, we're going to process the data in memory (without writing it to the local disk) and stage it directly to Google Cloud Storage.  The dataset is composed of 140 text files of comma-delimited lines, each representing a name, the recorded gender, and a count.  We're going to inject the year as a feature into each row before we upload the files to GCS.  

The source data is compressed into a zip archive.  We'll extract the archive in memory, iterate over its files, modify each row, and then stage the results on GCS.   

In [24]:
with requests.get(usa_names_url) as resp:
    resp_bytes = resp.content
    archive_stream = io.BytesIO(resp_bytes)
    
    with ZipFile(archive_stream) as archive:
        files = archive.namelist()
        for file in files:
            if not file.endswith('.txt'):
                continue
            year = file[3:7]
            file_stream = io.BytesIO()
            with archive.open(file) as text:
                for line in text:
                    # Strip the line ending, then append the year and a CRLF terminator.
                    updated_line = line.rstrip(b'\r\n') + f',{year}\r\n'.encode()
                    file_stream.write(updated_line)
            usa_names_blob_name = f'{usa_names_folder}/{file}'  # object names use '/' on every OS
            usa_names_blob = usa_names_bucket.blob(usa_names_blob_name)
            usa_names_blob.upload_from_string(file_stream.getvalue())
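
The year-injection step can be tried offline before touching the network or a bucket. The sketch below builds a tiny stand-in archive in memory and runs the same extract-and-append logic over it. The rows are made up rather than SSA data, and `add_year_column` is a hypothetical helper name, not something from the notebook.

```python
import io
from zipfile import ZipFile


def add_year_column(raw: bytes, year: str) -> bytes:
    """Append `,year` to every non-empty line and normalise endings to CRLF."""
    out = io.BytesIO()
    for line in io.BytesIO(raw):
        stripped = line.rstrip(b'\r\n')
        if stripped:
            out.write(stripped + f',{year}\r\n'.encode())
    return out.getvalue()


# Build a tiny stand-in archive in memory (made-up rows, not SSA data).
archive_stream = io.BytesIO()
with ZipFile(archive_stream, 'w') as archive:
    archive.writestr('yob1880.txt', 'Mary,F,10\r\nJohn,M,12\r\n')
    archive.writestr('readme.pdf', 'not a text file')

archive_stream.seek(0)
processed = {}
with ZipFile(archive_stream) as archive:
    for member in archive.namelist():
        if not member.endswith('.txt'):
            continue
        year = member[3:7]  # 'yob1880.txt' -> '1880'
        processed[member] = add_year_column(archive.read(member), year)

print(processed)
```

In the real workflow, each value in `processed` would be handed to `upload_from_string` instead of being kept in a dictionary.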

## II.c Checking the Work

The files have been staged to GCS.  Let's sample a handful of files to make sure the upload went as planned.  

In [59]:
files = [blob.name for blob in usa_names_bucket.list_blobs() if blob.name.endswith('.txt')]
samples = random.sample(files, 3)
for file in samples:
    usa_names_blob = usa_names_bucket.blob(file)
    usa_names_data = usa_names_blob.download_as_bytes().decode()  # download_as_string() is a deprecated alias
    print('File:', file)
    for index, line in enumerate(usa_names_data.split()):
        if index >= 5:
            break
        print('----', 'Sample row:', line) 

File: usa_names/yob1969.txt
---- Sample row: Lisa,F,45029,1969
---- Sample row: Michelle,F,34320,1969
---- Sample row: Jennifer,F,33702,1969
---- Sample row: Kimberly,F,33079,1969
---- Sample row: Melissa,F,23022,1969
File: usa_names/yob1996.txt
---- Sample row: Emily,F,25151,1996
---- Sample row: Jessica,F,24201,1996
---- Sample row: Ashley,F,23677,1996
---- Sample row: Sarah,F,21040,1996
---- Sample row: Samantha,F,20549,1996
File: usa_names/yob1968.txt
---- Sample row: Lisa,F,49532,1968
---- Sample row: Michelle,F,33223,1968
---- Sample row: Kimberly,F,31908,1968
---- Sample row: Jennifer,F,26854,1968
---- Sample row: Melissa,F,21726,1968
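
Eyeballing a few rows is a good first check. A programmatic check is a natural follow-up, and the sketch below shows one possible shape for it. It assumes every staged row looks like `name,gender,count,year`, that names contain only ASCII letters, and that gender is `F` or `M`; the pattern and the `invalid_rows` helper are illustrative, not part of the notebook.

```python
import re

# Assumed row layout after staging: name,gender,count,year
ROW_PATTERN = re.compile(r'^[A-Za-z]+,[FM],\d+,\d{4}$')


def invalid_rows(text: str) -> list:
    """Return the non-empty lines that do not match the expected layout."""
    return [
        line for line in text.splitlines()
        if line and not ROW_PATTERN.match(line)
    ]


sample = 'Lisa,F,45029,1969\r\nMichelle,F,34320,1969\r\nbroken-row\r\n'
print(invalid_rows(sample))
```

Running `invalid_rows` over the decoded contents of each sampled blob should return an empty list if staging went as planned.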


# III.  Serverless ETL with Cloud Dataflow

Cloud Dataflow is a managed service for executing a wide variety of data processing patterns.  It is a serverless computation engine for ETL processing, and jobs are written with the open source Apache Beam SDK.  Dataflow excels at embarrassingly parallel processing.  

## III.a Introduction to Cloud Dataflow and Apache Beam

Beam is the third spiritual successor to MapReduce.  It's open source, it unifies the semantics of batch and streaming processing, and it's designed to be portable.  

Apache Beam’s SDK is great for teams - it provides a level of standardization in ETL that most programming languages can’t.  Instead of allowing your team to ‘roll-their-own’, Beam is batteries included so your team can easily perform processing and preprocessing.  

Let's go through some basic examples before we jump into serverlessly processing 140 separate files residing on GCS in parallel.  

In [68]:
# 0, 1, 2, 3, 4
nums = [x for x in range(5)]  

with beam.Pipeline(InteractiveRunner()) as p: 
    squares = (
        p
        | beam.Create(nums)  # Creates a PCollection based on a Python collection
        | beam.Map(lambda x: x**2)
        | beam.Map(print) # Applies the print function to each element
    )

0
1
4
9
16


## III.b Interactive Beam

Using the Apache Beam interactive runner with JupyterLab notebooks lets you iteratively develop pipelines, inspect your pipeline graph, and parse individual PCollections in a read-eval-print-loop (REPL) workflow. 

Switching to `InteractiveRunner` is straightforward. It has built-in features such as rendering the pipeline graph and displaying a PCollection's contents without `print`. You also don't need to call `p.run()`, since `ib.show()` runs the pipeline for you.

In [69]:
with beam.Pipeline(InteractiveRunner()) as p: 
    squares = (
        p
        | beam.Create(nums)
        | beam.Map(lambda x: x**2)
    )
    
    ib.show(squares)

We can also visualize the pipeline graph, that is, the processing DAG (directed acyclic graph), using `ib.show_graph`.

In [64]:
with beam.Pipeline(InteractiveRunner()) as p: 
    squares = (
        p
        | beam.Create(nums)
        | beam.Map(lambda x: x**2)
    )
    
    ib.show_graph(p)

Apache Beam and Cloud Dataflow are batteries included.  

Beam works with Python built-ins as well as third-party packages, and it ships with a broad ecosystem of functions and transforms that handle batch and streaming data out of the box.  The examples so far are very simple; production pipelines support mixing, matching, joining, splitting for test/train, partitioning, multiple outputs, side inputs, and reading from and writing to BigQuery and other targets such as Amazon S3.   

In this example, we'll run a word count on King Lear by Shakespeare.

In [95]:
with beam.Pipeline(InteractiveRunner()) as p: 
    word_count = (
        p
        | 'Read from GCS' >> beam.io.ReadFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
        | 'Regex and flatten lines to words' >> beam.FlatMap(lambda line: re.findall(r'\w+', line.strip().lower()))
        | 'Combine and count unweighted' >> beam.combiners.Count.PerElement()
    )
    
    ib.show(word_count)
    ib.show_graph(p)
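
To make the two transforms in the word count concrete, here is a plain-Python analogue that runs without Beam. It is only a sketch for intuition: the list comprehension plays the role of `FlatMap` (one line in, many words out) and `collections.Counter` plays the role of `Count.PerElement`. The two input lines are a small hand-typed sample rather than the file on GCS.

```python
import re
from collections import Counter

lines = [
    'Blow, winds, and crack your cheeks!',
    'Rage! Blow!',
]

# FlatMap step: each line becomes zero or more lowercase words.
words = [word for line in lines for word in re.findall(r'\w+', line.strip().lower())]

# Count.PerElement step: one count per distinct word.
word_count = Counter(words)
print(word_count['blow'], word_count['rage'])
```

Comparing a small sample like this against the pipeline's output is a cheap sanity check before scaling up.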

## III.c Processing in parallel with Beam and Cloud Dataflow

Getting back to our USA Names example, let's start small with a single file.  

In [107]:


with beam.Pipeline(InteractiveRunner()) as p: 
    
    columns = ('name', 'gender', 'count', 'year')
    
    usa_names_pipeline = (
        p
        | 'Read from memory ' >> beam.Create([usa_names_data])  # left over from last iteration of sampling
        | 'Flat map to lines ' >> beam.FlatMap(lambda big_str: big_str.split('\r\n')) # split on return-new line feed 
        | 'Process each line ' >> beam.Map(lambda line: line.split(','))
        | 'Map to keys ' >> beam.Map(lambda elem: {key: value for key, value in zip(columns, elem)})
    )
    
    ib.show(usa_names_pipeline)
    ib.show_graph(p)
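
Two details of the lambdas above are worth knowing. Splitting on `'\r\n'` leaves an empty string after the final line ending, which the mapping steps turn into a partial record (`{'name': ''}`), and every value stays a string. The sketch below, in plain Python so it runs without Beam, shows one way to skip blank lines and cast `count` and `year` to integers. The generator name `to_records` is hypothetical, and whether integer columns are wanted is an assumption about the downstream use.

```python
columns = ('name', 'gender', 'count', 'year')


def to_records(big_str: str):
    """Yield one dict per non-empty line, casting count and year to int."""
    for line in big_str.split('\r\n'):
        if not line:  # split() leaves an empty string after the final '\r\n'
            continue
        record = dict(zip(columns, line.split(',')))
        record['count'] = int(record['count'])
        record['year'] = int(record['year'])
        yield record


sample = 'Lisa,F,45029,1969\r\nMichelle,F,34320,1969\r\n'
for record in to_records(sample):
    print(record)
```

Since `beam.FlatMap` accepts any callable that returns an iterable, a generator like this could stand in for the three separate steps after `beam.Create`.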

This model is close to what we would use in production.  The only thing that's missing is I/O.  The Apache Beam framework has an extensive list of I/O providers, from the `Create` provider we've been using to read from memory all the way to connectors for sinks and sources that exist on-premises and in other clouds.  Custom connectors can be built with plain Python as necessary.  

Beam also supports the creation of custom and composite transforms through its framework.  Let's quickly combine I/O and custom transforms in a single, simple example.

```python3

dir(beam.io)  # A small sampling of popular connections

'ReadAllFromAvro',
 'ReadAllFromBigQuery',
 'ReadAllFromParquet',
 'ReadAllFromParquetBatched',
 'ReadAllFromText',
 'ReadFromAvro',
 'ReadFromBigQuery',
 'ReadFromBigQueryRequest',
 'ReadFromMongoDB',
 'ReadFromParquet',
 'ReadFromParquetBatched',
 'ReadFromPubSub',
 'ReadFromTFRecord',
 'ReadFromText',
 'ReadFromTextWithFilename',
 'ReadStringsFromPubSub',
```

In [119]:
file_pattern = os.path.join('gs://', usa_names_bucket.name, usa_names_folder, 'yob188')
file_paths = beam.io.gcp.gcsio.GcsIO().list_prefix(file_pattern)

# Let's look at the input before we move forward.
print('File prefix:', file_pattern)  
print(file_paths)

File prefix: gs://end-to-end-analysis-demo-i/usa_names/yob188
{'gs://end-to-end-analysis-demo-i/usa_names/yob1880.txt': 34933, 'gs://end-to-end-analysis-demo-i/usa_names/yob1881.txt': 33740, 'gs://end-to-end-analysis-demo-i/usa_names/yob1882.txt': 37194, 'gs://end-to-end-analysis-demo-i/usa_names/yob1883.txt': 36422, 'gs://end-to-end-analysis-demo-i/usa_names/yob1884.txt': 40155, 'gs://end-to-end-analysis-demo-i/usa_names/yob1885.txt': 40095, 'gs://end-to-end-analysis-demo-i/usa_names/yob1886.txt': 41782, 'gs://end-to-end-analysis-demo-i/usa_names/yob1887.txt': 41396, 'gs://end-to-end-analysis-demo-i/usa_names/yob1888.txt': 46319, 'gs://end-to-end-analysis-demo-i/usa_names/yob1889.txt': 45247}


In [129]:
# Composite Transform, user provided input.
class ListFilesFromGcs(beam.PTransform):
    
    def __init__(self, file_pattern):
        self.file_pattern = file_pattern
        
    def expand(self, pcoll):
        file_paths = beam.io.gcp.gcsio.GcsIO().list_prefix(self.file_pattern)
        return (
            pcoll
            | beam.Create(file_paths)
            | beam.Map(lambda element: element[0]) # dicts are broken down to items by Create.
        )
    

# Custom Function , side-input pattern
class SplitLinesAndConvertToRecords(beam.DoFn):
    
    def process(self, element, columns):
        values = element.split(',')
        record = {key: value for key, value in zip(columns, values)}
        yield record
        
    
with beam.Pipeline(InteractiveRunner()) as p: 
    
    columns = ('name', 'gender', 'count', 'year')
    
    usa_names_files = (
        p
        | 'Generate files list from GCS' >> ListFilesFromGcs(file_pattern)
    )
    
    usa_names_pipeline = (
        usa_names_files
        | 'Read all files from the list' >> beam.io.ReadAllFromText()
        | 'Split and convert rows to records' >> beam.ParDo(SplitLinesAndConvertToRecords(), columns=columns)
    )
    
    ib.show(usa_names_pipeline)
    ib.show_graph(p)

With the data in the right format, we're down to two missing operations before we configure this as a production workload.

* Writing output to BigQuery
* Parallelizing the pipeline

Let's address the last one first, since writing to BigQuery is really a matter of defining parameters and arguments.  Apache Beam and Dataflow process your data in chunks; under the hood these are called bundles.  The Dataflow runner handles bundling automatically, so it's not something we need to manage ourselves (although we can if we want to).  In other words, the most difficult part, serverless distributed computation, is taken care of for us!
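
As a rough mental model of bundling, the toy sketch below chops a list of rows into fixed-size chunks and hands each chunk to a worker thread. This is an analogy only: a real runner chooses bundle sizes dynamically, distributes them across machines, and retries failed bundles, none of which is modelled here. The helper names and the bundle size of 2 are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_bundles(elements, bundle_size):
    """Chop a list into consecutive chunks of at most `bundle_size` items."""
    return [elements[i:i + bundle_size] for i in range(0, len(elements), bundle_size)]


def process_bundle(bundle):
    """Apply the same per-element function to every element of one bundle."""
    return [line.split(',') for line in bundle]


lines = ['Mary,F,10,1880', 'John,M,12,1880', 'Anna,F,7,1881', 'Will,M,9,1881', 'Emma,F,5,1882']
bundles = split_into_bundles(lines, bundle_size=2)

# Each bundle is handed to a worker; pool.map returns results in bundle order.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = [row for rows in pool.map(process_bundle, bundles) for row in rows]

print(len(bundles), results[0])
```

The point of the analogy is that the per-element logic (`process_bundle` here, a `DoFn` in Beam) never needs to know how the data was chunked.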

In [None]:
write_to_bigquery_params = dict(
    table='fedciv-usgs-experimental:testing.ssa_baby_names_iii',  # The dataset must exist for this to work.
    schema='SCHEMA_AUTODETECT',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE', 
    custom_gcs_temp_location='gs://fedciv-usgs-experimental/end-to-end-analysis/staging'
)

with beam.Pipeline(InteractiveRunner()) as p: 
    
    columns = ('name', 'gender', 'count', 'year')
    
    usa_names_files = (
        p
        | 'Generate files list from GCS' >> ListFilesFromGcs(file_pattern)
    )
    
    usa_names_pipeline = (
        usa_names_files
        | 'Read all files from the list' >> beam.io.ReadAllFromText()
        | 'Split and convert rows to records' >> beam.ParDo(SplitLinesAndConvertToRecords(), columns=columns)
    )
    
    usa_names_pipeline \
    | beam.io.WriteToBigQuery(**write_to_bigquery_params)  # Dict unpacking
    
    ib.show(usa_names_pipeline)
    ib.show_graph(p)

## III.d Production 

We're ready for production.  To run this pipeline at scale, we need to swap the `InteractiveRunner` for the `DataflowRunner` and configure its options, which are primarily location related.  Since we've been using globally scoped imports, we'll run this code in a separate notebook or interpreter environment: the Dataflow runner pickles (serializes) the code and its environment and then runs them on GCP's Dataflow service, so it makes sense to start from a clean environment. 

The final pipeline looks like this:

In [134]:
%%writefile usa_names_pipeline.py
import os
import apache_beam as beam

# # Custom Transforms and Do Functions. 
# Composite Transform, user provided input.
class ListFilesFromGcs(beam.PTransform):
    
    def __init__(self, file_pattern):
        self.file_pattern = file_pattern
        
    def expand(self, pcoll):
        file_paths = beam.io.gcp.gcsio.GcsIO().list_prefix(self.file_pattern)
        return (
            pcoll
            | beam.Create(file_paths)
            | beam.Map(lambda element: element[0]) # dicts are broken down to items by Create.
        )
    

# Custom Function , side-input pattern
class SplitLinesAndConvertToRecords(beam.DoFn):
    
    def process(self, element, columns):
        values = element.split(',')
        record = {key: value for key, value in zip(columns, values)}
        yield record

        
# # Options, arguments, and parameters. 
write_to_bigquery_params = dict(
    table='fedciv-usgs-experimental:testing.ssa_baby_names_iii', 
    schema='SCHEMA_AUTODETECT',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE', 
    custom_gcs_temp_location='gs://fedciv-usgs-experimental/end-to-end-analysis/staging'
)

pipeline_options_args = {
    'region': 'us-east1',
    'runner': 'DataflowRunner',
    'project': 'fedciv-usgs-experimental',
    'temp_location': 'gs://fedciv-usgs-experimental/end-to-end-analysis/staging'
}

pipeline_options = beam.options.pipeline_options.PipelineOptions.from_dictionary(pipeline_options_args)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options.view_as(beam.options.pipeline_options.SetupOptions).save_main_session = True


with beam.Pipeline(options=pipeline_options) as p: 
    
    file_pattern = 'gs://end-to-end-analysis-demo-i/usa_names/yob'
    columns = ('name', 'gender', 'count', 'year')
    
    usa_names_files = (
        p
        | 'Generate files list from GCS' >> ListFilesFromGcs(file_pattern)
    )
    
    usa_names_pipeline = (
        usa_names_files
        | 'Read all files from the list' >> beam.io.ReadAllFromText()
        | 'Split and convert rows to records' >> beam.ParDo(SplitLinesAndConvertToRecords(), columns=columns)
    )
    
    usa_names_pipeline | 'Write to BigQuery' >> beam.io.WriteToBigQuery(**write_to_bigquery_params)  # Dict unpacking

Writing usa_names_pipeline.py


In [None]:
!python3 usa_names_pipeline.py
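
The script above hard-codes the project, bucket, and table. A common pattern for Beam scripts is to declare the script's own flags with `argparse` and use `parse_known_args` to leave every other flag for the runner. The sketch below shows only that splitting step, so it runs without Beam installed. The flag names `--file_pattern` and `--table` and the table value are hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Split script-specific flags from everything meant for the runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--file_pattern', required=True,
                        help='GCS prefix of the staged files (hypothetical flag).')
    parser.add_argument('--table', required=True,
                        help='Destination table as project:dataset.table (hypothetical flag).')
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known_args, pipeline_args = parse_args([
    '--file_pattern', 'gs://end-to-end-analysis-demo-i/usa_names/yob',
    '--table', 'my-project:my_dataset.ssa_baby_names',
    '--runner', 'DataflowRunner',
    '--region', 'us-east1',
])
print(known_args.file_pattern)
print(pipeline_args)
```

In the pipeline script, the leftover `pipeline_args` list could be passed to `PipelineOptions(pipeline_args)` in place of the dictionary, which keeps environment-specific values out of the source file.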

# IV. Serverless SQL ELT for EDA 

This part of the demo is conducted in the BigQuery Console.

### Imports for section V

In [139]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

# V. Exploratory Data Analysis with Interactive Jupyter

Notebooks let us bring EDA to life with great ease.  

We were able to ELT and curate our dataset in BigQuery, but now we want to explore it back in a notebook.  Cloud AI Platform Notebooks come with BigQuery cell magics that let us query BigQuery and load the results straight into a dataframe, with no extra libraries or boilerplate code, so we can continue our EDA. 

In [146]:
%%bigquery df
SELECT *
FROM `fedciv-usgs-experimental.testing.strongly_nonbinary_names_ii`

In [147]:
df = df.set_index('Year') # Index by year, in a separate cell because this is not idempotent!

Notebooks come alive when interactive widgets are used. Users can visualize and control changes in the data, which makes learning more immersive and more fun. Researchers can easily see how changing the inputs to a model affects the results.  We can use the Jupyter interact framework to accelerate our EDA activities.

Interact lets us define widgets such as sliders and selectors so we can work with our data dynamically.

In [145]:
sns.set_theme()
def plotter(name):
    name_df = df[df['Name'] == name]
    ax_bias = name_df['male_bias'].plot.line(figsize=(16, 12), label='Male Bias')
    ax_bias.set_ylabel('Male bias')
    ax_bias.set_ylim(-0.1, 1.1)
    ax_bias.set_xlim(1878, 2020)
    ax_bias.axhline(name_df['male_bias'].mean(), color='g', label='Mean Male Bias').set_linestyle('--')
    ax_pop = ax_bias.twinx()
    name_df['total_count'].plot.line(ax=ax_pop, color='r', label='Total Count')  # draw on the twin axis explicitly
    return ax_pop.legend()

interact(plotter, name=df.Name.unique())

interactive(children=(Dropdown(description='name', options=('Addison', 'Adell', 'Adrain', 'Adrian', 'Adriel', …

<function __main__.plotter(name)>