In [2]:
%%writefile spark_analysis.py
import matplotlib
matplotlib.use('agg')
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()
BUCKET = args.bucket

Overwriting spark_analysis.py


In [3]:
pwd

'/'

In [4]:
ls

[0m[01;36mbin[0m@       [01;34mhome[0m/                      [01;34mlost+found[0m/  [01;34mrun[0m/               [01;34musr[0m/
[01;34mboot[0m/      kddcup.data_10_percent.gz  [01;34mmedia[0m/       [01;36msbin[0m@              [01;34mvar[0m/
copyright  [01;36mlib[0m@                       [01;34mmnt[0m/         spark_analysis.py
[01;34mdev[0m/       [01;36mlib32[0m@                     [01;34mopt[0m/         [01;34msrv[0m/
[01;34metc[0m/       [01;36mlib64[0m@                     [01;34mproc[0m/        [01;34msys[0m/
[01;34mhadoop[0m/    [01;36mlibx32[0m@                    [01;34mroot[0m/        [30;42mtmp[0m/


### Reading in data

The data are a gzip-compressed CSV file. In Spark, it can be read line by line with `textFile` (which decompresses `.gz` input transparently), and each line split into fields on commas.

In [5]:
%%writefile -a spark_analysis.py
from pyspark.sql import SparkSession, SQLContext, Row

gcs_bucket = 'qwiklabs-gcp-03-cc1d562d196b'                                # change to GCS
spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://"+gcs_bucket+"//kddcup.data_10_percent.gz"   # change to GCS
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

Appending to spark_analysis.py


In [6]:
%%writefile -a spark_analysis.py
csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)

Appending to spark_analysis.py


### Spark analysis

One way to analyze data in Spark is to call methods on a DataFrame.

In [7]:
%%writefile -a spark_analysis.py
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

Appending to spark_analysis.py


Another way is to register the DataFrame as a temporary view and query it with Spark SQL.

In [8]:
%%writefile -a spark_analysis.py
df.registerTempTable("connections")
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_acceses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_stats.show()

Appending to spark_analysis.py


In [9]:
%%writefile -a spark_analysis.py
# %matplotlib inline
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

Appending to spark_analysis.py


In [10]:
%%writefile -a spark_analysis.py
ax[0].get_figure().savefig('report.png');

Appending to spark_analysis.py


In [11]:
%%writefile -a spark_analysis.py
import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

Appending to spark_analysis.py


In [12]:
%%writefile -a spark_analysis.py
connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

# essentially
# 1 file per row
# icmp,283602
# tcp,190065
# udp,20354

Appending to spark_analysis.py


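### Test automation step

Run the generated script end to end, passing the project ID as the bucket name, then inspect the objects it wrote.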

In [13]:
BUCKET_list = !gcloud info --format='value(config.project)'
BUCKET=BUCKET_list[0]
print('Writing to {}'.format(BUCKET))
!/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET

Writing to qwiklabs-gcp-03-cc1d562d196b
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/28 07:23:51 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/08/28 07:23:51 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/08/28 07:23:51 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/08/28 07:23:51 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/08/28 07:23:59 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExec

In [14]:
!gsutil ls gs://$BUCKET/sparktodp/**

gs://qwiklabs-gcp-03-cc1d562d196b/sparktodp/connections_by_protocol/
gs://qwiklabs-gcp-03-cc1d562d196b/sparktodp/connections_by_protocol/_SUCCESS
gs://qwiklabs-gcp-03-cc1d562d196b/sparktodp/connections_by_protocol/part-00000-d9f22f17-edef-4ab7-8918-867362676d96-c000.csv
gs://qwiklabs-gcp-03-cc1d562d196b/sparktodp/connections_by_protocol/part-00001-d9f22f17-edef-4ab7-8918-867362676d96-c000.csv
gs://qwiklabs-gcp-03-cc1d562d196b/sparktodp/connections_by_protocol/part-00002-d9f22f17-edef-4ab7-8918-867362676d96-c000.csv
gs://qwiklabs-gcp-03-cc1d562d196b/sparktodp/report.png


In [15]:
!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py

Copying file://spark_analysis.py [Content-Type=text/x-python]...
/ [1 files][  2.9 KiB/  2.9 KiB]                                                
Operation completed over 1 objects/2.9 KiB.                                      


Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.