<DIV ALIGN=CENTER>

# Data Processing in Spark
  
</DIV>  
-----
-----
This notebook continues exploring spark to perform data processing in a similar manner to your previous experience with Pandas in Python. We will use the airline data, which has been stored in HDFS on the EMR cluster. It is accesible from the Spark cluster. You will be asked to solve some simple problems at the end. There are some challenge activities at the end that you can try to answer. They will have an extra credit. 

Coming back to Spark, it is a cluster computing system that leverages Hadoop technologies like HDFS for high performance storage and Yarn for cluster management. While some may see Spark as a replacement for Hadoop, an alternative argument can be made that Spark is simply another compute engine for Hadoop, in addition to Map-Reduce.

### Initialization

Although we are starting a new sparkContext here, always make sure any SparkContext previously used by the Jupyter Server should be properly released before starting a new one. Lets initialize a new SparkContext to interact with the Spark cluster.

----- 

In [13]:
# We release the SparkContext if it exists.
try:
    sc
except:
    pass ;
else:
    sc.stop()

# Now handle initial import statements
from pyspark import SparkConf, SparkContext

sc = SparkContext()

### Using Spark

Spark is a framework for processing large-data tasks, in general this means Petabytes (or more of data). Spark can run on the HDFS file system, which can be set up to chunk files into blocks and to replicate these blocks across a cluster's storage to promote increased performance. Spark abstracts these details, however, allowing us to develop an application on a small system and scale up to large data on a cluster. 

In Spark, communications move between a driver process and the execution processes. This communication is handled for us by using a `SparkContext`, which requests resources from the Spark master process, such as number of cores, which are reserved to complete our Spark tasks. Once a Spark Context is active, we can use the Spark Console to monitor jobs and the overall Spark infrastructure. 

In [14]:
data = range(50)
print(data)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]


In [15]:
myRDD = sc.parallelize(data, 8)

In the previous code cell, we created a parallelized collection by using the parallelize method, which partitions the data across cores in a cluster. The general rule indicated in the Spark documentation is that you want 2-4 partitions per core.


Next, we use several functions on the RDD to obtain the RDD unique ID, which indicates when new RDDs are created, as well as naming RDDs to view them easily in the Spark cluster management software.

In [16]:
print("Initial RDD id: {0}".format(myRDD.id()))

Initial RDD id: 0


In [17]:
myRDD.setName("DSA RDD")

DSA RDD ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

-----

Now, given this simple RDD, we can apply a transformation, in this case
we simply add one to each element in the RDD. This tranformation doesn't
actually happen until we call an action method, which first occurs in
the third code cell below when we call the `collect` method. The new RDD
has been created, however as indicated by its new id.

-----

In [18]:
myaddRDD = myRDD.map(lambda a: a + 1)

In [19]:
print(myaddRDD.toDebugString())

(8) PythonRDD[1] at RDD at PythonRDD.scala:43 []
 |  DSA RDD ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []


In [20]:
print(myaddRDD.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]


-----

We can now apply a second transformation, in this case we apply a
filter, which selects values from the RDD based on a condition (in this
example we select valus that are evenly divisible by 5). The
transformation doesn't occur, however, until we once again call the
`collect` method, which _collects_ the results of the different
transformations.

-----

In [21]:
myfilterRDD = myaddRDD.filter(lambda x: (x % 5) == 0)

In [22]:
myfilterRDD.collect()

