# Using Google Cloud Storage, Dataproc and BigQuery from Jupyter Notebooks


# Install Packages

- Create a requirements text file and write to disk locally
- This file can be embedded in the container image so that it does not need to be created in each notebook

In [20]:
%%writefile requirements.txt
# Python dependencies for this demo notebook; installed by the pip cell that follows.
# NOTE(review): the first two client libraries and google-api-python-client are
# unpinned while the others are pinned - consider pinning all of them so that
# installs are reproducible.
google-cloud-storage
google-cloud-bigquery
google-cloud==0.32.0
google-api-python-client
google-auth==1.4.1
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.2.0
pytz==2018.3

Overwriting requirements.txt


In [21]:
# Install packages
!pip install -qq --upgrade -r requirements.txt

# Import Libraries
- The notebook relies on a custom library (`custom_gcp`) that I've built for this demo; the custom library can also be embedded in the Docker image

In [22]:
# Standard library
import os
import csv

# Spark and the Google Cloud client libraries
import pyspark
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig
from google.cloud.bigquery import SchemaField
import googleapiclient.discovery

# Helper library written for this demo (custom_gcp.py next to the notebook)
import custom_gcp as gcp

# Reuse the active SparkContext if there is one, otherwise start one;
# `sc` is used by the local PySpark example further down.
sc = pyspark.SparkContext.getOrCreate()

# Download and Create Sample Files

- Create a sample text file and write to disk locally

In [23]:
%%writefile sample_text_file.txt
Hello world! dog elephant panther

Overwriting sample_text_file.txt


- Create a sample pyspark file and write to disk locally.
- The script expects a text file in GCS
- gs://data-science-on-kubs-bucket/sample_text_file.txt doesn't exist yet; we'll upload it later

In [24]:
%%writefile pyspark_sort_gcs.py
# Reads a text file from GCS, sorts the words on its first line and prints them.
import pyspark

# Reuse the active SparkContext if there is one, otherwise start a new one.
spark_context = pyspark.SparkContext.getOrCreate()

lines = spark_context.textFile('gs://data-science-on-kubs-bucket/sample_text_file.txt')
# Only the first line of the file is used; split() breaks it on whitespace.
tokens = lines.first().split()
print(sorted(tokens))

Overwriting pyspark_sort_gcs.py


- Create a sample pyspark file and write to disk locally
- The script declares its own string

In [25]:
%%writefile pyspark_sort.py
# Sorts a hard-coded list of words with Spark and prints the result.
import pyspark

# getOrCreate() reuses an already-active SparkContext instead of raising, which
# a bare SparkContext() does when a context exists; this also matches the call
# used in pyspark_sort_gcs.py.
sc = pyspark.SparkContext.getOrCreate()
# The input is declared here rather than read from GCS.
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
# collect() brings the elements back to the driver, where they are sorted.
words = sorted(rdd.collect())
print(words)

Overwriting pyspark_sort.py


In [26]:
# Long listing sorted by modification time, oldest first (-l, -t, -r), to
# confirm that the files written by the cells above exist.
!ls -lrt

total 56
drwsrwsr-x 2 jovyan users  4096 Jul 20 19:33  work
-rw-r--r-- 1 jovyan users 16160 Oct  7 06:08  custom_gcp.py
drwxr-sr-x 2 jovyan users  4096 Oct  7 06:10  __pycache__
-rw-r--r-- 1 jovyan users   176 Oct  7 06:18  requirements.txt
-rw-r--r-- 1 jovyan users 12535 Oct  7 06:18 'GCS DP BQ.ipynb'
-rw-r--r-- 1 jovyan users    33 Oct  7 06:18  sample_text_file.txt
-rw-r--r-- 1 jovyan users   189 Oct  7 06:18  pyspark_sort_gcs.py
-rw-r--r-- 1 jovyan users   159 Oct  7 06:18  pyspark_sort.py


