In [None]:
# Load IPython's autoreload extension; mode 2 reloads all modules before each cell runs,
# so edits to the imported client modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import os
# Project id used by every client below; raises KeyError if TEST_PROJECT_ID is not set.
project_id = os.environ['TEST_PROJECT_ID']
import logging
# Configure root logging at ERROR level before the client modules below are imported.
logging.basicConfig(level=logging.ERROR)

# Dataproc and Cloud Storage client classes used by the cells below.
from dataproc.client import DataProc
from cloud_storage.client import CloudStorage
from io import StringIO
import pandas as pd
import matplotlib.pyplot as plt
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [None]:
# Location settings handed to the Dataproc client, and the bucket that holds the
# uploaded scripts and the job's input/output files.
# NOTE(review): zone_letter presumably selects the zone inside the region
# (e.g. europe-west1-d) — confirm in DataProc.
region = 'europe-west1'
zone_letter = 'd'
bucket_name = 'letter_statistics_example'


# Clients shared by all later cells.
dp = DataProc(project_id, region, zone_letter)
cs = CloudStorage(project_id)

# Generate example data

In [None]:
# copy the lines that comes out of this and insert it in the cell below
print(os.path.join(os.environ['HOME'], 'gcp/dataproc/examples/letter_statistics/data_generator.py'))
print(project_id)

In [None]:
%run -i [YOUR_DATA_GENERATOR_PATH] --project_id=[YOUR_PROJECT_ID]

# Create a cluster

First, upload an initialization script for the cluster. Then create the cluster using the script you just uploaded.

In [None]:
# Upload the local cluster initialization script to the bucket.
# initialization_file (the gs:// destination) is passed to create_cluster in the next cell.
source_file = os.path.join(os.environ['HOME'], 'gcp/dataproc/examples/letter_statistics/initialize_cluster.sh')
initialization_file = 'gs://{}/initialize_cluster.sh'.format(bucket_name)

cs.upload_blob_from_filename(source_file, initialization_file)

In [None]:
# Cluster settings: one master and two workers.
# NOTE(review): machine types and boot-disk sizes are interpreted by
# DataProc.create_cluster — confirm units (GB) and accepted values there.
cluster_name = 'cluster-test'
master_machine_type = 'n1-standard-1'
nr_masters = 1
master_boot_disk_gb = 200
worker_machine_type = 'n1-standard-1'
nr_workers = 2
worker_boot_disk_gb = 100
# NOTE(review): presumably read by initialize_cluster.sh to choose the Miniconda build — confirm.
metadata = {'MINICONDA_VARIANT': '3', 'MINICONDA_VERSION': '4.5.11'}

# All arguments are positional, so their order must match the create_cluster signature.
dp.create_cluster(cluster_name,
                  master_machine_type,
                  nr_masters,
                  master_boot_disk_gb,
                  worker_machine_type,
                  nr_workers,
                  worker_boot_disk_gb,
                  metadata,
                  initialization_file)

# Submit a pyspark job

Starting up a Dataproc cluster can take a few minutes, so be patient with the next steps. If you submit the job before the cluster has been fully initialized, the cluster initialization and your job may both fail.

In [None]:
# Upload the job script to the bucket; main_python_file (the gs:// destination)
# is what gets submitted as the PySpark job in the next cell.
# source_file is reassigned here; it previously pointed at the initialization script.
source_file = os.path.join(os.environ['HOME'], 'gcp/dataproc/examples/letter_statistics/calculate_letter_statistics.py')
main_python_file = 'gs://{}/calculate_letter_statistics.py'.format(bucket_name)

cs.upload_blob_from_filename(source_file, main_python_file)

In [None]:
# Destination of the job's CSV output; read back from Cloud Storage in the "Inspect results" section.
output_path = 'gs://{}/letter_files/outputs/letter_statistics.csv'.format(bucket_name)

# NOTE(review): presumably forwarded as command-line arguments to the job script — confirm in DataProc.
script_parameters = ['--project_id={}'.format(project_id),
                     '--input_file_glob=gs://{}/data/inputs/*'.format(bucket_name),
                     '--output_path={}'.format(output_path)]

# Keep the returned job id; the next cell uses it to wait for the job.
job_id = dp.submit_pyspark_job(cluster_name, main_python_file, script_parameters)

In [None]:
# Wait for the submitted job before reading its output.
# NOTE(review): presumably blocks and polls every request_interval seconds — confirm in DataProc.wait_for_job.
dp.wait_for_job(job_id, request_interval=3)

# Inspect results

Since the job writes the statistics to Cloud Storage, let's read the output and visualize it.

In [None]:
# Fetch the job output from the bucket and parse it as CSV,
# using the first column as the DataFrame index.
string_content = cs.get_blob_content(output_path)
csv_buffer = StringIO(string_content)
df = pd.read_csv(csv_buffer, index_col=0)

In [None]:
# Statistics columns drawn as one line each.
stat_columns = ['MEAN', 'VARIANCE', 'MIN', 'MAX']

fig, ax = plt.subplots(figsize=(20, 6))

# Pin one tick per row, labelled with the index value, before plotting.
# NOTE(review): keep this ahead of .plot() — pandas may adjust tick labels while
# drawing; verify before reordering.
tick_positions = range(len(df.index))
ax.set_xticks(tick_positions)
ax.set_xticklabels(df.index)
df[stat_columns].plot(ax=ax);

# Cleanup: delete cluster and bucket

In [None]:
# Remove the cluster created in the "Create a cluster" section.
dp.delete_cluster(cluster_name)

In [None]:
# Remove the example bucket that held the scripts and job data.
# NOTE(review): confirm that delete_bucket also removes the objects still inside it.
cs.delete_bucket(bucket_name)