[5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

In [23]:
print(myfilterRDD.toDebugString())

(8) PythonRDD[2] at collect at <string>:1 []
 |  DSA RDD ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []


-----

Tranformations, however, can be chained together in a process called
pipelining. Doing so can produce long code strings, which can be
difficult to follow (or debug). Thus, it is considered good style
to break pipelined operations such that each transformation occurs on a
separate line. The following code combines the previous Spark tasks
together into a single line, but shown using recommended style.

-----

In [24]:
(sc
 .parallelize(data)
 .map(lambda x: x + 1)
 .filter(lambda x: (x % 5) == 0)
 .collect())

[5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

-----

### Data Processing

Previously in this Notebook, we have used Spark to create simple RDDs
that demonstrated Spark transformations and actions on small data. Now
we will change approaches and analyze the airline data, first starting
with the single 2001 flight data file. We can create a new RDD by
reading in the data as a textfile, after which we execute the RDD
creation by counting the number of lines in the RDD. We subsequently
apply several other RDD methods to display the first few rows of data by
using the `take` method. Finally, we use the built-in `help` to see the
list of supported RDD methods.

-----



In [25]:
filename = '/user/hadoop/Datasets/flights.csv'

text_file = sc.textFile(filename)

In [26]:
text_file.count()

5819080

In [27]:
text_file.take(5)

[u'YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY', u'2015,1,1,4,AS,98,N407AS,ANC,SEA,0005,2354,-11,21,0015,205,194,169,1448,0404,4,0430,0408,-22,0,0,,,,,,', u'2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,0010,0002,-8,12,0014,280,279,263,2330,0737,4,0750,0741,-9,0,0,,,,,,', u'2015,1,1,4,US,840,N171US,SFO,CLT,0020,0018,-2,16,0034,286,293,266,2296,0800,11,0806,0811,5,0,0,,,,,,', u'2015,1,1,4,AA,258,N3HYAA,LAX,MIA,0020,0015,-5,15,0030,285,281,258,2342,0748,8,0805,0756,-9,0,0,,,,,,']

In [28]:
# Display help info on spark rdd
help(text_file)

Help on RDD in module pyspark.rdd object:

class RDD(__builtin__.object)
 |  A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 |  Represents an immutable, partitioned collection of elements that can be
 |  operated on in parallel.
 |  
 |  Methods defined here:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
 |  
 |  __repr__(self)
 |  
 |  aggregate(self, zeroValue, seqOp, combOp)
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given combine functions and a neutral "zero
 |      value."
 |      
 |      The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
 |      as its result value to avoid object allocat

 |  rightOuterJoin(self, other, numPartitions=None)
 |      Perform a right outer join of C{self} and C{other}.
 |      
 |      For each element (k, w) in C{other}, the resulting RDD will either
 |      contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
 |      if no elements in C{self} have key k.
 |      
 |      Hash-partitions the resulting RDD into the given number of partitions.
 |      
 |      >>> x = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> y = sc.parallelize([("a", 2)])
 |      >>> sorted(y.rightOuterJoin(x).collect())
 |      [('a', (2, 1)), ('b', (None, 4))]
 |  
 |  sample(self, withReplacement, fraction, seed=None)
 |      Return a sampled subset of this RDD.
 |      
 |      :param withReplacement: can elements be sampled multiple times (replaced when sampled out)
 |      :param fraction: expected size of the sample as a fraction of this RDD's size
 |          without replacement: probability that each element is chosen; fraction must be [0,

-----

With this text RDD, we can begin to process the data. Since our data is,
at this point, simply a list of strings, we first need to transform the
data into columns, remove the header row, and extract out the columns of
interest. These steps are pipelined to create a single RDD, that isn't
processed until we execute an action method, in this case, the `first`
method that displays the first row in the new RDD.

-----

In [29]:
col_data = text_file.map(lambda l: l.split(",")) \
            .map(lambda p: (p[0], p[1], p[2], p[7], p[8], p[10], p[11], p[17], p[22])) \
            .filter(lambda line: 'YEAR' not in line)

In [30]:
col_data.first()

(u'2015', u'1', u'1', u'ANC', u'SEA', u'2354', u'-11', u'1448', u'-22')

In [31]:
col_data.take(5)

[(u'2015', u'1', u'1', u'ANC', u'SEA', u'2354', u'-11', u'1448', u'-22'), (u'2015', u'1', u'1', u'LAX', u'PBI', u'0002', u'-8', u'2330', u'-9'), (u'2015', u'1', u'1', u'SFO', u'CLT', u'0018', u'-2', u'2296', u'5'), (u'2015', u'1', u'1', u'LAX', u'MIA', u'0015', u'-5', u'2342', u'-9'), (u'2015', u'1', u'1', u'SEA', u'ANC', u'0024', u'-1', u'1448', u'-21')]

Count the number of rows in col_data

In [32]:
col_data.count()

5819079

-----

Spark, unlike Pandas, will not handle NA values. Thus we need an
additional tranform to remove lines from our RDD that contain missing
data. We can accomplish this by using an appropriate filter.

-----

In [33]:
cols = col_data.filter(lambda line: '' not in line)

Count the number of rows in col_data which doesn't have any null values. 

In [34]:
cols.count()

5714008

Count the number of rows in col_data which have null values. 

In [35]:
na_rows = col_data.filter(lambda line: '' in line)
na_rows.count()

105071

In [36]:
5714008+105071

5819079

Make sure the sum of both counts rows with null values and rows free of null values is equal to total number of rows in col_data

In [37]:
5726566+92513

5819079

-----

To analyze these data, however, we need to convert the columns to the
appropriate data types. In this case, we can simply apply one final
transformation.

-----

In [38]:
fields = cols.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), p[3],
                          p[4], int(p[5]), int(p[6]), int(p[7]), int(p[8])))