# Global Variables
- Note that here we're using the key.json that was embedded in the container at creation time. We can choose to use the user's account instead if we're creating the Jupyter instance programmatically. For the moment, we'll use a more generic service account.

In [27]:
# Path of the service-account key file that was embedded in the container;
# exported through the GOOGLE_APPLICATION_CREDENTIALS environment variable.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/var/secrets/google/key.json"
# Project, region and zone passed to the custom_gcp helper calls in this notebook.
GCP_PROJECT = 'data-science-on-kubs'
GCP_REGION = 'australia-southeast1'
GCP_ZONE = 'australia-southeast1-a'
# Bucket and object name used by the Cloud Storage examples.
GCS_BUCKET = 'data-science-on-kubs-bucket'
GCS_OBJECT = 'sample_text_file.txt'
# Cluster name passed to the Dataproc job-submission cells.
DATAPROC_CLUSTER = 'dataproc-temp-cluster'
# Dataset and table used by the BigQuery examples.
BIGQUERY_DS_ID = 'bq_test_dataset'
BIGQUERY_TABLENAME = 'bq_test_table'

# Google Cloud Storage

In [28]:
# WARNING: THIS DELETES ALL BUCKETS AND OBJECTS IN A PROJECT
# USE ONLY ON A TEST, NEW PROJECT
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)

# Walk through the bucket/object lifecycle: create and list a bucket; upload,
# list, download and delete an object; then delete the bucket and list again.
gcp.create_bucket(GCS_BUCKET, GCP_PROJECT)
gcp.list_buckets(GCP_PROJECT)
gcp.upload_blob(GCS_BUCKET, GCS_OBJECT, GCS_OBJECT, GCP_PROJECT)
gcp.list_blobs(GCS_BUCKET, GCP_PROJECT)
gcp.download_blob(GCS_BUCKET, GCS_OBJECT, GCS_OBJECT, GCP_PROJECT)
gcp.delete_blob(GCS_BUCKET, GCS_OBJECT, GCP_PROJECT)
gcp.delete_bucket(GCS_BUCKET, GCP_PROJECT)
gcp.list_buckets(GCP_PROJECT)

Bucket <Bucket: data-science-on-kubs-bucket> deleted
Bucket data-science-on-kubs-bucket created
Bucket data-science-on-kubs-bucket found
File sample_text_file.txt uploaded to sample_text_file.txt
File sample_text_file.txt found
Blob sample_text_file.txt downloaded to sample_text_file.txt
Blob sample_text_file.txt deleted
Bucket data-science-on-kubs-bucket deleted


In [29]:
# Create the bucket, upload the sample file to it and list the result.
# GCS_OBJECT is used for both the source file and the destination object so
# that the file name is defined in a single place (the Global Variables cell).
gcp.create_bucket(GCS_BUCKET, GCP_PROJECT)
gcp.list_buckets(GCP_PROJECT)
gcp.upload_blob(GCS_BUCKET, GCS_OBJECT, GCS_OBJECT, GCP_PROJECT)
gcp.list_blobs(GCS_BUCKET, GCP_PROJECT)
# WARNING: this removes every bucket and object in the project.
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)

Bucket data-science-on-kubs-bucket created
Bucket data-science-on-kubs-bucket found
File sample_text_file.txt uploaded to sample_text_file.txt
File sample_text_file.txt found
Blob <Blob: data-science-on-kubs-bucket, sample_text_file.txt> deleted
Bucket <Bucket: data-science-on-kubs-bucket> deleted


# Dataproc

In [30]:
# PySpark can be run locally
# Distribute a small list of words, collect it back to the driver and sort it
# in place before printing.
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = rdd.collect()
words.sort()
print(words)

['Hello,', 'dog', 'elephant', 'panther', 'world!']


In [31]:
# Report the Dataproc clusters (with details) in the configured project and region.
gcp.list_clusters_with_details(project=GCP_PROJECT, region=GCP_REGION)

There are no Dataproc Clusters in this Project and Region


