## Migrating from Spark to BigQuery via Dataproc -- Part 2

* [Part 1](01_spark.ipynb): The original Spark code, now running on Dataproc (lift-and-shift).
* [Part 2](02_gcs.ipynb): Replace HDFS with Google Cloud Storage. This enables job-specific clusters. (cloud-native)
* [Part 3](03_automate.ipynb): Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
* [Part 4](04_bigquery.ipynb): Load CSV into BigQuery, use BigQuery. (modernize)
* [Part 5](05_functions.ipynb): Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)


### Catch up: get data

In [2]:
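# Catch-up cell: run this only if you did not complete the previous notebooks in this sequence.
! pip install kaggle
! mkdir -p ~/.kaggle
! cp kaggle.json ~/.kaggle/
# Keep the API token private; the Kaggle CLI warns if it is readable by other users.
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download vijayuv/onlineretail
! unzip onlineretail.zip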



### Copy data to GCS

Instead of keeping the data in HDFS, keep it in Google Cloud Storage (GCS). Because the data then outlives the cluster, we can delete the cluster once the job is done ("job-specific clusters").

In [15]:
BUCKET='cloud-training-demos-purchase'  # CHANGE
!gsutil cp OnlineRetail* gs://$BUCKET/

Copying file://OnlineRetail.csv [Content-Type=text/csv]...
- [1 files][ 43.5 MiB/ 43.5 MiB]                                                
Operation completed over 1 objects/43.5 MiB.                                     


In [16]:
!gsutil ls gs://$BUCKET/OnlineRetail*

gs://cloud-training-demos-purchase/OnlineRetail.csv


In [1]:
%%writefile spark_analysis.py
import matplotlib
matplotlib.use('agg')
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()
BUCKET = args.bucket

Overwriting spark_analysis.py
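As written, `--bucket` is optional: if it is omitted, `args.bucket` is `None` and the script would go on to read `gs://None/OnlineRetail.csv`. A minimal sketch of a stricter parser, assuming you want the script to fail fast instead; `parse_args` is a hypothetical helper name, not part of the original script.

```python
import argparse


def parse_args(argv=None):
    """Parse command-line flags; in this sketch --bucket is mandatory."""
    parser = argparse.ArgumentParser()
    # required=True makes argparse print an error and exit if --bucket is
    # missing, instead of silently leaving the value as None.
    parser.add_argument("--bucket", required=True,
                        help="bucket for input and output")
    # argv=None means "use sys.argv[1:]", which is what a script wants.
    return parser.parse_args(argv)
```

Inside `spark_analysis.py` this would be used as `BUCKET = parse_args().bucket`.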


### Reading in data

Change any `hdfs://` URLs to `gs://` URLs; the rest of the code stays the same.
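If a script has many paths, the rewrite can be mechanical. The sketch below assumes the HDFS directory layout was copied unchanged to the root of the bucket (as with `gsutil cp OnlineRetail* gs://$BUCKET/` above); `hdfs_to_gcs` is a hypothetical helper, not part of Spark or the Cloud Storage connector.

```python
from urllib.parse import urlparse


def hdfs_to_gcs(url, bucket):
    """Map hdfs://host:port/path to gs://bucket/path (hypothetical helper).

    Assumes the HDFS paths were copied as-is to the root of `bucket`.
    URLs that are not hdfs:// are returned unchanged.
    """
    parsed = urlparse(url)
    if parsed.scheme != "hdfs":
        return url
    # parsed.path keeps its leading slash, e.g. "/user/data/OnlineRetail.csv";
    # the namenode host and port (parsed.netloc) are simply dropped.
    return "gs://{}{}".format(bucket, parsed.path)
```

For example, `hdfs_to_gcs("hdfs:///OnlineRetail.csv", BUCKET)` gives the same URL that the next cell builds by hand.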

In [2]:
%%writefile -a spark_analysis.py
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://{}/OnlineRetail.csv".format(BUCKET)
# The first line of the file is the CSV header; filter it out of the data.
header = sc.textFile(data_file).first()
raw_rdd = sc.textFile(data_file).filter(lambda l: not str(l).startswith(header)).cache()
raw_rdd.take(5)

Appending to spark_analysis.py


In [3]:
%%writefile -a spark_analysis.py
csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    invoiceNo=r[0], 
    stockcode=r[1],
    description=r[2],
    quantity=r[3],
    invoicedate=r[4],
    unitprice=r[5],
    customerid=r[6],
    country=r[7]
    )
)
parsed_rdd.take(5)

Appending to spark_analysis.py
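Splitting on every comma only works if no field contains a comma. CSV allows a comma inside a double-quoted field, and `row.split(",")` would then shift every later column by one. The plain-Python sketch below uses a made-up line (not taken from the dataset) to compare the naive split with the standard `csv` module, which honours quoting.

```python
import csv

# A hypothetical CSV line whose description field contains a quoted comma.
LINE = '536999,12345,"MUG, RED",6,12/1/2010 8:26,2.55,17850,United Kingdom'


def naive_split(line):
    """What the cell above does: split on every comma."""
    return line.split(",")


def csv_split(line):
    """Parse one line with the csv module, which respects double quotes."""
    return next(csv.reader([line]))


print(len(naive_split(LINE)))  # 9: the description is cut in two
print(len(csv_split(LINE)))    # 8: the description stays 'MUG, RED'
```

In Spark itself, `spark.read.csv(data_file, header=True)` parses quoted fields and skips the header line, so it is one possible replacement for the `textFile`/`split` steps; the notebook does not use it.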


In [4]:
%%writefile -a spark_analysis.py
import pyspark.sql.functions as F

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
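# Cast each column from its own values, chaining on df_clean so earlier casts are kept.
df_clean = df.withColumn("customerid", F.col("customerid").cast('int'))
df_clean = df_clean.withColumn("quantity", F.col("quantity").cast('int'))
df_clean = df_clean.withColumn("unitprice", F.col("unitprice").cast('float'))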
df_clean = df_clean.filter(F.col('customerid').isNotNull())
df_clean = df_clean.filter(F.col('quantity').isNotNull())
df_clean = df_clean.filter(F.col('unitprice').isNotNull())
df_clean.show()

df_user = df_clean.groupBy('customerid').count().orderBy('count', ascending=False)
df_user.show()
df_user.printSchema()

Appending to spark_analysis.py
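The cleaning step can be mirrored in plain Python to check the logic on a few rows. This sketch assumes each row is a dict of strings; with ANSI mode off (the default in Spark 3.x), Spark's `cast` turns an unparseable string into null, which is approximated here by returning `None`. The point to get right is that each column is cast from its own raw value, and every cast builds on the previous result. `to_int`, `to_float` and `clean` are hypothetical names.

```python
def to_int(value):
    """Return int(value), or None if the string is not a valid integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def to_float(value):
    """Return float(value), or None if the string is not a valid number."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def clean(rows):
    """Cast customerid, quantity and unitprice, then drop rows with a None."""
    cleaned = []
    for row in rows:
        out = dict(row)
        # Each column is cast from its own raw value.
        out["customerid"] = to_int(row["customerid"])
        out["quantity"] = to_int(row["quantity"])
        out["unitprice"] = to_float(row["unitprice"])
        # Equivalent of the three isNotNull() filters.
        if None not in (out["customerid"], out["quantity"], out["unitprice"]):
            cleaned.append(out)
    return cleaned
```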


In [5]:
%%writefile -a spark_analysis.py
df_clean.createOrReplaceTempView("purchases")
attack_stats = sqlContext.sql("""
                           SELECT 
                             customerid,
                             SUM(unitprice * quantity) as totalspent 
                           FROM purchases
                           GROUP BY customerid
                           ORDER BY totalspent DESC
                           """)
attack_stats.show()



Appending to spark_analysis.py
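The SQL query groups purchases by customer, sums `unitprice * quantity`, and sorts by the total in descending order; the next cell then keeps the first 10 rows with `limit(10)`. A small plain-Python equivalent, assuming rows are dicts with already-numeric `unitprice` and `quantity`, can be handy for checking results on a handful of rows. `total_spent` is a hypothetical name.

```python
from collections import defaultdict


def total_spent(rows):
    """GROUP BY customerid, SUM(unitprice * quantity), ORDER BY total DESC."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["customerid"]] += row["unitprice"] * row["quantity"]
    # Highest spenders first, as (customerid, totalspent) pairs.
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
```

Taking `total_spent(rows)[:10]` corresponds to `attack_stats.limit(10)` in the plotting cell.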


In [6]:
%%writefile -a spark_analysis.py
df_plot = attack_stats.limit(10)
ax = df_plot.toPandas().plot.bar(x='customerid', subplots=True, figsize=(10,25))

Appending to spark_analysis.py


### Write out report

Make sure to copy the output to GCS so that we can safely delete the cluster.

In [7]:
%%writefile -a spark_analysis.py
ax[0].get_figure().savefig('report.png')

Appending to spark_analysis.py


In [8]:
%%writefile -a spark_analysis.py
import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
# Remove output from any previous run under sparktodp/, then upload the new report.
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

Appending to spark_analysis.py
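The cell above clears everything under `sparktodp/` before uploading, so reruns do not leave stale files behind. A sketch of the same step as a reusable function, assuming `bucket` behaves like `google.cloud.storage.Bucket` (only `list_blobs(prefix=...)`, `blob(name)`, `Blob.name`, `Blob.delete()` and `Blob.upload_from_filename()` are used). Taking the bucket as a parameter also makes it easy to test with a fake object. `replace_prefix` is a hypothetical name.

```python
def replace_prefix(bucket, prefix, local_path, blob_name):
    """Delete every object under `prefix`, then upload one local file.

    Returns the names of the deleted objects.
    """
    deleted = []
    # Materialize the listing first so objects are not deleted mid-iteration.
    for blob in list(bucket.list_blobs(prefix=prefix)):
        deleted.append(blob.name)
        blob.delete()
    bucket.blob(blob_name).upload_from_filename(local_path)
    return deleted
```

Usage in the script would look like `replace_prefix(gcs.Client().get_bucket(BUCKET), 'sparktodp/', 'report.png', 'sparktodp/report.png')`.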


In [9]:
%%writefile -a spark_analysis.py
df_user.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/cloud-training-demos-purchase".format(BUCKET))

Appending to spark_analysis.py


In [10]:
BUCKET = 'cloud-training-demos-purchase'  # CHANGE to your bucket name
print('Writing to {}'.format(BUCKET))
!/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET

Writing to cloud-training-demos-purchase
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/05 17:46:45 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/05 17:46:46 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/05 17:46:46 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/05 17:46:46 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|invoiceNo|stockcode|         description|quantity|   invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|  17850.0|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|  17850.0|     

In [11]:
!gsutil ls gs://$BUCKET/sparktodp/**

gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/
gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/_SUCCESS
gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/part-00000-121725e5-2f03-4230-909b-6b3521b88b58-c000.csv
gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/part-00001-121725e5-2f03-4230-909b-6b3521b88b58-c000.csv
gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/part-00002-121725e5-2f03-4230-909b-6b3521b88b58-c000.csv
gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/part-00003-121725e5-2f03-4230-909b-6b3521b88b58-c000.csv
gs://cloud-training-demos-purchase/sparktodp/cloud-training-demos-purchase/part-00004-121725e5-2f03-4230-909b-6b3521b88b58-c000.csv
gs://cloud-training-demos-purchase/sparktodp/report.png

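The listing shows that Spark wrote a directory rather than a single CSV: several `part-*` files plus an empty `_SUCCESS` marker, which Hadoop's output committer writes by default only when the job finishes. A sketch of a completeness check before consuming the output, assuming you have the object names as a list of strings (for example captured from `gsutil ls`); `part_files_if_complete` is a hypothetical helper.

```python
import posixpath


def part_files_if_complete(object_names):
    """Return the sorted part-* files of one Spark output directory.

    Raises ValueError if no _SUCCESS marker is present, which usually
    means the write did not complete.
    """
    if not any(posixpath.basename(n) == "_SUCCESS" for n in object_names):
        raise ValueError("no _SUCCESS marker: output may be incomplete")
    return sorted(n for n in object_names
                  if posixpath.basename(n).startswith("part-"))
```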

In [12]:
!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py

Copying file://spark_analysis.py [Content-Type=text/x-python]...
/ [1 files][  2.3 KiB/  2.3 KiB]                                                
Operation completed over 1 objects/2.3 KiB.                                      


Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.