In [39]:
fields.take(1)

[(2015, 1, 1, u'ANC', u'SEA', 2354, -11, 1448, -22)]

-----

## Spark DataFrame

Spark supports a simplified [Data Frame][spdf] as part of the [Spark
SQL][spsql] library. We can create a Data Frame from an existing RDD by
also specifying the column labels and data types. The data types must
be one of the pre-defined [Spark SQL types][spdt]. After creating the
new DataFrame (which is backed by an RDD), we can perform many of the
same tasks with Spark that we performed with Pandas (but not all, and
not in as simple of an approach). The following code cells show how we
can take our 2001 flight data RDD and create a new Data Frame, which we
subsequently use in several subsequent code cells.

-----
[spdf]: https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes
[spsql]: https://spark.apache.org/sql/
[spdt]: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

In [40]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

schemaString = "Year Month DayOfMonth Origin Destination DepTime DepDelay Distance ArrDelay"

fieldTypes = [IntegerType(), IntegerType(), IntegerType(), \
              StringType(), StringType(), IntegerType(), \
              IntegerType(), IntegerType(), IntegerType()]

f_data = [StructField(field_name, field_type, True) \
          for field_name, field_type in zip(schemaString.split(), fieldTypes)]

schema = StructType(f_data)

In [41]:
df = sqlContext.createDataFrame(fields, schema)
print(df)

DataFrame[Year: int, Month: int, DayOfMonth: int, Origin: string, Destination: string, DepTime: int, DepDelay: int, Distance: int, ArrDelay: int]


-----

In the following three code cells, we `show` the first few lines of the
DataFrame, then use the `head` method, which displays more syntactic
information for each row, and finally use the `describe` method, which
doesn't execute until the `show` action is invoked. While the output is
less visually attractive than the Pandas result, we still obtain the
necessary information.

After these code cells, we access the DataFrame schema, first by using
the `printSchema` method to nicely output the schema, and next access a
column directly, which we can now do since we have named our DataFrame
columns.

-----

In [42]:
df.show(5)

+----+-----+----------+------+-----------+-------+--------+--------+--------+
|Year|Month|DayOfMonth|Origin|Destination|DepTime|DepDelay|Distance|ArrDelay|
+----+-----+----------+------+-----------+-------+--------+--------+--------+
|2015|    1|         1|   ANC|        SEA|   2354|     -11|    1448|     -22|
|2015|    1|         1|   LAX|        PBI|      2|      -8|    2330|      -9|
|2015|    1|         1|   SFO|        CLT|     18|      -2|    2296|       5|
|2015|    1|         1|   LAX|        MIA|     15|      -5|    2342|      -9|
|2015|    1|         1|   SEA|        ANC|     24|      -1|    1448|     -21|
+----+-----+----------+------+-----------+-------+--------+--------+--------+
only showing top 5 rows