In [32]:
# WARNING: THIS DELETES ALL BUCKETS AND OBJECTS IN A PROJECT
# USE ONLY ON A TEST, NEW PROJECT
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)
# Recreate the bucket that is passed as bucket_name to the job submission below.
gcp.create_bucket(GCS_BUCKET, GCP_PROJECT)

Bucket data-science-on-kubs-bucket created


In [33]:
# USING LOCAL PYSPARK (submit_pyspark_job_to_cluster uploads to GCS first), WITH EMBEDDED STRING
# Submit pyspark_sort.py, which declares its own input data, to a Dataproc cluster.
# With create_new_cluster=True the recorded run creates the cluster, runs the
# job and then tears the cluster down.
# Note: more parameters for a programmatic job run can be configured,
# check here: https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
output = gcp.submit_pyspark_job_to_cluster(
    project_id=GCP_PROJECT,
    zone=GCP_ZONE, 
    cluster_name=DATAPROC_CLUSTER, 
    bucket_name=GCS_BUCKET, 
    pyspark_file='pyspark_sort.py', 
    create_new_cluster=True,
    master_type='n1-standard-1', 
    worker_type='n1-standard-1',
    sec_worker_type='n1-standard-1',
    no_masters=1,
    no_workers=2, 
    no_sec_workers=1, 
    sec_worker_preemptible=True, 
    dataproc_version='1.2'
    )

Creating cluster...
Waiting for cluster creation...
Cluster created.
dataproc-temp-cluster - RUNNING
Submitted job ID 7847c497-e79e-456e-96cf-59ee5c64770f
Waiting for job to finish...
Job finished.
Tearing down cluster


In [34]:
# WARNING: THIS DELETES ALL BUCKETS AND OBJECTS IN A PROJECT
# USE ONLY ON A TEST, NEW PROJECT
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)
# Recreate the bucket and upload the sample text file that pyspark_sort_gcs.py
# reads. GCS_OBJECT is used for both the source file and the destination object
# so that the file name is defined in a single place (the Global Variables cell).
gcp.create_bucket(GCS_BUCKET, GCP_PROJECT)
gcp.list_buckets(GCP_PROJECT)
gcp.upload_blob(GCS_BUCKET, GCS_OBJECT, GCS_OBJECT, GCP_PROJECT)
gcp.list_blobs(GCS_BUCKET, GCP_PROJECT)

Blob <Blob: data-science-on-kubs-bucket, pyspark_sort.py> deleted
Bucket <Bucket: data-science-on-kubs-bucket> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/62026cd0-822d-45e4-b272-f2c88b2b0571/cluster.properties> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/62026cd0-822d-45e4-b272-f2c88b2b0571/dataproc-temp-cluster-m/dataproc-startup-script_output> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/62026cd0-822d-45e4-b272-f2c88b2b0571/dataproc-temp-cluster-sw-z28c/dataproc-startup-script_output> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/62026cd0-822d-45e4-b272-f2c88b2b0571/dataproc-temp-cluster-w-0/dataproc-startup-script_output> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dat

#### *WARNING*: Your previous Dataproc cluster is still terminating. If you attempt to create a new cluster before the previous one has been torn down, you might reach the default resource limit for this project and the creation will fail. If so, simply try again in a few minutes or increase your limit.

In [36]:
# USING LOCAL PYSPARK (submit_pyspark_job_to_cluster uploads to GCS first), WITH GCS STRING
# Submit pyspark_sort_gcs.py, which reads its input from a text file in GCS,
# to a Dataproc cluster.
# Note: more parameters for a programmatic job run can be configured,
# check here: https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
output = gcp.submit_pyspark_job_to_cluster(
    project_id=GCP_PROJECT,
    zone=GCP_ZONE, 
    cluster_name=DATAPROC_CLUSTER, 
    bucket_name=GCS_BUCKET, 
    pyspark_file='pyspark_sort_gcs.py', 
    create_new_cluster=True,
    master_type='n1-standard-1', 
    worker_type='n1-standard-1',
    sec_worker_type='n1-standard-1',
    no_masters=1,
    no_workers=2, 
    no_sec_workers=1, 
    sec_worker_preemptible=True, 
    dataproc_version='1.2'
    )

