# `spark.sql` & `DataFrames`
---

The Spark RDD API has basic functionality for performing data analysis tasks. 

spark.sql has interfaces for capturing information about the structure of the data and the computations being performed, and it uses this information for optimizing tasks.

Ways to interact with spark.sql include:

- SQL (basic SQL and/or HiveQL)
- the DataFrames API
- the Datasets API

The execution engine used for a particular task is the same irrespective of the API. This gives developers the freedom to choose the API that most naturally expresses a given computation.

---
## `SQL`

- executes SQL queries written in either basic SQL syntax or HiveQL
- reads data from Hive or other databases over ODBC/JDBC
- returns query results as DataFrames

---
## `DataFrames`

- a distributed collection of data with named columns. 
- conceptually equivalent to a table in an RDBMS or a data frame in R/Python, but with richer optimizations under the hood
- can be constructed from: 
    - structured text/csv files, 
    - Hive tables, 
    - external databases, or  
    - existing RDDs
- available in Scala, Java, Python, and R.

---
## `Datasets`

- interface added in Spark 1.6 (experimental in that release)
- tries to combine the benefits of RDDs (strong typing, lambda functions) with the benefits of Spark SQL’s optimized execution engine
- can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.)
- available in Scala and Java only


---
## `SQLContext` and `HiveContext`

These classes are the entry point into all relational functionality in Spark.

A basic `SQLContext` is created from a `SparkContext`:

```
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
```

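The `HiveContext` provides additional features, including the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. In Spark 2.0 and later, `SparkSession` is the unified entry point that covers both; the cells below use one under the name `spark`.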

In [None]:
from pandas import DataFrame
import numpy as np
sc

In [None]:
from pyspark.sql import SQLContext
sqlctx = SQLContext(sc)

type(sqlctx)

In [None]:
# explore methods
spark

In [None]:
pandas_df = DataFrame(data=np.random.randint(1, 100, 20).reshape(4, 5),
                     columns=list('ABCDE'))
pandas_df

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
spark.createDataFrame(pandas_df)

In [None]:
spark_df = spark.createDataFrame(pandas_df)

In [None]:
type(spark_df)

In [None]:
spark_df.show()

# 3. DataFrame

Like an RDD, a DataFrame is an immutable distributed collection of data. 

Unlike an RDD, data is organized into named columns, like a table in a relational database. 

Designed to make processing large data sets even easier, the DataFrame lets developers impose a structure onto a distributed collection of data, allowing a higher level of abstraction. It provides a domain-specific (pandas-like) API for manipulating distributed data, and it makes Spark accessible to a wider audience, beyond specialized data engineers.

Below is an Apache Spark code snippet using SQL and DataFrames to query and join different data sources.

```
# Read a JSON file and register it as a temporary view
spark.read.json("s3n://...").createOrReplaceTempView("json")

# Execute a SQL query that joins it with an existing `people` table
results = spark.sql("""SELECT * FROM people JOIN json ...""")
```

---
## A bit of history
The developers of Apache Spark aimed to provide a simple API for distributed data processing in general-purpose programming languages (Java, Python, Scala, R). Earlier versions of Spark enabled distributed data processing through functional transformations on distributed collections of data (RDDs). This was an incredibly powerful API: tasks that used to take thousands of lines of code to express could be reduced to dozens.

As Spark continued to grow, its developers wanted to enable wider audiences beyond “Big Data” engineers to leverage the power of distributed processing. The DataFrames API was created with this goal in mind. It is inspired by data frames in R and Python (Pandas), but designed from the ground up to support modern big data and data science applications. As an extension to the existing RDD API, DataFrames feature:

- Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
- Support for a wide array of data formats and storage systems
- State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer
- Seamless integration with all big data tooling and infrastructure via Spark
- APIs for Python, Java, Scala, and R

New users familiar with data frames in other programming languages should feel at home with this API. For existing Spark users, this extended API makes Spark easier to program and at the same time improves performance through intelligent optimizations and code generation.

The introduction of DataFrames is actually kind of a **big deal**: when RDDs were the only option for loading data, you had to

- parse your *maybe* un-structured data using RDDs, 
- transform them using case-classes or tuples and then 
- do the special work that you actually needed. 

Spark SQL is not a new project and one could, of course, load structured-data (like Parquet files) directly from a SQLContext before Spark 1.3 – but the advantages were limited to running SQL queries or exposing a JDBC-compatible server for other BI tools.

---
## Advantages of DataFrames

### 1) DataFrames are a higher level of abstraction than RDDs

If you’re familiar with Pandas syntax, you will feel at home with Spark’s DataFrame; even if you’re not, you’ll learn it and, I’d even add, learn to love it. Why? Because it is a higher level of programming than the RDD: you can do more, faster.

<img src="https://ogirardot.files.wordpress.com/2015/05/rdd-vs-dataframe.png?w=640&h=360">


### 2) Spark SQL/Catalyst is intelligent 

When you use a DataFrame, you no longer define a DAG (Directed Acyclic Graph) directly; you build an AST (Abstract Syntax Tree) that the Catalyst engine will parse, check and improve using both rule-based and cost-based optimisation.
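
To make the idea of rewriting a tree with rules concrete, here is a toy sketch in plain Python. It is not Catalyst and does not use any Spark API: the nested-tuple tree format and the `fold_constants` helper are assumptions made up for this illustration, and the single rule shown (constant folding) is just one classic example of a rule-based rewrite.

```python
def fold_constants(expr):
    """Rewrite an expression tree bottom-up, evaluating literal-only additions."""
    kind = expr[0]
    if kind in ("lit", "col"):
        return expr
    # kind is "add" or "gt": optimise the children first
    left, right = fold_constants(expr[1]), fold_constants(expr[2])
    if kind == "add" and left[0] == "lit" and right[0] == "lit":
        return ("lit", left[1] + right[1])
    return (kind, left, right)


# age > (18 + 3), written as a hypothetical nested-tuple tree
tree = ("gt", ("col", "age"), ("add", ("lit", 18), ("lit", 3)))
optimised = fold_constants(tree)
print(optimised)
```

The rewritten tree compares `age` with a single literal, so the addition is done once at planning time rather than once per row.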

### 3) Python & Scala are now even in terms of performance

With the DataFrame API you are using a DSL that leverages Spark’s Scala bytecode. With RDDs, Python lambdas run in a Python VM and Java/Scala lambdas run in the JVM. This is great because inside RDDs you can use your usual Python libraries (NumPy, SciPy, etc.) rather than some Jython code, but it comes at a performance cost.

The same holds for DataFrame User Defined Functions: you can write them in Java/Scala or Python, and the choice affects your computation performance. If you manage to stay in pure DataFrame computation, however, nothing will get between you and the best computation performance you can possibly get.

### 4) DataFrames are the future for Spark

Spark ML is already a pretty obvious example of this: the Pipeline API is designed entirely around DataFrames as its sole data structure for parallel computations, model training and predictions.

Here's what the future of Spark looks like:

<img src="https://ogirardot.files.wordpress.com/2015/05/future-of-spark.png?w=640&h=358">


---
## What Are DataFrames?
In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

## How Can One Use DataFrames?
Once built, DataFrames provide a domain-specific language for distributed data manipulation.  Here is an example of using DataFrames to manipulate the demographic data of a large population of users:

```
# Create a new DataFrame that contains “young users” only
young = users.filter(users.age < 21)
 
# Alternatively, using Pandas-like syntax
young = users[users.age < 21]
 
# Increment everybody’s age by 1
young.select(young.name, young.age + 1)
 
# Count the number of young users by gender
young.groupBy("gender").count()
 
# Join young users with another DataFrame called logs
young.join(logs, young.userId == logs.userId, "left_outer")
```

---
## Using SQL
You can also incorporate SQL while working with DataFrames, using Spark SQL. This example counts the number of users in the young DataFrame.

```
young.createOrReplaceTempView("young")
spark.sql("SELECT count(*) FROM young")
```
---
## Pandas and Spark
In Python, you can also convert freely between Pandas DataFrame and Spark DataFrame:

```
# Convert Spark DataFrame to Pandas
pandas_df = young.toPandas()
 
# Create a Spark DataFrame from Pandas
spark_df = spark.createDataFrame(pandas_df)
```

Similar to RDDs, DataFrames are evaluated lazily. That is to say, computation only happens when an action (e.g. display result, save output) is required. This allows execution to be optimized by applying techniques such as predicate push-down and bytecode generation. All DataFrame operations are also automatically parallelized and distributed on clusters.
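
The difference between building a computation and triggering it can be shown with an analogy in plain Python. This is only a sketch of the lazy-evaluation idea using a generator, not how Spark implements it; `lazy_filter`, the `log` list and the three sample users are hypothetical names and values invented for the illustration.

```python
log = []  # records when work actually happens


def lazy_filter(rows, predicate):
    """Generator: yields matching rows only when a consumer asks for them."""
    for row in rows:
        log.append("checked {}".format(row["name"]))
        if predicate(row):
            yield row


users = [{"name": "ann", "age": 19}, {"name": "bob", "age": 34}, {"name": "cy", "age": 20}]

# "Transformation": builds the pipeline, does no work yet
young = lazy_filter(users, lambda r: r["age"] < 21)
work_before_action = len(log)

# "Action": consuming the generator triggers the computation
young_count = sum(1 for _ in young)

print(work_before_action, young_count, len(log))
```

Nothing is checked until the count is requested, which is the window in which an engine like Spark SQL can still rearrange the plan.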

---
## Supported Data Formats and Sources
Modern applications often need to collect and analyze data from a variety of sources. Out of the box, DataFrames support reading data from the most popular formats, including JSON files, Parquet files, and Hive tables. They can read from local file systems, distributed file systems (HDFS), cloud storage (S3), and external relational database systems via JDBC. In addition, through Spark SQL’s external data sources API, DataFrames can be extended to support any third-party data format or source. Existing third-party extensions already include Avro, CSV, ElasticSearch, and Cassandra.

<img src="https://databricks.com/wp-content/uploads/2015/02/Introducing-DataFrames-in-Spark-for-Large-Scale-Data-Science1.png">


---
## Combine data from Disparate Sources
DataFrames’ support for data sources enables applications to easily combine data from disparate sources (known as federated query processing in database systems). For example, the following code snippet joins a site’s textual traffic log stored in S3 with a PostgreSQL database to count the number of times each user has visited the site.

```
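# Assumes a SparkSession named `spark`, as elsewhere in this notebook
users = spark.read.jdbc("jdbc:postgresql:production", "users")
logs = spark.read.load("/path/to/traffic.log")
# Group on logs.userId explicitly: both inputs carry a userId column
logs.join(users, logs.userId == users.userId, "left_outer") \
  .groupBy(logs.userId).agg({"*": "count"})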
```

---
# In-Depth: Sparkling Pandas

You can finally port pretty much any relevant piece of Pandas’ DataFrame computation to the Apache Spark parallel computation framework using Spark SQL’s DataFrame. Let's take a few concepts from the Pandas DataFrame and see how they translate to PySpark’s DataFrame.

> Disclaimer: a few operations that you can do in Pandas don’t translate well to Spark. Please remember that DataFrames in Spark are like RDDs in the sense that they are immutable data structures.

Therefore things like:

```
# Pandas code to create a new column "three"
df['three'] = df['one'] * df['two']
```

can’t exist, simply because this kind of in-place assignment goes against the principles of Spark. Another example would be accessing a single element of a DataFrame by index. Don’t forget that you’re using a distributed data structure, not an in-memory random-access data structure.

To be clear, this doesn’t mean that you can’t do the same kind of thing (i.e. create a new column) using Spark, it means that you have to think immutable/distributed and re-write parts of your code, mostly the parts that are not purely thought of as transformations on a stream of data.

## 0. Creating Toy DataFrames in Pandas and Spark

In [None]:
# Pandas => pdf
import pandas as pd
pdf = pd.DataFrame(data=[[1, 2, 3], [4, 5, 6]], 
                   columns=['A', 'B', 'C'])
pdf

In [None]:
# SPARK SQL => df
df = spark.createDataFrame(data=[(1, 4), 
                                 (2, 5), 
                                 (3, 6)], 
                            schema=["A", "B"])
df

In [None]:
spark.createDataFrame?  # IPython help: show the signature and docstring

In [None]:
df.show()

## 1. Subsetting Columns

This part is not much different in Pandas and Spark, but you have to take into account the immutable character of the Spark DataFrame. We use the two DataFrames created above: *pdf* in Pandas and *df* in Spark.

In Spark SQL and in Pandas you use the same syntax to refer to a column.

> The only difference is that a Pandas DataFrame is a mutable data structure that you can change in place; a Spark DataFrame is not.

In [None]:
# Pandas
print(pdf.A)
print(pdf['A'])

In [None]:
# Spark
df.A

In [None]:
df['A']

In [None]:
type(df['A'])

In [None]:
# pyspark.sql.Row
# pyspark.sql.Column

---
## 2. Column adding

In [None]:
# Pandas
pdf['D'] = 0
pdf

In Spark SQL you use the `withColumn` or the `select` method, and the new value must be a `Column` (for a constant, one built with `F.lit`).

In [None]:
# Spark
from pyspark.sql import functions as F

In [None]:
df.show()

In [None]:
df.withColumn('C', F.lit(0))

In [None]:
df.withColumn('C', F.lit(0)).show()

NOTE

Most of the time in Spark SQL you can use strings to reference columns, but there are cases where you’ll want `Column` objects instead:

- When you need to manipulate columns with expressions such as *add two columns together*, *twice the value of this column* or *is the column value larger than 0?*, simple strings won’t do and you will need the `Column` reference.
- If you need renaming, casting or any other complex feature, you’ll need the `Column` reference too.


When you’re selecting columns, to create another projected DataFrame, you can also use expressions:

## Creating new columns using `withColumn()` and `select`

In [None]:
df.withColumn('C', df.A * 2).show()

In [None]:
df.withColumn('C', df.B > 4).show()

In [None]:
df.select(df.B > 0).show()

The column name will actually be computed according to the expression you defined. 


If you want to rename this, you’ll need to use the alias method on Column:

In [None]:
df.select((df.B > 4).alias("is_grt_4")).show()

All of the expressions that we’re building here can be used for 

- Filtering, 
- Adding a new column or 
- even inside Aggregations

## 3. Split-Apply-Combine

What can be confusing at first about aggregations is that the moment you write `groupBy` you are no longer working with a DataFrame object; you get a `GroupedData` object and still need to specify your aggregations.
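
As a mental model, the three steps can be written out by hand in plain Python. This is a sketch of the split-apply-combine idea only, not of Spark's distributed implementation; the `rows` values are hypothetical. The intermediate `groups` dictionary plays the role of the grouped object: it holds the split data but is not a result until an aggregation is applied.

```python
from collections import defaultdict

rows = [("X", 1.0), ("Y", 4.0), ("X", 3.0), ("Y", 6.0)]  # hypothetical (A, B) pairs

# Split: collect the B values of each group key A
groups = defaultdict(list)
for key, value in rows:
    groups[key].append(value)

# Apply + combine: one aggregate per group, gathered into a single result
averages = {key: sum(values) / len(values) for key, values in groups.items()}
print(averages)
```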

In [None]:
pdf_2 = DataFrame()
pdf_2['A'] = list('XY' * 5)
pdf_2['B'] = np.random.randn(10).round(2)

In [None]:
pdf_2

In [None]:
df = spark.createDataFrame(pdf_2)

In [None]:
g_df = df.groupBy("A")

In [None]:
g_df.avg("B").show()

In [None]:
df.groupBy("A").avg("B")

In [None]:
df.groupBy("A").avg("B").show()

In [None]:
df.groupBy('A').max().show()

If you need only one aggregation, you can call the simplest functions (`avg`, `count`, `max`, `min`, `mean` and `sum`) directly on `GroupedData`. Most of the time this will be too simple, and you’ll want **to compute several aggregations in a single `groupBy` operation.**

To do so, use the `agg` method, just as in Pandas. This is one of the greatest features of DataFrames.

In [None]:
df.groupBy("A").agg(F.avg("B"), F.min("B"), F.max("B")).show()

In [None]:
df.groupBy("A").agg(
    F.first("B").alias("first"),
    F.last("B").alias("last"),
    F.sum("B").alias("sum")
).show()

---

- The entry point into all SQL functionality in Spark is the `SQLContext` class. 
- To create a basic instance, all we need is a `SparkContext` reference.

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

> ## Creating a Spark DataFrame from a Row-RDD

- Take an existing RDD
- Convert (using `map` and `Row()`) it into a Row-RDD 
- Call the `createDataFrame()` function on the Row-RDD


---
### 1. Specifying the Schema and registering as a Table

With a SQLContext, we are ready to create a DataFrame from our existing RDD. 
But first we need to tell Spark SQL the schema in our data.

Spark SQL can convert an RDD of Row objects to a DataFrame. 
Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. 
- The keys define the column names, and 
- the types are inferred by looking at the first row. 

> Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.

In our case, we first need 
- to split the comma separated data, and then 
- use the information in KDD's 1999 task description to obtain the column names.
---
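
The first-row rule can be mimicked in plain Python to see why a gap in the first row matters. This is an illustration of the idea, not Spark's inference code: `parse_interaction` and `infer_schema` are hypothetical helpers, and the sample line holds only the first six fields of a record, using the values of the first row shown later in this notebook.

```python
def parse_interaction(line):
    """Split one comma-separated record and convert the numeric fields."""
    p = line.split(",")
    return {
        "duration": int(p[0]),
        "protocol_type": p[1],
        "service": p[2],
        "flag": p[3],
        "src_bytes": int(p[4]),
        "dst_bytes": int(p[5]),
    }


def infer_schema(first_row):
    """Map each column name to the Python type name of its value in the first row."""
    return {name: type(value).__name__ for name, value in first_row.items()}


first = parse_interaction("0,tcp,http,SF,181,5450")
schema = infer_schema(first)
print(schema)

# A missing value in the first row leaves its column without a usable type
incomplete = dict(first, service=None)
print(infer_schema(incomplete)["service"])
```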

In [None]:
from pyspark.sql import Row
Row?

In [None]:
row_1 = Row(C1 = 'Pi', C2 = 3.142)

In [None]:
row_2 = Row(C1 = 'e', C2 = 2.7)

In [None]:
spark.createDataFrame([row_1, row_2]).show()

## Example: Analysing the KDD Cup Data
---

In [None]:
data_file = '/home/dush/Spark/kddcup.data_10_percent'
raw_data = sc.textFile(data_file).cache()

In [None]:
type(raw_data)

In [None]:
raw_data.is_cached

In [None]:
raw_data.take(1)

In [None]:
csv_data = raw_data.map(lambda l: l.split(","))

row_data = csv_data.map(lambda p: Row(duration=int(p[0]), 
                                      protocol_type=p[1],
                                      service=p[2],
                                      flag=p[3],
                                      src_bytes=int(p[4]),
                                      dst_bytes=int(p[5])))

In [None]:
type(row_data)

In [None]:
row_data.take(2)

- Once we have our RDD of Row we can infer and register the schema.

> ### Creating the Spark DF from an RDD of Row Objects

In [None]:
interactions_df = spark.createDataFrame(row_data)

In [None]:
interactions_df.show(10)

In [None]:
interactions_df.count()

> ### Registering the Spark DF as a Temp Table to run SQL queries on it

In [60]:
interactions_df.createOrReplaceTempView("interactions")

In [62]:
spark.sql("SELECT * FROM interactions LIMIT 10").show()

+---------+--------+----+-------------+-------+---------+
|dst_bytes|duration|flag|protocol_type|service|src_bytes|
+---------+--------+----+-------------+-------+---------+
|     5450|       0|  SF|          tcp|   http|      181|
|      486|       0|  SF|          tcp|   http|      239|
|     1337|       0|  SF|          tcp|   http|      235|
|     1337|       0|  SF|          tcp|   http|      219|
|     2032|       0|  SF|          tcp|   http|      217|
|     2032|       0|  SF|          tcp|   http|      217|
|     1940|       0|  SF|          tcp|   http|      212|
|     4087|       0|  SF|          tcp|   http|      159|
|      151|       0|  SF|          tcp|   http|      210|
|      786|       0|  SF|          tcp|   http|      212|
+---------+--------+----+-------------+-------+---------+



In [63]:
spark.sql("SELECT count(*) FROM interactions").show()

+--------+
|count(1)|
+--------+
|  494021|
+--------+



- We can easily have a look at our data frame schema using `printSchema`.

In [64]:
interactions_df.printSchema()

root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)



- Now we can run SQL queries over our data frame that has been registered as a table. 

In [65]:
# Select tcp network interactions with
# a duration greater than 1000 (seconds in the KDD data) and
# no data transfer from the destination

tcp_interactions = spark.sql("""
    SELECT duration, dst_bytes 
    FROM interactions 
    WHERE protocol_type = 'tcp' 
    AND duration > 1000 
    AND dst_bytes = 0
""")

tcp_interactions.count()

139

In [66]:
type(tcp_interactions)

pyspark.sql.dataframe.DataFrame

- The results of SQL queries are DFs and support all the normal DF functions.

In [67]:
# Output duration together with dst_bytes

tcp_interactions_out = (tcp_interactions
                        .rdd
                        .map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes)))

In [68]:
tcp_interactions_out.take(2)

['Duration: 5057, Dest. bytes: 0', 'Duration: 5059, Dest. bytes: 0']

- Spark DataFrame provides a **domain-specific language** for structured data manipulation. 
- This language includes _methods_ we can concatenate (or chain) in order to do `selection, filtering, grouping` etc. 

### 2. Average duration and `dst_bytes` by protocol type

In [69]:
(interactions_df
 .select("protocol_type", "duration", "dst_bytes")
 .groupBy("protocol_type")
 .mean()
 .show()
)

+-------------+------------------+------------------+
|protocol_type|     avg(duration)|    avg(dst_bytes)|
+-------------+------------------+------------------+
|          tcp|18.299576460684502|2248.4364612106383|
|          udp|   993.64616291638| 84.70968851331433|
|         icmp|               0.0|               0.0|
+-------------+------------------+------------------+



In [70]:
interactions_df.createOrReplaceTempView("table_1")

spark.sql("SELECT protocol_type, avg(duration), avg(dst_bytes) from table_1 group by protocol_type").show()

+-------------+------------------+------------------+
|protocol_type|     avg(duration)|    avg(dst_bytes)|
+-------------+------------------+------------------+
|          tcp|18.299576460684502|2248.4364612106383|
|          udp|   993.64616291638| 84.70968851331433|
|         icmp|               0.0|               0.0|
+-------------+------------------+------------------+



### The `describe()` method

In [71]:
interactions_df.describe().show()

+-------+-----------------+-----------------+------+-------------+-------+------------------+
|summary|        dst_bytes|         duration|  flag|protocol_type|service|         src_bytes|
+-------+-----------------+-----------------+------+-------------+-------+------------------+
|  count|           494021|           494021|494021|       494021| 494021|            494021|
|   mean|868.5324247349809|47.97930249928647|  null|         null|   null|3025.6102959185946|
| stddev|33040.00125210233|707.7464723053739|  null|         null|   null| 988218.1010503998|
|    min|                0|                0|   OTH|         icmp|    IRC|                 0|
|    max|          5155468|            58329|    SH|          udp|  whois|         693375640|
+-------+-----------------+-----------------+------+-------------+-------+------------------+



In [None]:
spark.sql("""select flag, count(*) 
                 from interactions 
                group by flag""").show()

In [None]:
# %%timeit
# count how many interactions have a duration greater than 1000 (seconds in the KDD data),
# with no data transfer from destination, grouped by protocol type

(interactions_df
 .select("protocol_type", "duration", "dst_bytes")
 .filter((interactions_df.duration > 1000) & (interactions_df.dst_bytes == 0))
 .groupBy("protocol_type")
 .count()
 .show()
)

In [None]:
# %%timeit
spark.sql("""SELECT protocol_type, count(*) 
           from interactions 
           where duration > 1000 
           and dst_bytes = 0 
           group by protocol_type""").show()

### 3. Count number of attack and normal interactions

In [74]:
# First we need to add the label column to our data.

def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"

row_labeled_data = csv_data.map(lambda p: Row(duration=int(p[0]), 
                                              protocol_type=p[1],
                                              service=p[2],
                                              flag=p[3],
                                              src_bytes=int(p[4]),
                                              dst_bytes=int(p[5]),
                                              label=get_label_type(p[41])))

interactions_labeled_df = spark.createDataFrame(row_labeled_data)

In [75]:
interactions_labeled_df.show()

+---------+--------+----+------+-------------+-------+---------+
|dst_bytes|duration|flag| label|protocol_type|service|src_bytes|
+---------+--------+----+------+-------------+-------+---------+
|     5450|       0|  SF|normal|          tcp|   http|      181|
|      486|       0|  SF|normal|          tcp|   http|      239|
|     1337|       0|  SF|normal|          tcp|   http|      235|
|     1337|       0|  SF|normal|          tcp|   http|      219|
|     2032|       0|  SF|normal|          tcp|   http|      217|
|     2032|       0|  SF|normal|          tcp|   http|      217|
|     1940|       0|  SF|normal|          tcp|   http|      212|
|     4087|       0|  SF|normal|          tcp|   http|      159|
|      151|       0|  SF|normal|          tcp|   http|      210|
|      786|       0|  SF|normal|          tcp|   http|      212|
|      624|       0|  SF|normal|          tcp|   http|      210|
|     1985|       0|  SF|normal|          tcp|   http|      177|
|      773|       0|  SF|

- Note: We don't have to register the DataFrame as a temporary table if we do not plan to run SQL queries on it.

In [76]:
(interactions_labeled_df
 .select("label")
 .groupBy("label")
 .count()
 .show())

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+



In [77]:
# Count interactions by label and protocol type,
# to see how important the protocol type is for telling whether an interaction is an attack.

(interactions_labeled_df
 .select("label", "protocol_type")
 .groupBy("label", "protocol_type")
 .count()
 .show())

+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|normal|          udp| 19177|
|normal|         icmp|  1288|
|normal|          tcp| 76813|
|attack|         icmp|282314|
|attack|          tcp|113252|
|attack|          udp|  1177|
+------+-------------+------+



In [78]:
# More complex groupings are possible by adding a Boolean expression to the mix

(interactions_labeled_df
 .select("label", "protocol_type", "dst_bytes")
 .groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0)
 .count()
 .show())

+------+-------------+---------------+------+
| label|protocol_type|(dst_bytes = 0)| count|
+------+-------------+---------------+------+
|normal|          udp|          false| 15583|
|attack|          udp|          false|    11|
|attack|          tcp|           true|110583|
|normal|          tcp|          false| 67500|
|attack|         icmp|           true|282314|
|attack|          tcp|          false|  2669|
|normal|          tcp|           true|  9313|
|normal|          udp|           true|  3594|
|normal|         icmp|           true|  1288|
|attack|          udp|           true|  1166|
+------+-------------+---------------+------+



> ### TASK 1. Count the number of interactions by label and protocol_type using a SQL query

In [80]:
interactions_labeled_df.createOrReplaceTempView("int_lab")

In [81]:
spark.sql("""SELECT label, protocol_type, count(*) 
from int_lab
group by label, protocol_type""").show()


+------+-------------+--------+
| label|protocol_type|count(1)|
+------+-------------+--------+
|normal|          udp|   19177|
|normal|         icmp|    1288|
|normal|          tcp|   76813|
|attack|         icmp|  282314|
|attack|          tcp|  113252|
|attack|          udp|    1177|
+------+-------------+--------+



---

## Example 2: Flights Data

To add CSV import functionality to Spark, launch it with the option `--packages com.databricks:spark-csv_2.11:1.4.0`, so that the launch command becomes:


> `IPYTHON_OPTS="notebook" ./bin/pyspark --packages com.databricks:spark-csv_2.11:1.4.0`

and then use the `read` attribute of the session (`spark.read`) to import the CSV file. In Spark 2.0 and later a CSV reader is built in, so the extra package is only needed on older versions.

Syntax:

`spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(<path-to-csv>)`

In [82]:
!head -5 /home/dush/Documents/2008.csv

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,628,620,804,750,WN,448,N428WN,96,90,76,14,8,IND,BWI,515,3,17,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,926,930,1054,1100,WN,1746,N612SW,88,90,78,-6,-4,IND,BWI,515,3,7,0,,0,NA,NA,NA,NA,NA


In [83]:
!wc -l /home/dush/Documents/2008.csv

7009729 /home/dush/Documents/2008.csv


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
flights = (spark
           .read
           .format('com.databricks.spark.csv')
           .options(header='true', inferschema='true')
           .load('/home/dush/Documents/2008.csv'))

In [4]:
type(flights)

pyspark.sql.dataframe.DataFrame

In [5]:
flights.dtypes[:4]

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int')]

In [6]:
flights.rdd.getNumPartitions()

6

In [7]:
flights.count()

7009728

In [8]:
flights.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [10]:
flights.agg({'Distance':'avg'}).show()

+-----------------+
|    avg(Distance)|
+-----------------+
|726.3870294253928|
+-----------------+



---
## Machine Learning in Spark - $teaser$

## MLlib
Apache Spark provides a general machine learning library — MLlib — that is designed for simplicity, scalability, and easy integration with other tools. With the scalability, language compatibility, and speed of Spark, data scientists can solve and iterate through their data problems faster.

From the inception of the Apache Spark project, MLlib was considered foundational for Spark’s success. The key benefit of MLlib is that it allows data scientists to focus on their data problems and models instead of solving the complexities surrounding distributed data (such as infrastructure, configurations, and so on). The data engineers can focus on distributed systems engineering using Spark’s easy-to-use APIs, while the data scientists can leverage the scale and speed of Spark core. Just as important, Spark MLlib is a general-purpose library, providing algorithms for most use cases while at the same time allowing the community to build upon and extend it for specialized use cases. To review the key terms of machine learning, please refer to Matthew Mayo’s Machine Learning Key Terms, Explained.

## ML Pipelines
Running machine learning algorithms typically involves a sequence of tasks including pre-processing, feature extraction, model fitting, and validation. For example, classifying text documents might involve text segmentation and cleaning, extracting features, and training a classification model with cross-validation. Though there are many libraries we can use for each stage, connecting the dots is not as easy as it may look, especially with large-scale datasets. Most ML libraries are not designed for distributed computation, or they do not provide native support for pipeline creation and tuning.

<img src="https://databricks.com/wp-content/uploads/2016/06/ML-pipelines-diagram.png">

ML Pipelines is a high-level API for MLlib that lives under the `spark.ml` package. A pipeline consists of a sequence of stages. There are two basic types of pipeline stages: Transformer and Estimator. A Transformer takes a dataset as input and produces an augmented dataset as output. E.g., a tokenizer is a Transformer that transforms a dataset with text into a dataset with tokenized words. An Estimator must first be fit on the input dataset to produce a model, which is a Transformer that transforms the input dataset. E.g., logistic regression is an Estimator that trains on a dataset with labels and features and produces a logistic regression model.
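
The two stage types can be sketched in plain Python to show how they fit together. The classes below are toy stand-ins written for this illustration, not the `spark.ml` classes: only the method names `transform` and `fit` mirror the real API, while `VocabularyEstimator`, `VocabularyModel`, the list-of-dicts "dataset" and the sample sentences are all hypothetical.

```python
class Tokenizer:
    """Transformer: adds a 'words' field to every row."""

    def transform(self, rows):
        return [dict(row, words=row["text"].lower().split()) for row in rows]


class VocabularyModel:
    """Model produced by fitting; itself a Transformer."""

    def __init__(self, vocabulary):
        self.vocabulary = vocabulary

    def transform(self, rows):
        return [dict(row, known=[w for w in row["words"] if w in self.vocabulary])
                for row in rows]


class VocabularyEstimator:
    """Estimator: must see the data first, then returns a fitted model."""

    def fit(self, rows):
        vocabulary = {w for row in rows for w in row["words"]}
        return VocabularyModel(vocabulary)


train = [{"text": "Spark is fast"}, {"text": "Pandas is handy"}]
tokenized = Tokenizer().transform(train)
model = VocabularyEstimator().fit(tokenized)

new_rows = Tokenizer().transform([{"text": "Spark is distributed"}])
result = model.transform(new_rows)
print(result[0]["known"])
```

The Transformer can be applied straight away, whereas the Estimator only yields something usable after `fit`, and what it yields is again a Transformer.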