In [43]:
df.head(4)

[Row(Year=2015, Month=1, DayOfMonth=1, Origin=u'ANC', Destination=u'SEA', DepTime=2354, DepDelay=-11, Distance=1448, ArrDelay=-22), Row(Year=2015, Month=1, DayOfMonth=1, Origin=u'LAX', Destination=u'PBI', DepTime=2, DepDelay=-8, Distance=2330, ArrDelay=-9), Row(Year=2015, Month=1, DayOfMonth=1, Origin=u'SFO', Destination=u'CLT', DepTime=18, DepDelay=-2, Distance=2296, ArrDelay=5), Row(Year=2015, Month=1, DayOfMonth=1, Origin=u'LAX', Destination=u'MIA', DepTime=15, DepDelay=-5, Distance=2342, ArrDelay=-9)]

In [44]:
df.describe().show()

+-------+-------+------------------+------------------+------------------+-----------------+-----------------+-----------------+
|summary|   Year|             Month|        DayOfMonth|           DepTime|         DepDelay|         Distance|         ArrDelay|
+-------+-------+------------------+------------------+------------------+-----------------+-----------------+-----------------+
|  count|5714008|           5714008|           5714008|           5714008|          5714008|          5714008|          5714008|
|   mean| 2015.0|  6.54779937304953|15.707591063925706| 1335.065627489496| 9.29484190431655|824.4569032804994|4.407057357987598|
| stddev|    0.0|3.3974211418324516| 8.774394160366096|496.41981689472664|36.88972372075696|608.6619895866803|39.27129709388608|
|    min|   2015|                 1|                 1|                 1|              -82|               31|              -87|
|    max|   2015|                12|                31|              2400|             1988|     

In [45]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayOfMonth: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)



-----

We can extract data from the DataFrame by using similar techniques to
what we used with Pandas. One difference is that we need to `filter` the
DataFrame, as opposed to directly access rows. However, we can filter
rows to extract flights that left O'Hare, and secondly those flights
that left O'Hare more than two hours late. In the second case, we also
tranform the output to `select` the _Destination_ column and a new
column that is the _Distance_ in kilometers.

-----

In [46]:
df.filter(df['Origin'] == 'ORD').count()

276554

In [47]:
df.filter(df['Origin'] == 'ORD').filter(df['DepDelay'] > 120).select(df['Destination'], df['Distance'] * 1.6).show(10)

+-----------+------------------+
|Destination|  (Distance * 1.6)|
+-----------+------------------+
|        SJU|3315.2000000000003|
|        MSP|             534.4|
|        DFW|            1283.2|
|        MIA|            1915.2|
|        MCO|            1608.0|
|        DFW|            1283.2|
|        MSP|             534.4|
|        RSW|            1792.0|
|        MSP|             534.4|
|        MSP|             534.4|
+-----------+------------------+
only showing top 10 rows



-----

## Spark SQL

Given a Spark DataFrame, we can apply SQL statements directly against
the DataFrame by registering the DataFrame as a Spark temporary SQL
table. The following code cells demonstrates this, as we register our
DataFrame as a `flights` table, and execute a SQL statement to select
the same data we obtained from our previous DataFrame filter.Since the
data are unordered, we have different results displayed via the `show`
method.

-----

In [48]:
df = sqlContext.createDataFrame(fields, schema)

df.registerTempTable("flights")

# SQL can be run over DataFrames that have been registered as a table.
sql_q = "SELECT Destination, Distance FROM flights WHERE Origin = 'ORD' AND DepDelay > 120"

results = sqlContext.sql(sql_q)

# The results of SQL queries are RDDs and support all the normal RDD operations.
results.show(10)