Creating cluster...
Waiting for cluster creation...
Cluster created.
dataproc-temp-cluster - RUNNING
Submitted job ID 4e87cfcf-b00c-4c8b-8757-c45206b41201
Waiting for job to finish...
Job finished.
Received job output b"18/10/07 06:27:57 INFO org.spark_project.jetty.util.log: Logging initialized @7579ms\n18/10/07 06:27:57 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT\n18/10/07 06:27:57 INFO org.spark_project.jetty.server.Server: Started @7805ms\n18/10/07 06:27:57 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@23aead7c{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}\n18/10/07 06:27:58 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.10-hadoop2\n18/10/07 06:28:00 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at dataproc-temp-cluster-m/10.152.0.5:8032\n18/10/07 06:28:05 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception\njava.lang.InterruptedException\n\tat java.lang.Object.wait(Nati

# BigQuery

In [37]:
# List, Create, and Delete Datasets
# Order of operations: list the datasets, delete the test dataset (the recorded
# run reports that it did not exist), create it, then list again.
gcp.list_datasets(GCP_PROJECT)
gcp.delete_dataset(GCP_PROJECT, BIGQUERY_DS_ID)
gcp.create_dataset(GCP_PROJECT, BIGQUERY_DS_ID)
gcp.list_datasets(GCP_PROJECT)

data-science-on-kubs project does not contain any datasets
Dataset bq_test_dataset does not exist
Dataset bq_test_dataset created
Datasets in project data-science-on-kubs:
	bq_test_dataset


In [38]:
# List, Create, and Delete Tables, within a Dataset
# Order of operations: list the tables, create the test table, delete it and
# list again, then recreate it so later cells can use it, and fetch it with
# get_table.
gcp.list_tables(GCP_PROJECT, BIGQUERY_DS_ID)
gcp.create_table(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)
gcp.delete_table(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)
gcp.list_tables(GCP_PROJECT, BIGQUERY_DS_ID)
gcp.create_table(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)
gcp.get_table(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)

bq_test_dataset dataset does not contain any tables
Table bq_test_table created
Table bq_test_dataset:bq_test_table deleted
bq_test_dataset dataset does not contain any tables
Table bq_test_table created
[SchemaField('full_name', 'STRING', 'REQUIRED', None, ()), SchemaField('age', 'INTEGER', 'REQUIRED', None, ())]
None
0


In [39]:
# Insert rows into a Table
# One (full_name, age) tuple per row.
# NOTE(review): this helper is called with table name before dataset id, the
# reverse of create_table/delete_table - confirm against custom_gcp.
rows_to_insert = [('Phred Phlyntstone', 32), ('Wylma Phlyntstone', 29)]
gcp.insert_in_table(GCP_PROJECT, BIGQUERY_TABLENAME, BIGQUERY_DS_ID, rows_to_insert)

In [40]:
# Query a Table via SQL
# Build the backtick-quoted, fully-qualified reference `project.dataset.table`.
table = '`{}.{}.{}`'.format(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)
query = 'SELECT * FROM {}'.format(table)
gcp.query_table(GCP_PROJECT, query)

Row(('Phred Phlyntstone', 32), {'full_name': 0, 'age': 1})
Row(('Wylma Phlyntstone', 29), {'full_name': 0, 'age': 1})


In [41]:
# Extract a Table to Google Cloud Storage
# WARNING: delete_all_buckets_and_blobs removes every bucket and object in the project.
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)
# Create a fresh bucket to receive the export.
gcp.create_bucket(GCS_BUCKET, GCP_PROJECT)
gcp.list_buckets(GCP_PROJECT)
# Destination of the export: gs://<GCS_BUCKET>/output.csv
destination_uri = 'gs://{}/{}'.format(GCS_BUCKET, 'output.csv')
gcp.extract_table_to_gcs(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME, destination_uri)
gcp.list_blobs(GCS_BUCKET, GCP_PROJECT)
# Clean up: remove the buckets/objects again and delete the table.
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)
gcp.delete_table(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)

