In [16]:
from pyspark import *
from pyspark.sql import SparkSession

> Apache Spark achieves high performance for both batch and streaming data, using a state-of-the-art DAG scheduler, a query optimizer, and a physical execution engine. 

# Spark Installation
1. Download and install Spark from https://spark.apache.org/downloads.html (on a Mac it can also be installed with Homebrew: ```brew install apache-spark```)
2. ```pip install pyspark```

Note: These installation steps assume you already have Jupyter Notebook with a Python 3 kernel installed from the class demo.

# Spark Context
> Spark context sets up internal services and establishes a connection to a Spark execution environment. Once a SparkContext is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs (until SparkContext is stopped). A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application (don’t get confused with the other meaning of Master in Spark, though).

![](../images/sparkcontext-services.png)

Source: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-SparkContext.html

# Spark DAG
> DAG stands for Directed Acyclic Graph. Every time you call an action such as `.collect()`, `.count()`, or (in this demo's examples) `.show()`, the DAG is executed: independent tasks are parallelized automatically and completed by separate workers. The Spark driver then combines the individual tasks' results into your final computation.

![](../images/spark_dag.png)


In [24]:
# Reuse the active SparkSession if one already exists, otherwise start one.
# The previous `SparkSession(sc)` depended on a variable `sc` whose only
# definition in this notebook was commented out, so the cell raised a
# NameError on a fresh kernel.
spark = SparkSession.builder.getOrCreate()

# Keep `sc` available for any code that works with the SparkContext directly.
sc = spark.sparkContext

In [37]:
# Load dataset.csv into a Spark DataFrame.
#   header=True      -> the first line supplies the column names
#   inferSchema=True -> Spark guesses each column's type from the data
df = spark.read.csv(
    "dataset.csv",
    header=True,
    inferSchema=True,
)

In [45]:
# Print the column names, inferred types and nullability of df as a tree.
df.printSchema()

root
 |-- URL: string (nullable = true)
 |-- URL_LENGTH: integer (nullable = true)
 |-- NUMBER_SPECIAL_CHARACTERS: integer (nullable = true)
 |-- CHARSET: string (nullable = true)
 |-- SERVER: string (nullable = true)
 |-- CONTENT_LENGTH: string (nullable = true)
 |-- WHOIS_COUNTRY: string (nullable = true)
 |-- WHOIS_STATEPRO: string (nullable = true)
 |-- WHOIS_REGDATE: string (nullable = true)
 |-- WHOIS_UPDATED_DATE: string (nullable = true)
 |-- TCP_CONVERSATION_EXCHANGE: integer (nullable = true)
 |-- DIST_REMOTE_TCP_PORT: integer (nullable = true)
 |-- REMOTE_IPS: integer (nullable = true)
 |-- APP_BYTES: integer (nullable = true)
 |-- SOURCE_APP_PACKETS: integer (nullable = true)
 |-- REMOTE_APP_PACKETS: integer (nullable = true)
 |-- SOURCE_APP_BYTES: integer (nullable = true)
 |-- REMOTE_APP_BYTES: integer (nullable = true)
 |-- APP_PACKETS: integer (nullable = true)
 |-- DNS_QUERY_TIMES: string (nullable = true)
 |-- Type: integer (nullable = true)



In [39]:
# Print the leading rows of df as a text table (this is an action, so it
# triggers execution of the read).
df.show()

+-------+----------+-------------------------+----------+--------------------+--------------+-------------+-------------------+----------------+------------------+-------------------------+--------------------+----------+---------+------------------+------------------+----------------+----------------+-----------+---------------+----+
|    URL|URL_LENGTH|NUMBER_SPECIAL_CHARACTERS|   CHARSET|              SERVER|CONTENT_LENGTH|WHOIS_COUNTRY|     WHOIS_STATEPRO|   WHOIS_REGDATE|WHOIS_UPDATED_DATE|TCP_CONVERSATION_EXCHANGE|DIST_REMOTE_TCP_PORT|REMOTE_IPS|APP_BYTES|SOURCE_APP_PACKETS|REMOTE_APP_PACKETS|SOURCE_APP_BYTES|REMOTE_APP_BYTES|APP_PACKETS|DNS_QUERY_TIMES|Type|
+-------+----------+-------------------------+----------+--------------------+--------------+-------------+-------------------+----------------+------------------+-------------------------+--------------------+----------+---------+------------------+------------------+----------------+----------------+-----------+-----------

In [40]:
# Number of rows in df.
df.count()

1781

In [41]:
# Summary statistics (count, mean, stddev, min, max) for CONTENT_LENGTH.
# NOTE: the schema above reports CONTENT_LENGTH as a string column, so the
# reported max is the literal text 'NA' rather than a number.
# NOTE(review): the 'NA' entries presumably stand for missing values - confirm
# and consider treating them as nulls when loading.
df.describe('CONTENT_LENGTH').show()

+-------+------------------+
|summary|    CONTENT_LENGTH|
+-------+------------------+
|  count|              1781|
|   mean|11726.927760577915|
| stddev|36391.809050611664|
|    min|                 0|
|    max|                NA|
+-------+------------------+



## Filtering by a condition

In [42]:
# Keep only the rows whose WHOIS_COUNTRY is exactly 'US' and print them.
# The comparison is case-sensitive: the lowercase 'us' entries that appear
# as their own group in the groupby output are not matched here.
df.filter(
    df['WHOIS_COUNTRY'] == 'US'
).show()

+-------+----------+-------------------------+----------+--------------------+--------------+-------------+--------------+----------------+------------------+-------------------------+--------------------+----------+---------+------------------+------------------+----------------+----------------+-----------+---------------+----+
|    URL|URL_LENGTH|NUMBER_SPECIAL_CHARACTERS|   CHARSET|              SERVER|CONTENT_LENGTH|WHOIS_COUNTRY|WHOIS_STATEPRO|   WHOIS_REGDATE|WHOIS_UPDATED_DATE|TCP_CONVERSATION_EXCHANGE|DIST_REMOTE_TCP_PORT|REMOTE_IPS|APP_BYTES|SOURCE_APP_PACKETS|REMOTE_APP_PACKETS|SOURCE_APP_BYTES|REMOTE_APP_BYTES|APP_PACKETS|DNS_QUERY_TIMES|Type|
+-------+----------+-------------------------+----------+--------------------+--------------+-------------+--------------+----------------+------------------+-------------------------+--------------------+----------+---------+------------------+------------------+----------------+----------------+-----------+---------------+----+
| B0

In [79]:
import time

# Time how long the aggregation takes.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# short intervals; time.time() follows the wall clock and can jump if the
# system time is adjusted.
start = time.perf_counter()
# collect() is the action that actually runs the job and returns the rows.
m = df.agg({'DNS_QUERY_TIMES': 'mean'}).collect()
end = time.perf_counter()

# Print after the timer has stopped so that output formatting is not
# counted as part of the measured duration.
print(m)
print(end - start)

[Row(avg(DNS_QUERY_TIMES)=2.263483146067416)]
0.06969118118286133


## Perform groupby and Count

In [43]:
# Count the number of rows for each WHOIS_COUNTRY value.
# show() only prints and returns None, so chaining it onto the assignment
# left country_df set to None. Keep the grouped DataFrame in country_df and
# display it as a separate step.
country_df = df.groupby("WHOIS_COUNTRY").count()
country_df.show()

+--------------+-----+
| WHOIS_COUNTRY|count|
+--------------+-----+
|            SC|    3|
|            UA|    2|
|            us|    3|
|            NL|    6|
|            BS|    4|
|[u'GB'; u'UK']|    5|
|          None|  306|
|            CN|   10|
|            AT|    4|
|            RU|    2|
|            CZ|    9|
|            KY|    3|
|            HK|    3|
|            AU|   35|
|            PK|    1|
|            CA|   84|
|            GB|   19|
|            BR|    2|
|            BY|    1|
|            DE|    3|
+--------------+-----+
only showing top 20 rows



## Registering as a Table

In [None]:
# Expose df to Spark SQL under the name 'demo_table'.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0; it also overwrites an existing view of the
# same name, so this cell can be re-run safely.
df.createOrReplaceTempView('demo_table')

In [51]:
# Query the registered 'demo_table' with SQL: list the distinct
# WHOIS_COUNTRY values and print the result.
spark.sql('select distinct(WHOIS_COUNTRY) from demo_table').show()

+--------------+
| WHOIS_COUNTRY|
+--------------+
|            SC|
|            UA|
|            us|
|            NL|
|            BS|
|[u'GB'; u'UK']|
|          None|
|            CN|
|            AT|
|            RU|
|            CZ|
|            KY|
|            HK|
|            AU|
|            PK|
|            CA|
|            GB|
|            BR|
|            BY|
|            DE|
+--------------+
only showing top 20 rows