+-----------+--------+
|Destination|Distance|
+-----------+--------+
|        SJU|    2072|
|        MSP|     334|
|        DFW|     802|
|        MIA|    1197|
|        MCO|    1005|
|        DFW|     802|
|        MSP|     334|
|        RSW|    1120|
|        MSP|     334|
|        MSP|     334|
+-----------+--------+
only showing top 10 rows



-----

## Spark Statistics

The simplest type of data analysis is to compute basic statistical
measures of sequences of data. The Spark MLlib package includes a 
[basic statistical][sbs] component that can be easily used to obtain
statistical measurements of multiple columns in a Spark RDD. We
demonstrate this in the following code cells, where we create an RDD
from numeric columns in our `fields` RDD. We use the `colStats` function
from the `Statistics` object to compute a range of statistical measures
in one pass for all columns in the `sdt` RDD. In the second code cell,
we simply provide a nicely formatted display of these quantities for
each column.

-----

[sbs]: https://spark.apache.org/docs/latest/mllib-statistics.html

In [49]:
from pyspark.mllib.stat import Statistics

# Extract numeric columns and compute statistics
sdt = fields.map(lambda p: (p[2], p[5], p[6], p[7], p[8]))
summary = Statistics.colStats(sdt)

# Extract individual statistics for RDD
mus = summary.mean()
mns = summary.min()
mxs = summary.max()
vrs = summary.variance()
nnzs = summary.numNonzeros()

In [50]:
# Labels for display
cols = ['Day', 'Dep. Time', 'Dep. Delay', 'Distance', 'Arr. Delay']

# Print out Header
print('{0:>20s}{1:>12s}{2:>8s}{3:>10s}{4:>12s}'\
      .format('Mean', 'Variance', 'Min', 'Max', 'Non Zeroes'))
print(65*'-')

# Printout summary statistics
for idx, (m, v, mn, mx, n) in enumerate(zip(mus, vrs, mns, mxs, nnzs)):
    print('{5:10s}{0:10.2f}{1:12.2f}{2:8.2f}{3:10.2f}{4:12d}'\
          .format(m, v, mn, mx, int(n), cols[idx]))

                Mean    Variance     Min       Max  Non Zeroes
-----------------------------------------------------------------
Day            15.71       76.99    1.00     31.00     5714008
Dep. Time    1335.07   246432.63    1.00   2400.00     5714008
Dep. Delay      9.29     1360.85  -82.00   1988.00     5385566
Distance      824.46   370469.42   31.00   4983.00     5714008
Arr. Delay      4.41     1542.23  -87.00   1971.00     5587795


-----

### Correlations

Another useful function is to compute the correlation between different
data sequences. The Spark MLlib package includes the `corr` method
within the Statistics component to compute correlations between
individual data sequences, or via the columns in an RDD. The `corr`
method can also calculate either the _Pearson_ correlation, which is the
default, or the _Spearman_ correlation. In the first code cell, we
create several data sequences, turn them into Spark data structures via
the `parallelize` method, and compute the Pearson correlation
coefficient between the different data sequences. In the second code
cell, we create a new RDD from three columns in the `sdt` RDD, and
compute both the Pearson and Spearman correlations between the columns
in this RDD.

-----

In [51]:
# Demonstrate Correlation Measurements

# Sample Data
x = sc.parallelize([0, 1, 2])
y = sc.parallelize([1, 2, 4])
z = sc.parallelize([2, 1, 0])

print('x = ', x.collect())
print('y = ', y.collect())
print('z = ', z.collect())

print('\nPearson Correlation Tests')
print(25*'-')
print('x corr x = {0:+5.3f}'\
      .format(Statistics.corr(x, x, method='pearson')))

print('x corr y = {0:+5.3f}'\
      .format(Statistics.corr(x, y, method='pearson')))

print('x corr z = {0:+5.3f}'\
      .format(Statistics.corr(x, z, method='pearson')))

('x = ', [0, 1, 2])
('y = ', [1, 2, 4])
('z = ', [2, 1, 0])

Pearson Correlation Tests
-------------------------
x corr x = +1.000
x corr y = +0.982
x corr z = -1.000