Blob <Blob: data-science-on-kubs-bucket, pyspark_sort_gcs.py> deleted
Blob <Blob: data-science-on-kubs-bucket, sample_text_file.txt> deleted
Bucket <Bucket: data-science-on-kubs-bucket> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/de2893ef-51f4-4e97-a451-ce0ca364e104/cluster.properties> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/de2893ef-51f4-4e97-a451-ce0ca364e104/dataproc-temp-cluster-m/dataproc-startup-script_output> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/de2893ef-51f4-4e97-a451-ce0ca364e104/dataproc-temp-cluster-sw-76mj/dataproc-startup-script_output> deleted
Blob <Blob: dataproc-f56270e1-9774-45c5-98b6-8c9c32b30087-au-southeast1, google-cloud-dataproc-metainfo/de2893ef-51f4-4e97-a451-ce0ca364e104/dataproc-temp-cluster-w-0/dataproc-startup-script_output> deleted
Blob <Blob: da

In [42]:
# Load a Parquet file from GCS into BigQuery, then query the loaded table
uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet'
gcp.load_gcs_parquet_to_table(GCP_PROJECT, BIGQUERY_TABLENAME, BIGQUERY_DS_ID, uri)
# Build the backtick-quoted, fully-qualified reference `project.dataset.table`.
table = '`{}.{}.{}`'.format(GCP_PROJECT, BIGQUERY_DS_ID, BIGQUERY_TABLENAME)
query = 'SELECT * FROM {}'.format(table)
gcp.query_table(GCP_PROJECT, query)

Starting job 3567c0c9-c4cf-410f-a5d3-5c6c4b3f4309
Job finished
Loaded 50 rows
Row(('Alabama', 'AL'), {'name': 0, 'post_abbr': 1})
Row(('Alaska', 'AK'), {'name': 0, 'post_abbr': 1})
Row(('Arizona', 'AZ'), {'name': 0, 'post_abbr': 1})
Row(('Arkansas', 'AR'), {'name': 0, 'post_abbr': 1})
Row(('California', 'CA'), {'name': 0, 'post_abbr': 1})
Row(('Colorado', 'CO'), {'name': 0, 'post_abbr': 1})
Row(('Connecticut', 'CT'), {'name': 0, 'post_abbr': 1})
Row(('Delaware', 'DE'), {'name': 0, 'post_abbr': 1})
Row(('Florida', 'FL'), {'name': 0, 'post_abbr': 1})
Row(('Georgia', 'GA'), {'name': 0, 'post_abbr': 1})
Row(('Hawaii', 'HI'), {'name': 0, 'post_abbr': 1})
Row(('Idaho', 'ID'), {'name': 0, 'post_abbr': 1})
Row(('Illinois', 'IL'), {'name': 0, 'post_abbr': 1})
Row(('Indiana', 'IN'), {'name': 0, 'post_abbr': 1})
Row(('Iowa', 'IA'), {'name': 0, 'post_abbr': 1})
Row(('Kansas', 'KS'), {'name': 0, 'post_abbr': 1})
Row(('Kentucky', 'KY'), {'name': 0, 'post_abbr': 1})
Row(('Louisiana', 'LA'), {'name': 

In [43]:
# Final cleanup.
# WARNING: delete_all_buckets_and_blobs removes every bucket and object in the
# project; use only on a test project.
gcp.delete_all_buckets_and_blobs(GCP_PROJECT)
# Delete the BigQuery test dataset.
gcp.delete_dataset(GCP_PROJECT, BIGQUERY_DS_ID)

Dataset bq_test_dataset deleted
