<DIV ALIGN=CENTER>

# Introduction to Spark
## DataFrames, SQL, and Basic Data Analysis
## Professor Robert J. Brunner
  
</DIV>  
-----
-----

## Introduction

In this IPython Notebook, we explore using Spark to perform data
processing in a similar manner to our previous efforts with Pandas. For
this we will use the airline data, which has been stored in an HDFS
system that is accessible from within our Spark cluster. [Other][dw]
tutorials exist, although they often focus on Scala examples since Spark
is written in that language.

-----
[sp]: http://spark.apache.org
[sh]: http://hadoop.apache.org
[sy]: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
[shdfs]: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html
[sce]: http://techcrunch.com/2015/07/12/spark-and-hadoop-are-friends-not-foes/
[dw]: https://github.com/deanwampler/spark-workshop/tree/master/tutorial

In [1]:
# Release the SparkContext if one already exists.
try:
    sc
except NameError:
    # No SparkContext has been created yet, so there is nothing to stop.
    pass
else:
    sc.stop()

# Now handle initial import statements
from pyspark import SparkConf, SparkContext

# Create new Spark Configuration (port numbers might need to be adjusted from defaults.)
myconf = SparkConf()
myconf.setMaster('local[*]')
myconf.setAppName("INFO490 SP16 W14-NB2: Professor Brunner")
myconf.set('spark.executor.memory', '1g')
#myconf.set("spark.driver.extraLibraryPath","/home/data_scientist/spark-cassandra-connector-1.6.0-M2-s_2.11.jar")
#myconf.set("spark.driver.extraClassPath","/home/data_scientist/spark-cassandra-connector-1.6.0-M2-s_2.11.jar")
#myconf.set("spark.cassandra.connection.host","http://40.124.12.119")

# Create and initialize a new Spark Context
sc = SparkContext(conf=myconf)

# Display Spark version information, which also verifies SparkContext is active
print("\nSpark version: {0}".format(sc.version))


Spark version: 1.6.0


-----

### Data Processing

In this Notebook, we will need sample data. To simplify acquiring data
to demonstrate using Spark DataFrames, we include the RDD code from the
[Introduction to Spark](intro2spark.ipynb) Notebook in the following
cell.

-----

In [2]:
filename = '/home/data_scientist/data/2001/2001-1.csv'

text_file = sc.textFile(filename)

col_data = text_file.map(lambda l: l.split(",")) \
            .map(lambda p: (p[0], p[1], p[2], p[4], p[14], p[15], p[16], p[17], p[18])) \
            .filter(lambda line: 'Year' not in line)

cols = col_data.filter(lambda line: 'NA' not in line)

fields = cols.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), int(p[3]),
                          int(p[4]), int(p[5]), p[6], p[7], int(p[8])))

In [3]:
# Should be 480106 if everything works correctly
fields.count()

480106
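
To make the parsing steps concrete, the following plain-Python sketch applies the same split, select, filter, and convert logic to individual records without Spark. The sample record is illustrative: the nine selected fields match the first row displayed later in this Notebook, while the remaining fields are placeholder values we assume only to fill out the line.

```python
# Column positions kept by the RDD pipeline: Year, Month, DayofMonth, DepTime,
# ArrDelay, DepDelay, Origin, Dest, Distance.
KEEP = (0, 1, 2, 4, 14, 15, 16, 17, 18)

def parse_line(line):
    """Return a typed tuple for one CSV line, or None if the line is dropped."""
    p = line.split(",")
    selected = tuple(p[i] for i in KEEP)
    # Drop the header row and any record with a missing ('NA') value.
    if 'Year' in selected or 'NA' in selected:
        return None
    # Origin and Dest (positions 6 and 7 of the selection) stay as strings.
    return tuple(v if i in (6, 7) else int(v) for i, v in enumerate(selected))

# Hypothetical record: only the nine kept fields come from the Notebook output.
sample = "2001,1,17,3,1806,1810,1931,1934,US,375,N700,85,84,60,-3,-4,BWI,CLT,361"
# The same record with a missing departure time.
missing = sample.replace(",1806,", ",NA,")

print(parse_line(sample))
print(parse_line(missing))
```

The second record is rejected for the same reason the `'NA' not in line` filter rejects it in the RDD version: the check is a membership test on the tuple of selected fields, not a substring search.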

-----

## Spark DataFrame

Spark supports a simplified [DataFrame][spdf] as part of the [Spark
SQL][spsql] library. We can create a DataFrame from an existing RDD by
also specifying the column labels and data types. The data types must
be one of the pre-defined [Spark SQL types][spdt]. After creating the
new DataFrame (which is backed by an RDD), we can perform many of the
same tasks with Spark that we performed with Pandas (but not all, and
not always as simply). The following code cells show how we
can take our 2001 flight data RDD and create a new DataFrame, which we
use in several subsequent code cells.

-----
[spdf]: https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes
[spsql]: https://spark.apache.org/sql/
[spdt]: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

In [4]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

schemaString = "Year Month DayOfMonth DepTime ArrDelay DepDelay Origin Destination Distance"

fieldTypes = [IntegerType(), IntegerType(), IntegerType(), IntegerType(),
              IntegerType(), IntegerType(), StringType(), StringType(),
              IntegerType()]

# Pair each column name with its type; True marks the field as nullable.
f_data = [StructField(field_name, field_type, True)
          for field_name, field_type in zip(schemaString.split(), fieldTypes)]

schema = StructType(f_data)

In [5]:
df = sqlContext.createDataFrame(fields, schema)
df

DataFrame[Year: int, Month: int, DayOfMonth: int, DepTime: int, ArrDelay: int, DepDelay: int, Origin: string, Destination: string, Distance: int]

-----

In the following three code cells, we first `show` the first few rows of
the DataFrame, then use the `head` method, which returns the rows as a
list of `Row` objects with each field labeled, and finally use the
`describe` method, which returns a new DataFrame of summary statistics
that we display by calling `show`. While the output is less visually
attractive than the Pandas result, we still obtain the necessary
information.

After these code cells, we access the DataFrame schema, first by using
the `printSchema` method to nicely output the schema, and then by
accessing a column directly, which we can now do since we have named our
DataFrame columns.

-----

In [6]:
df.show(5)

+----+-----+----------+-------+--------+--------+------+-----------+--------+
|Year|Month|DayOfMonth|DepTime|ArrDelay|DepDelay|Origin|Destination|Distance|
+----+-----+----------+-------+--------+--------+------+-----------+--------+
|2001|    1|        17|   1806|      -3|      -4|   BWI|        CLT|     361|
|2001|    1|        18|   1805|       4|      -5|   BWI|        CLT|     361|
|2001|    1|        19|   1821|      23|      11|   BWI|        CLT|     361|
|2001|    1|        20|   1807|      10|      -3|   BWI|        CLT|     361|
|2001|    1|        21|   1810|      20|       0|   BWI|        CLT|     361|
+----+-----+----------+-------+--------+--------+------+-----------+--------+
only showing top 5 rows



In [7]:
df.head(4)

[Row(Year=2001, Month=1, DayOfMonth=17, DepTime=1806, ArrDelay=-3, DepDelay=-4, Origin='BWI', Destination='CLT', Distance=361),
 Row(Year=2001, Month=1, DayOfMonth=18, DepTime=1805, ArrDelay=4, DepDelay=-5, Origin='BWI', Destination='CLT', Distance=361),
 Row(Year=2001, Month=1, DayOfMonth=19, DepTime=1821, ArrDelay=23, DepDelay=11, Origin='BWI', Destination='CLT', Distance=361),
 Row(Year=2001, Month=1, DayOfMonth=20, DepTime=1807, ArrDelay=10, DepDelay=-3, Origin='BWI', Destination='CLT', Distance=361)]

In [8]:
df.describe().show()

+-------+--------------------+------+-----------------+------------------+-----------------+-----------------+-----------------+
|summary|                Year| Month|       DayOfMonth|           DepTime|         ArrDelay|         DepDelay|         Distance|
+-------+--------------------+------+-----------------+------------------+-----------------+-----------------+-----------------+
|  count|              480106|480106|           480106|            480106|           480106|           480106|           480106|
|   mean|              2001.0|   1.0|16.01370530674476| 1359.660206287778|6.382288494624103|8.781523246949632|716.9933556339641|
| stddev|1.065161224168148...|   0.0|  8.9369643824565|487.23695943583846|31.04865060768917|27.96630068676185|568.6557196351711|
|    min|                2001|     1|                1|                 1|              -80|              -59|               21|
|    max|                2001|     1|               31|              2400|             1688|     

In [9]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayOfMonth: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Distance: integer (nullable = true)



In [10]:
df.Year

Column<b'Year'>

-----

We can extract data from the DataFrame by using techniques similar to
those we used with Pandas. One difference is that we need to `filter`
the DataFrame, as opposed to directly accessing rows. In the following
cells, we first filter rows to count the flights that left O'Hare, and
then extract those flights that left O'Hare more than two hours late. In
the second case, we also transform the output to `select` the
_Destination_ column and a new column that is the _Distance_ in
kilometers (approximated by multiplying the distance in miles by 1.6).

-----

In [11]:
df.filter(df['Origin'] == 'ORD').count()

27455

In [12]:
df.filter(df['Origin'] == 'ORD').filter(df['DepDelay'] > 120).select(df['Destination'], df['Distance'] * 1.6).show(10)

+-----------+-----------------+
|Destination| (Distance * 1.6)|
+-----------+-----------------+
|        PHL|           1084.8|
|        CLT|958.4000000000001|
|        MEM|            785.6|
|        MEM|            785.6|
|        MEM|            785.6|
|        STL|            412.8|
|        STL|            412.8|
|        PVD|           1358.4|
|        LAX|           2792.0|
|        LAX|           2792.0|
+-----------+-----------------+
only showing top 10 rows



-----

## Spark SQL

Given a Spark DataFrame, we can apply SQL statements directly against
the DataFrame by registering the DataFrame as a Spark temporary SQL
table. The following code cell demonstrates this, as we register our
DataFrame as a `flights` table, and execute a SQL statement to select
the same flights we obtained from our previous DataFrame filter. Note
that the query selects _Distance_ unchanged, so the values are in miles
rather than kilometers. Since neither query specifies an ordering, the
rows displayed by the `show` method are not guaranteed to appear in the
same order from run to run.

-----

In [13]:
df = sqlContext.createDataFrame(fields, schema)

df.registerTempTable("flights")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT Destination, Distance FROM flights WHERE Origin = 'ORD' AND DepDelay > 120")

# The result of a SQL query is a DataFrame, so all DataFrame operations are supported.
results.show(10)

+-----------+--------+
|Destination|Distance|
+-----------+--------+
|        PHL|     678|
|        CLT|     599|
|        MEM|     491|
|        MEM|     491|
|        MEM|     491|
|        STL|     258|
|        STL|     258|
|        PVD|     849|
|        LAX|    1745|
|        LAX|    1745|
+-----------+--------+
only showing top 10 rows
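
The SQL output above lists _Distance_ in miles, while the earlier DataFrame `select` multiplied the same column by 1.6. The short plain-Python sketch below reproduces that conversion for the distinct destination and distance pairs shown in the SQL output, and compares it with the exact factor of 1.609344 kilometers per mile. The variable names are our own and are not part of the Notebook.

```python
# (Destination, Distance in miles) pairs copied from the SQL output above.
rows = [('PHL', 678), ('CLT', 599), ('MEM', 491), ('STL', 258),
        ('PVD', 849), ('LAX', 1745)]

APPROX_KM_PER_MILE = 1.6        # factor used in the DataFrame select
EXACT_KM_PER_MILE = 1.609344    # definition of the international mile

# Build (destination, approximate km, exact km) for each pair.
converted = [(dest, miles * APPROX_KM_PER_MILE, miles * EXACT_KM_PER_MILE)
             for dest, miles in rows]

for dest, approx, exact in converted:
    print('{0:4s}{1:10.1f}{2:10.1f}'.format(dest, approx, exact))
```

The approximate factor understates each distance by a little under 0.6 percent, which is acceptable for a demonstration but worth replacing with the exact factor in a real analysis.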



-----

## Spark Statistics


-----

In [14]:
from pyspark.mllib.stat import Statistics

sdt = fields.map(lambda p: (p[2], p[3], p[4], p[5], p[8]))

summary = Statistics.colStats(sdt)

mus = summary.mean()
mns = summary.min()
mxs = summary.max()

vrs = summary.variance()
nnzs = summary.numNonzeros()

In [15]:
# Row labels for the five columns in sdt (a new name avoids reusing `cols`,
# which refers to an RDD in an earlier cell).
labels = ['Day', 'Dep. Time', 'Arr. Delay', 'Dep. Delay', 'Distance']

# Print the header
print('{0:>20s}{1:>12s}{2:>8s}{3:>10s}{4:>12s}'
      .format('Mean', 'Variance', 'Min', 'Max', 'Non Zeroes'))
print(65*'-')

# Print the summary statistics, one row per column
for label, m, v, mn, mx, n in zip(labels, mus, vrs, mns, mxs, nnzs):
    print('{0:10s}{1:10.2f}{2:12.2f}{3:8.2f}{4:10.2f}{5:12d}'
          .format(label, m, v, mn, mx, int(n)))

                Mean    Variance     Min       Max  Non Zeroes
-----------------------------------------------------------------
Day            16.01       79.87    1.00     31.00      480106
Dep. Time    1359.66   237399.85    1.00   2400.00      480106
Arr. Delay      6.38      964.02  -80.00   1688.00      461157
Dep. Delay      8.78      782.11  -59.00   1692.00      393503
Distance      716.99   323369.33   21.00   4962.00      480106
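
Note that `colStats` reports the variance, whereas the earlier `describe` output reports the standard deviation. As a quick consistency check, the following plain-Python sketch takes the square root of each variance printed above and compares it with the corresponding `stddev` value from `describe` (rounded here to three decimal places). The dictionaries simply hold numbers copied from the two outputs.

```python
import math

# Variances copied from the colStats output above.
variances = {'Day': 79.87, 'Dep. Time': 237399.85, 'Arr. Delay': 964.02,
             'Dep. Delay': 782.11, 'Distance': 323369.33}

# Standard deviations copied (and rounded) from the describe output.
describe_std = {'Day': 8.937, 'Dep. Time': 487.237, 'Arr. Delay': 31.049,
                'Dep. Delay': 27.966, 'Distance': 568.656}

print('{0:12s}{1:>12s}{2:>12s}'.format('Column', 'sqrt(var)', 'describe'))
for name, var in variances.items():
    print('{0:12s}{1:12.3f}{2:12.3f}'.format(name, math.sqrt(var),
                                             describe_std[name]))
```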


-----

### Correlations


-----

In [16]:
# Demonstrate Correlation Measurements

# Sample Data
x = sc.parallelize([0, 1, 2])
y = sc.parallelize([1, 2, 4])
z = sc.parallelize([2, 1, 0])

print('x = ', x.collect())
print('y = ', y.collect())
print('z = ', z.collect())

print('\nPearson Correlation Tests')
print(25*'-')
print('x corr x = {0:+5.3f}'\
      .format(Statistics.corr(x, x, method='pearson')))

print('x corr y = {0:+5.3f}'\
      .format(Statistics.corr(x, y, method='pearson')))

print('x corr z = {0:+5.3f}'\
      .format(Statistics.corr(x, z, method='pearson')))

x =  [0, 1, 2]
y =  [1, 2, 4]
z =  [2, 1, 0]

Pearson Correlation Tests
-------------------------
x corr x = +1.000
x corr y = +0.982
x corr z = -1.000
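
The values above can be checked by hand. The following sketch computes the Pearson coefficient directly from its definition (the covariance divided by the product of the standard deviations) in plain Python, using the same three sample sequences. The function `pearson` and the list names are our own helpers, not part of the Spark API.

```python
import math

def pearson(a, b):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(a)
    mean_a, mean_b = sum(a) / n, sum(b) / n
    # Sums of cross products and squared deviations; the 1/n factors cancel.
    cov = sum((u - mean_a) * (v - mean_b) for u, v in zip(a, b))
    var_a = sum((u - mean_a) ** 2 for u in a)
    var_b = sum((v - mean_b) ** 2 for v in b)
    return cov / math.sqrt(var_a * var_b)

xs, ys, zs = [0, 1, 2], [1, 2, 4], [2, 1, 0]

for label, other in (('x', xs), ('y', ys), ('z', zs)):
    print('x corr {0} = {1:+5.3f}'.format(label, pearson(xs, other)))
```

The `x corr y` value falls just short of one because `y` increases with `x` but not along a straight line.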


In [17]:
# Set print precision of matrices
import numpy as np
np.set_printoptions(precision=3)

# Compute correlation of three columns in RDD
cd = sdt.map(lambda p: (p[1], p[2], p[3]))

print('Departure Time, Arrival Delay, Departure Delay')

print('\nPearson Correlation Matrix:')
print(Statistics.corr(cd, method='pearson'))

print('\nSpearman Correlation Matrix:')
print(Statistics.corr(cd, method='spearman'))

Departure Time, Arrival Delay, Departure Delay

Pearson Correlation Matrix:
[[ 1.     0.134  0.167]
 [ 0.134  1.     0.904]
 [ 0.167  0.904  1.   ]]

Spearman Correlation Matrix:
[[ 1.     0.109  0.173]
 [ 0.109  1.     0.616]
 [ 0.173  0.616  1.   ]]
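
The Spearman coefficient is the Pearson coefficient computed on the ranks of the values rather than on the values themselves, so it measures monotonic rather than strictly linear association. The plain-Python sketch below illustrates this on the small sample sequences used earlier; `ranks`, `pearson`, and `spearman` are our own helper functions (with ties given their average rank), and this is not necessarily how Spark implements the calculation internally.

```python
import math

def pearson(a, b):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(a)
    mean_a, mean_b = sum(a) / n, sum(b) / n
    cov = sum((u - mean_a) * (v - mean_b) for u, v in zip(a, b))
    var_a = sum((u - mean_a) ** 2 for u in a)
    var_b = sum((v - mean_b) ** 2 for v in b)
    return cov / math.sqrt(var_a * var_b)

def ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        # Extend j to cover the run of values tied with position i.
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2 + 1
        for k in range(i, j + 1):
            result[order[k]] = avg
        i = j + 1
    return result

def spearman(a, b):
    """Spearman rank correlation: Pearson applied to the ranks."""
    return pearson(ranks(a), ranks(b))

xs, ys = [0, 1, 2], [1, 2, 4]
print('Pearson  = {0:+5.3f}'.format(pearson(xs, ys)))
print('Spearman = {0:+5.3f}'.format(spearman(xs, ys)))
```

Because `ys` always increases when `xs` increases, the ranks agree exactly and the Spearman coefficient is one, even though the Pearson coefficient is slightly lower.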


-----

### Random Data and Sampling

-----

In [18]:
from pyspark.mllib.random import RandomRDDs

ud = RandomRDDs.uniformRDD(sc, 1000, seed=23)

nd = RandomRDDs.normalRDD(sc, 1000, seed=23)

pd = RandomRDDs.poissonRDD(sc, mean=2.0, size=1000, seed=23)

In [19]:
print('Uniform Distribution Statistics\n', ud.stats())

Uniform Distribution Statistics
 (count: 1000, mean: 0.495907509202282, stdev: 0.298581265498, max: 0.99957542053, min: 0.000220626980565)


In [20]:
print('Normal Distribution Statistics\n', nd.stats())

Normal Distribution Statistics
 (count: 1000, mean: -0.01951879687296531, stdev: 0.936332160006, max: 2.76048478382, min: -3.10768336984)


In [21]:
print('Poisson Distribution Statistics\n', pd.stats())

Poisson Distribution Statistics
 (count: 1000, mean: 2.0089999999999995, stdev: 1.45771019068, max: 9.0, min: 0.0)
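
We can compare these sample statistics with the values expected from theory: a uniform distribution on [0, 1) has mean 1/2 and standard deviation 1/sqrt(12), the standard normal has mean 0 and standard deviation 1, and a Poisson distribution with mean 2 has standard deviation sqrt(2). The sketch below tabulates both in plain Python; the observed values are rounded from the `stats` output above.

```python
import math

# (distribution, theoretical mean, theoretical stdev, observed mean, observed stdev)
checks = [
    ('Uniform(0, 1)', 0.5, 1 / math.sqrt(12), 0.4959, 0.2986),
    ('Normal(0, 1)', 0.0, 1.0, -0.0195, 0.9363),
    ('Poisson(2)', 2.0, math.sqrt(2.0), 2.0090, 1.4577),
]

for name, mu, sigma, obs_mu, obs_sigma in checks:
    print('{0:15s} mean {1:6.3f} vs {2:7.4f}   stdev {3:6.3f} vs {4:6.4f}'
          .format(name, mu, obs_mu, sigma, obs_sigma))
```

With only 1000 samples per distribution, differences of a few percent between the theoretical and observed values are to be expected.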


In [22]:
# Sample without replacement

frac = 0.25

ds = nd.sample(False, frac)
print(ds.stats())

(count: 251, mean: -0.08177014217381999, stdev: 0.980548214492, max: 2.64550887934, min: -2.98699042684)


In [23]:
# Sample with replacement
ds = nd.sample(True, frac)
print(ds.stats())

(count: 239, mean: 0.08745964539326694, stdev: 0.862006893279, max: 2.64550887934, min: -2.76733479325)
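
Neither sample contains exactly 250 elements because `frac` is a per-element probability (or expected count), not an exact sample size. To our understanding, Spark samples without replacement by keeping each element with probability `frac` (Bernoulli sampling), and samples with replacement by repeating each element a Poisson-distributed number of times. The plain-Python sketch below mimics both schemes under that assumption; the function names are our own, and the counts it produces will differ from the Spark results above.

```python
import math
import random

def bernoulli_sample(data, frac, rng):
    """Keep each element independently with probability frac (no duplicates)."""
    return [v for v in data if rng.random() < frac]

def poisson_draw(lam, rng):
    """Draw one Poisson(lam) variate using Knuth's multiplication method."""
    limit, k, p = math.exp(-lam), 0, 1.0
    while True:
        p *= rng.random()
        if p <= limit:
            return k
        k += 1

def poisson_sample(data, frac, rng):
    """Repeat each element Poisson(frac) times, so duplicates are possible."""
    out = []
    for v in data:
        out.extend([v] * poisson_draw(frac, rng))
    return out

rng = random.Random(23)
data = list(range(1000))

without = bernoulli_sample(data, 0.25, rng)
with_repl = poisson_sample(data, 0.25, rng)

print('without replacement:', len(without), 'elements')
print('with replacement:   ', len(with_repl), 'elements,',
      len(set(with_repl)), 'distinct')
```

In both cases the expected sample size is `frac` times the number of elements, with run-to-run scatter around that value.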


-----
### Student Activity

In the preceding cells, we introduced Spark DataFrames, Spark SQL, and Basic Statistics with Spark. Now that you have run the Notebook, go back and make the
following changes to see how the results change.

1. Change the DataFrame to ...

2. New SQL query

3. Compute statistics for the Poisson distribution. Now switch to a
lognormal distribution and calculate the same statistics.

4. Add an index column to this Spark DataFrame, which sequentially
increases.

Additional, more advanced problems:

1. Create a DataFrame containing the 'Year', 'Month', 'DayofMonth', 'dDelay',
and 'Origin' columns for the airline data.

2. Filter this DataFrame to contain only flight data for flights leaving Willard airport.

-----

-----

### Ending the Spark Session

We must stop the `SparkContext` in order to release resources on the
instructional cluster before exiting this Notebook.

-----

In [24]:
sc.stop()