In [52]:
# Set print precision of matrices
import numpy as np
np.set_printoptions(precision=3)

# Compute correlation of three columns in RDD
cd = sdt.map(lambda p: (p[1], p[2], p[4]))

print('Departure Time, Departure Delay, Arrival Delay')

print('\nPearson Correlation Matrix:')
print(Statistics.corr(cd, method='pearson'))

print('\nSpearman Correlation Matrix:')
print(Statistics.corr(cd, method='spearman'))

Departure Time, Departure Delay, Arrival Delay

Pearson Correlation Matrix:
[[ 1.     0.172  0.16 ]
 [ 0.172  1.     0.945]
 [ 0.16   0.945  1.   ]]

Spearman Correlation Matrix:
[[ 1.     0.228  0.172]
 [ 0.228  1.     0.641]
 [ 0.172  0.641  1.   ]]


### Student Activities

Make the following changes to see how the results change.


**Activity 1:** Change the `myRDD` example to start with all integers from 0 to 399. Use an appropriate lambda function to convert this RDD to a new RDD that has all odd integers from 1 to 399.

In [71]:
## Your code for activity 1 goes here
data= range(400)
myReDD=(sc
 .parallelize(data)
 .map(lambda x: x + 1))

**Activity 2:** Filter the previous RDD to contain only entries that are divisible by 9.

In [72]:
## Your code for activity 2 goes here
myaddReDD = myReDD.filter(lambda x: (x % 9)== 0)
myaddReDD.collect()

[9, 18, 27, 36, 45, 54, 63, 72, 81, 90, 99, 108, 117, 126, 135, 144, 153, 162, 171, 180, 189, 198, 207, 216, 225, 234, 243, 252, 261, 270, 279, 288, 297, 306, 315, 324, 333, 342, 351, 360, 369, 378, 387, 396]

**Activity 3:** Convert this RDD to a Spark DataFrame, specify the column name as `Numbers`.

In [73]:
## Your code for activity 3 goes here
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

schemaString = "Numbers"

fields = [StructField(schemaString, IntegerType(), True)]

schema = StructType(fields)


In [74]:
Numbers = myaddReDD.map(lambda p: (int(p),))

In [75]:
datfrme = sqlContext.createDataFrame(Numbers, schema)
datfrme.show(5)

+-------+
|Numbers|
+-------+
|      9|
|     18|
|     27|
|     36|
|     45|
+-------+
only showing top 5 rows



**Activity 4:** Change the DataFrame to include different columns from the flights data. You might review the original [airline data set](http://stat-computing.org/dataexpo/2009/) website to see the column descriptions.

In [None]:
[u'YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,
SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,
AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,
CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY',
u'2015,1,1,4,AS,98,N407AS,ANC,SEA,0005,2354,-11,21,0015,205,194,169,1448,0404,4,0430,0408,-22,0,0,,,,,,',
u'2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,0010,0002,-8,12,0014,280,279,263,2330,0737,4,0750,0741,-9,0,0,,,,,,',
u'2015,1,1,4,US,840,N171US,SFO,CLT,0020,0018,-2,16,0034,286,293,266,2296,0800,11,0806,0811,5,0,0,,,,,,', 
u'2015,1,1,4,AA,258,N3HYAA,LAX,MIA,0020,0015,-5,15,0030,285,281,258,2342,0748,8,0805,0756,-9,0,0,,,,,,']

In [61]:
col_data = text_file.map(lambda l: l.split(",")) \
            .map(lambda p: (p[0], p[1], p[2], p[7], p[8], p[10], p[11], p[17], p[22],p[16])) \
            .filter(lambda line: 'YEAR' not in line)

In [62]:
cols = col_data.filter(lambda line: '' not in line)

In [63]:
fields = cols.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), p[3],
                          p[4], int(p[5]), int(p[6]), int(p[7]), int(p[8]),int(p[9])))

In [64]:
## Your code for activity 4 goes here
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

