In [None]:
#!/usr/bin/python
"""GFS I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark

sc = pyspark.SparkContext('local[*]')

In [None]:
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bq_project = "savvy-aileron-127413"
fs_project = "owners-681171445480"
bucket = "spark_tmp"
account_email = "savvy-aileron-127413@appspot.gserviceaccount.com"
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
localKeyfile = '/mnt/key.p12'

# Output Parameters
output_dataset = 'tutorial'
output_table = 'wordcount_table'

conf = {
    # Input Parameters
    'fs.gs.impl': 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem',
    'fs.AbstractFileSystem.gs.impl': 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS',
    'fs.gs.project.id': fs_project,
    'mapred.bq.project.id': bq_project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
    'google.cloud.auth.service.account.enable': 'true',
    'google.cloud.auth.service.account.email': account_email,
    'google.cloud.auth.service.account.keyfile': localKeyfile,
}

In [None]:
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

In [None]:
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
 .map(lambda (w, c): json.dumps({'word': w, 'word_count': c}))
 .saveAsTextFile(output_directory))

In [None]:
# Google cloud authentication
subprocess.check_call(
    '/home/jovyan/google-cloud-sdk/bin/gcloud auth activate-service-account {0} --key-file {1}'.format(account_email, localKeyfile).split())

# Shell out to bq CLI to perform BigQuery import.
subprocess.call(
    '/home/jovyan/google-cloud-sdk/bin/bq load --source_format NEWLINE_DELIMITED_JSON '
    '--project_id {project}'
    '--schema word:STRING,word_count:INTEGER '
    '{project}:{dataset}.{table} {files}'.format(
        project=bq_project, dataset=output_dataset, table=output_table, files=','.join(output_files)
    ).split())

In [None]:
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)