## Migrating from Spark to BigQuery via Dataproc -- Part 1

* [Part 1](01_spark.ipynb): The original Spark code, now running on Dataproc (lift-and-shift).
* [Part 2](02_gcs.ipynb): Replace HDFS by Google Cloud Storage. This enables job-specific-clusters. (cloud-native)
* [Part 3](03_automate.ipynb): Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
* [Part 4](04_bigquery.ipynb): Load CSV into BigQuery, use BigQuery. (modernize)
* [Part 5](05_functions.ipynb): Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)


In [2]:
%%writefile spark_analysis.py
import matplotlib
matplotlib.use('agg')
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()
BUCKET = args.bucket

Writing spark_analysis.py


### Reading in data

The data are CSV files. In Spark, these can be read using textFile and splitting rows on commas.

In [3]:
%%writefile -a spark_analysis.py

from pyspark.sql import SparkSession, SQLContext, Row

gcs_bucket='qwiklabs-gcp-01-85289ef223f9'
spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://" + gcs_bucket + "//kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

Appending to spark_analysis.py


In [4]:
%%writefile -a spark_analysis.py

csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)

Appending to spark_analysis.py


### Spark analysis

One way to analyze data in Spark is to call methods on a dataframe.

In [5]:
%%writefile -a spark_analysis.py

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

Appending to spark_analysis.py


Another way is to use Spark SQL

In [6]:
%%writefile -a spark_analysis.py

df.registerTempTable("connections")
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_acceses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_stats.show()

Appending to spark_analysis.py


In [7]:
%%writefile -a spark_analysis.py

ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

Appending to spark_analysis.py


In [8]:
%%writefile -a spark_analysis.py
ax[0].get_figure().savefig('report.png');

Appending to spark_analysis.py


In [9]:
%%writefile -a spark_analysis.py
import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

Appending to spark_analysis.py


In [10]:
%%writefile -a spark_analysis.py
connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

Appending to spark_analysis.py


## Test automation

In [11]:
BUCKET_list = !gcloud info --format='value(config.project)'
BUCKET=BUCKET_list[0]
print('Writing to {}'.format(BUCKET))
!/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET

Writing to qwiklabs-gcp-01-85289ef223f9
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/02 15:01:34 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/06/02 15:01:34 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/06/02 15:01:34 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/06/02 15:01:34 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/06/02 15:01:44 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #0,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExec

In [12]:
# List created elements :
!gsutil ls gs://$BUCKET/sparktodp/**

gs://qwiklabs-gcp-01-85289ef223f9/sparktodp/connections_by_protocol/
gs://qwiklabs-gcp-01-85289ef223f9/sparktodp/connections_by_protocol/_SUCCESS
gs://qwiklabs-gcp-01-85289ef223f9/sparktodp/connections_by_protocol/part-00000-0d96f7bf-203d-468b-9aa0-4179e6c9514a-c000.csv
gs://qwiklabs-gcp-01-85289ef223f9/sparktodp/connections_by_protocol/part-00001-0d96f7bf-203d-468b-9aa0-4179e6c9514a-c000.csv
gs://qwiklabs-gcp-01-85289ef223f9/sparktodp/connections_by_protocol/part-00002-0d96f7bf-203d-468b-9aa0-4179e6c9514a-c000.csv
gs://qwiklabs-gcp-01-85289ef223f9/sparktodp/report.png


In [13]:
# Save the created .py file to the bucket
!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py

Copying file://spark_analysis.py [Content-Type=text/x-python]...
/ [1 files][  2.8 KiB/  2.8 KiB]                                                
Operation completed over 1 objects/2.8 KiB.                                      