schemaString = "Year Month DayOfMonth Origin Destination DepTime DepDelay Distance ArrDelay AirTime "

fieldTypes = [IntegerType(), IntegerType(), IntegerType(), \
              StringType(), StringType(), IntegerType(), \
              IntegerType(), IntegerType(), IntegerType(),IntegerType()]

f_data = [StructField(field_name, field_type, True) \
          for field_name, field_type in zip(schemaString.split(), fieldTypes)]

schema = StructType(f_data)
dataframe = sqlContext.createDataFrame(fields, schema)

print(dataframe)

DataFrame[Year: int, Month: int, DayOfMonth: int, Origin: string, Destination: string, DepTime: int, DepDelay: int, Distance: int, ArrDelay: int, AirTime: int]


In [65]:
dataframe.show(5)

+----+-----+----------+------+-----------+-------+--------+--------+--------+-------+
|Year|Month|DayOfMonth|Origin|Destination|DepTime|DepDelay|Distance|ArrDelay|AirTime|
+----+-----+----------+------+-----------+-------+--------+--------+--------+-------+
|2015|    1|         1|   ANC|        SEA|   2354|     -11|    1448|     -22|    169|
|2015|    1|         1|   LAX|        PBI|      2|      -8|    2330|      -9|    263|
|2015|    1|         1|   SFO|        CLT|     18|      -2|    2296|       5|    266|
|2015|    1|         1|   LAX|        MIA|     15|      -5|    2342|      -9|    258|
|2015|    1|         1|   SEA|        ANC|     24|      -1|    1448|     -21|    199|
+----+-----+----------+------+-----------+-------+--------+--------+--------+-------+
only showing top 5 rows



**Activity 5:** Use a SQL query on the `df` DataFrame to compute the mean distance between all flights from O'Hare to Los Angeles International Airport (LAX).

In [69]:
## Your code for activity 5 goes here

dataframe.registerTempTable("flights")

# SQL can be run over DataFrames that have been registered as a table.
sql_q = "SELECT Mean(Distance) as mean_distance FROM flights WHERE Origin = 'ORD' AND Destination = 'LAX' "

results = sqlContext.sql(sql_q)

# The results of SQL queries are RDDs and support all the normal RDD operations.
results.show()


+-------------+
|mean_distance|
+-------------+
|       1744.0|
+-------------+



### Additional, more advanced problems:

**Activity 6:** Add an index column to the Spark DataFrame created in activity 3, which sequentially increases.

In [85]:
## Your code for activity 6 goes here
datfrme2 = (datfrme.flatMap(lambda x:x)
    .zipWithIndex()
    .map(lambda x: Row(id=x[1],Numbers=int(x[0])))) 
df_id=sqlContext.createDataFrame(datfrme2)
df_id.show(10)     


+-------+---+
|Numbers| id|
+-------+---+
|      9|  0|
|     18|  1|
|     27|  2|
|     36|  3|
|     45|  4|
|     54|  5|
|     63|  6|
|     72|  7|
|     81|  8|
|     90|  9|
+-------+---+
only showing top 10 rows



**Activity 7:** Create an RDD containing the 'Year', 'Month', 'DayofMonth', 'dDelay',
and 'Origin' columns for the airline data..

In [90]:
col_data = text_file.map(lambda l: l.split(",")) \
            .map(lambda p: (p[0], p[1], p[2], p[11], p[7])) \
            .filter(lambda line: 'YEAR' not in line)

In [91]:
cols = col_data.filter(lambda line: '' not in line)

In [92]:
fields = cols.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), int(p[3]),
                          p[4]))

In [94]:
## Your code for activity 7 goes here
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

schemaString = "Year Month DayOfMonth DepDelay Origin "

fieldTypes = [IntegerType(), IntegerType(), IntegerType(), \
              IntegerType(), StringType()]

f_data = [StructField(field_name, field_type, True) \
          for field_name, field_type in zip(schemaString.split(), fieldTypes)]

