# PySpark Example to access BigQuery and GCS

This notebook is supposed to work as a tutorial to submit Spark jobs to access BigQuery and GCS resources using JupyterLab running on a remote VM. In this example we will be reading data from a open source bigquery table `2016_01` available in `reddit-posts` database of `fh-bigquery` project. As an output we generate the total number of comments made in each subreddit and then print the top 20 subreddits with most comments as result.

## BigQuery Spark connector

Before running the below code, it is essential to have [BigQuery-Spark connector](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example) propoerly positioned on the Dataproc master node. When Livy REST service is submitting a Spark job, it needs to provide this spark-bigquery connector to Spark master node, in order for Dataproc to access BigQuery table. 

The connector can be downloaded using below command, it is hosted on a public bucket by Google. This bucket needs to be saved to `livy_installation_path/repl*` directory.

In [None]:
!gsutil cp gs://spark-lib/bigquery/spark-bigquery-latest.jar .

## GCS Spark Connector

All Dataproc clusters come pre-installed with [GCS connectors](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage#clusters). This means that Spark cluster itself can reach GCS Spark connector jar files and hence do not require manual installation.

## Flow of Operation

When the below cell is executed:

1. [Sparkmagic](https://github.com/jupyter-incubator/sparkmagic) will pick up code from the JupyterLab cell and send it over to Livy REST server
2. [Apache Livy](https://livy.incubator.apache.org) REST service, which is listening on port 8998 of Dataproc Master node, will accept the spark job request and based on the configs made in livy.conf file, it will submit a Spark job to the master node.
3. Master node will execute the spark job, on the cluster according to config, and return result back to Livy REST service.
4. Livy REST service send the result back to JupyterLab VM via Sparkmagic and result of the operation gets displayed in the notebook.

In [6]:
# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# This will help catch some PySpark errors
# from py4j.protocol import Py4JJavaError

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit").getOrCreate()

# Create a two column schema consisting of a string and a long integer
fields = [StructField("subreddit", StringType(), True),
          StructField("count", LongType(), True)]
schema = StructType(fields)

# Create an empty DataFrame. We will continuously union our output with this
subreddit_counts = spark.createDataFrame([], schema)

# Keep track of all tables accessed via the job
tables_read = []

year = "2016"
month = "01"
# In the form of <project-id>.<dataset>.<table>
table = "fh-bigquery.reddit_posts.{0}_{1}".format(year, month)

# If the table doesn't exist we will simply continue and not
# log it into our "tables_read" list

table_df = spark.read.format('bigquery').option('table', table).load()
tables_read.append(table)

# We perform a group-by on subreddit, aggregating by the count and then
# unioning the output to our base dataframe
subreddit_counts = (
    table_df
    .groupBy("subreddit")
    .count()
    .union(subreddit_counts)
)
        
print("The following table will be accounted for in our analysis:")
print(table)

# From our base table, we perform a group-by, summing over the counts.
# We then rename the column and sort in descending order both for readability.
result = (
    subreddit_counts
    .groupBy("subreddit")
    .sum("count")
    .withColumnRenamed("sum(count)", "count")
    .sort("count", ascending=False)
)

# show() will collect the table into memory output the table to std out.
(
result.show()
)

output_bucket_name = "ankit-spark"
file_uri = "my_output_folder/bq_output.csv"
# Save the result in the form of a CSV file
# The command given below will save multiple CSV files because Spark will collect output from multiple processes 
# and save each of them in a separate CSV
# result.write.csv("gs://{0}/{1}".format(output_bucket_name, file_uri))

# To get a single CSV file as output, use the coalesce(1) before passing the dataframe to GCS
result.coalesce(1).write.csv("gs://{0}/{1}".format(output_bucket_name, file_uri))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The following table will be accounted for in our analysis:
fh-bigquery.reddit_posts.2016_01
+--------------------+------+
|           subreddit| count|
+--------------------+------+
|           AskReddit|187508|
|GlobalOffensiveTrade|174022|
|           Fireteams|102953|
|     leagueoflegends| 68740|
|               funny| 66569|
|                news| 58487|
|              videos| 57241|
|      Showerthoughts| 46319|
|                pics| 39492|
|     GlobalOffensive| 38365|
|              gaming| 37060|
|        dirtykikpals| 32270|
|                 aww| 30697|
|          betternews| 29856|
|           worldnews| 28527|
|        dirtypenpals| 27669|
|        pcmasterrace| 25965|
|               Music| 25188|
|                spam| 23770|
|              movies| 23683|
+--------------------+------+
only showing top 20 rows