schema = StructType(f_data)
dframe = sqlContext.createDataFrame(fields, schema)


In [97]:
dframe.show(5)

+----+-----+----------+--------+------+
|Year|Month|DayOfMonth|DepDelay|Origin|
+----+-----+----------+--------+------+
|2015|    1|         1|     -11|   ANC|
|2015|    1|         1|      -8|   LAX|
|2015|    1|         1|      -2|   SFO|
|2015|    1|         1|      -5|   LAX|
|2015|    1|         1|      -1|   SEA|
+----+-----+----------+--------+------+
only showing top 5 rows



In [104]:
rdd = dframe.rdd
rdd.take(5)

[Row(Year=2015, Month=1, DayOfMonth=1, DepDelay=-11, Origin=u'ANC'), Row(Year=2015, Month=1, DayOfMonth=1, DepDelay=-8, Origin=u'LAX'), Row(Year=2015, Month=1, DayOfMonth=1, DepDelay=-2, Origin=u'SFO'), Row(Year=2015, Month=1, DayOfMonth=1, DepDelay=-5, Origin=u'LAX'), Row(Year=2015, Month=1, DayOfMonth=1, DepDelay=-1, Origin=u'SEA')]

-----

# Optional/ Extra material

## Machine Learning

The bulk of the MLlib package is focused on performing machine learning
at scale by using Spark. With functions for computing classification,
regression, clustering, dimensional reduction, and more, the library
extends considerable power to the Spark user. Since we have already
covered these concepts by using Python and scikit-learn, in the rest of
this Notebook, we will present two specific machine learning algorithms
in order to demonstrate the basic concepts required to work with the
tools in the Spark MLlib package.

-----

### Linear Modeling

One of the simplest machine learning techniques is [linear regression][slr].
The main difference when using Spark is that for this supervised
learning technique our data must be in a Spark specific data structure
called [`LabeledPoint`][slp]. Spark provides several data structures to
simplify the application of distributed machine learning algorithms at
scale. The labeled nature refers to the label, used for training, that
is associated with the point. The first item in the data structure is
the label, while the second item is the set of feature columns.

In the following code cells, we first create a new data structure that
extracts the arrival delay to be the label and the departure delay as
the feature. These data re turned into a Spark sequence containing
`LabeledPoint` values for each row in the original RDD. Next we display
the first rows in the new sequence, and next we train the linear
regressor (using SVD in this case) and specify a number of iterations
and step size. You should feel free to modify these values and see the
impact on the resulting performance. Finally, we compute several
regression metrics to quantify the performance of this method on these
data (recall that the data span a large range, hence the RMSE is quite
reasonable).

-----

[slp]: https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point
[slr]: https://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-least-squares-lasso-and-ridge-regression

In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.ml.regression import LinearRegressionModel

# Minimum departure delay
min_delay = 5.
data = fields.filter(lambda p: p[6] > min_delay).map(lambda p: LabeledPoint(p[8], [p[6]]))

In [None]:
data.take(5)

In [None]:
lr_model = LinearRegressionWithSGD.train(data, iterations=100, step=0.00000001)

In [None]:
vnp = data.map(lambda lp: (lp.label, float(lr_model.predict(lp.features))))

In [None]:
vnp.take(5)

In [None]:
from pyspark.mllib.evaluation import RegressionMetrics

tm = RegressionMetrics(vnp)

print('RMSE = {0:5.1f}'.format(tm.rootMeanSquaredError))
print('MSE = {0:5.1f}'.format(tm.meanSquaredError))
print('MAE = {0:5.1f}'.format(tm.meanAbsoluteError))
print('r2 = {0:5.1f}'.format(tm.r2))
print('EV = {0:5.1f}'.format(tm.explainedVariance))

In [None]:
print(lr_model)

### Additional, more advanced problems:

**Activity 8:** Add more columns into the Linear Regression demonstrated in this Notebook. In particular, include departure time and distance into the calculation.

In [None]:
## Your code for activity 8 goes here
