#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png) 
# **Machine Learning with Apache Spark and Python**

This notebook covers the major aspects of performing analytics on large datasets using the distributed computing platform Spark and the programming language Python.

**Major topics covered in this tutorial include:**
* Basic Data Wrangling and Manipulation
* User-defined Functions
* Basic Plotting
* Distributing Analytics Tasks
* Best Practices
* Use Case -- Time-Series Indexing with iSax
* Use Case -- Sensory Anomaly Detection
* Use Case -- Genetic Algorithms for Optimization
* Performance Comparisons
* Additional Resources

Note that for reference, you can look up the details of the relevant methods in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.sql).

**The Platform and Dataset**

This tutorial was built in an IPython/Jupyter notebook hosted on the Databricks platform. Databricks offers a free Community Edition that provides all the functionality needed to run this notebook with the included dataset. Otherwise, the notebook is laid out logically with all output included, so it can be replicated on any platform that supports PySpark, or simply read through.

The dataset used in this tutorial contains some basic parameters for two separate flights, including flight number, time, flight phase, n1 (fan speed), and engine position. The identifiers and parameters have been randomized, but the data still represent fairly typical measurements for these parameters during a flight. The dataset contains about half a million rows, with varying levels of completeness across the parameters. While this dataset is not especially large, it is large enough to show performance gains from distribution and to illustrate the major concepts of Spark.

Using the sqlContext and the SQL language, we can load our table into a DataFrame and then preview the first five rows:

In [3]:
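# sqlContext and SparkContext need to be defined on certain distributions, such as below on an HDFS/Hive system
# (the notebook assumes they are already defined, as on Databricks):
# from pyspark import SparkConf, SparkContext
# from pyspark.sql import HiveContext
# conf = SparkConf()
# sc = SparkContext(conf=conf)
# sqlContext = HiveContext(sc)

dataDF = sqlContext.sql("SELECT * FROM flt_data_randomized ORDER BY frame_offset")
dataDF.show(5)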

**The RDD Data Structure and Lazy Evaluation**

The base data structure in Spark is the RDD (Resilient Distributed Dataset). While this data structure contains all the elements of your dataset, those elements are distributed across the nodes of the cluster. It is possible to bring the data back as an ordered list via the collect() method, but this gathers all the data onto the main node and is not recommended for very large datasets. Traditional ordered row operations are also harder to apply, but they can be done via the map() method, which we will use later in this tutorial. While the RDD class offers many operations similar to traditional data frames in Python and R, Spark also provides a newer DataFrame data structure that more closely mimics the traditional data frame structure. We will alternate between these two data structures for the remainder of this tutorial, as each is better suited to different tasks.


Keep in mind also that many operations are simply recorded and not actually performed until some output is needed. This is called 'lazy evaluation'. From the Spark documentation: "All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently."
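To make the idea concrete without a cluster, here is a small plain-Python analogy. The `LazyPipeline` class is hypothetical and is not part of Spark's API. Transformations are only recorded, and nothing runs until the `collect()` action is called.

```python
class LazyPipeline:
    """Toy illustration of lazy evaluation (not Spark's API).

    map() and filter() only record steps; collect() replays them over the source.
    """

    def __init__(self, source, steps=None):
        self.source = source
        self.steps = steps or []  # recorded (kind, function) pairs

    def map(self, fn):
        # Return a new pipeline with one more recorded step; nothing runs yet
        return LazyPipeline(self.source, self.steps + [("map", fn)])

    def filter(self, pred):
        return LazyPipeline(self.source, self.steps + [("filter", pred)])

    def collect(self):
        # The "action": only now is any transformation actually evaluated
        out = []
        for item in self.source:
            keep = True
            for kind, fn in self.steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                out.append(item)
        return out


calls = []

def add_two(x):
    calls.append(x)  # record that the function really ran
    return x + 2

pipeline = LazyPipeline([0, 10, 20]).filter(lambda x: x > 0).map(add_two)
print(calls)               # [] : nothing computed yet
print(pipeline.collect())  # [12, 22]
print(calls)               # [10, 20]
```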

<h1>Data Wrangling</h1>



For slicing data, the DataFrame is the better data structure to use. Luckily, our data was loaded as a DataFrame by the SQL command above. To select particular columns we can use the select() method, which returns a new DataFrame containing only the listed columns. To view a DataFrame we use the show() method. Note that show(n) only brings the first n rows (20 by default) back to the main node, whereas collect() gathers every row. Below we select the columns for the flight number (adi_flight_record_number), the time measurement (frame_offset), and the fan speed measurement (n1). The rows are already ordered by frame_offset, because the SQL query above sorted them by this time measurement.

![Data Wrangling](http://alumni.berkeley.edu/sites/default/files/styles/960x400/public/wranglingbigdata.jpg?itok=k0fK1fJQ)

In [6]:
dataDF2 = dataDF.select('adi_flight_record_number', 'frame_offset', 'n1')
dataDF2.show(10)

The output above shows that many n1 measurements are missing, and we'd like to exclude these from our analysis. For this we can use the filter() method, which keeps only the rows that satisfy a given condition. Filters can be chained. Since there are also four observations with a frame_offset of 0, we remove these as well.

In [8]:
dataDF2 = dataDF2.filter(dataDF2.n1 > 0).filter(dataDF2.frame_offset > 0) # Chained filters: drop missing/zero n1, then zero frame_offset
dataDF2.show(100)
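For intuition, the same two chained filters can be mimicked in plain Python over a few hypothetical (adi_flight_record_number, frame_offset, n1) rows. There is one difference. Spark's filter() drops rows whose condition evaluates to null, such as a missing n1. In Python 3, comparing None with a number raises a TypeError, so this sketch checks for None explicitly.

```python
# Hypothetical sample rows: (adi_flight_record_number, frame_offset, n1)
rows = [
    (9861128, 0, 21.5),   # zero frame_offset
    (9861128, 1, None),   # missing n1 measurement
    (9861128, 2, 0.0),    # zero n1
    (9861128, 3, 22.1),
]

# First filter: keep rows with a present, positive n1
step1 = [r for r in rows if r[2] is not None and r[2] > 0]
# Second (chained) filter: drop rows with a zero frame_offset
step2 = [r for r in step1 if r[1] > 0]

print(step2)  # [(9861128, 3, 22.1)]
```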

<h1>Column Creation and Transformation</h1>

Column operations in Spark have a straightforward syntax. We can either transform an existing column or create a new one from an existing column, and both are done via the withColumn() method. The first argument is the name of the column to replace; if the name does not match an existing column, a new column with that name is created. The second argument is an expression for the column's values. Suppose we find out that our time measurements actually begin 2 seconds after the flight begins. We then add a 2-second offset to our frame_offset column.

In [10]:
dataDF2 = dataDF2.withColumn('frame_offset', dataDF2.frame_offset + 2)
dataDF2.show(10)

We can also get basic column statistics with the describe() method. Calling it on a DataFrame with multiple columns gives summary statistics for each column. To make it interesting, we keep only the first 30 seconds of the flight and look at the average fan speed (n1).


\* If the data was imported so that the 'n1' column is not a float type, you will also need to run the line marked `# Type Conversion Code` to get it into the proper format for the rest of the tutorial.

In [12]:
dataDF2 = dataDF2.select('adi_flight_record_number', 'frame_offset', dataDF2.n1.cast('float').alias('n1')) # Type Conversion Code (alias keeps the column name 'n1')


dataDF2.filter(dataDF2.frame_offset <= 30).describe().show() 

Since describe() can report several statistics at once, we will compare the first 30 seconds of fan speed with the first 10 minutes of the flight. Fan speed ramps up during the takeoff phase, so we expect the second window to have a higher mean.

In [14]:
dataDF2.filter(dataDF2.frame_offset <= 30).select('n1').describe().show() # Code from above, first 30 seconds of flight

dataDF2.filter(dataDF2.frame_offset <= (10*60)).select('n1').describe().show() # First 10 minutes of flight
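As a rough plain-Python equivalent, describe() reports count, mean, stddev, min and max for a column, ignoring nulls. Spark's stddev is the sample standard deviation, which is also what `statistics.stdev` computes. The `describe_column()` helper and the sample values are hypothetical.

```python
import statistics

def describe_column(values):
    """Summary statistics similar to those reported by DataFrame.describe()."""
    present = [v for v in values if v is not None]  # nulls are ignored
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation
        "min": min(present),
        "max": max(present),
    }

print(describe_column([20.0, 22.0, None, 24.0]))
```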

**GroupBy and Aggregate Functions**

![groupBy](https://silviomassari.files.wordpress.com/2011/07/mapreducetwo.png)

In Spark, groupBy() is one of the most powerful transformations. It allows you to perform aggregations on a DataFrame. Unlike other DataFrame transformations, groupBy() does not return a DataFrame. Instead, it returns a special GroupedData object that provides various aggregation functions. The most commonly used aggregation function is count(), but there are others (such as sum(), max(), and avg()). These aggregation functions create a new column and return a new DataFrame.

Since our data contains two separate flights, let's group our observations by flight number and count the observations in each group with the aggregation function count(). We will be using these flight numbers as keys very soon. The result of an aggregation function is a DataFrame, so we use show() to display it.

In [16]:
dataDF2.groupBy('adi_flight_record_number').count().show(10)
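The groupBy('adi_flight_record_number').count() step is essentially a count of rows per key. With a handful of hypothetical records (the second flight number is made up), `collections.Counter` gives the same kind of result in plain Python.

```python
from collections import Counter

# Hypothetical (adi_flight_record_number, frame_offset, n1) records for two flights
records = [
    ("9861128", 3, 22.1),
    ("9861128", 4, 23.0),
    ("1234567", 3, 30.5),   # hypothetical second flight number
    ("9861128", 5, 24.2),
]

# Count rows per flight number, like groupBy(...).count()
counts = Counter(flight for flight, _, _ in records)
for flight, n in counts.items():
    print(flight, n)
```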

<h1>User-Defined Functions</h1>

Spark can also apply a function of our own to a column by registering it as a 'User-Defined Function' (UDF). The syntax is not very difficult. The Python function (or a lambda) is wrapped with [udf()](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf), optionally along with its return type, which otherwise defaults to a string. Here we use the mean and standard deviation from the output of describe() a few cells above to Z-normalize our data via a UDF:

In [18]:
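# Define the UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def Znormalize(value):
    # Mean and standard deviation taken from the describe() output above
    mean = 82.82697
    sd = 23.34478
    if value is None: # Leave missing values missing
        return None
    return (value - mean) / sd

# Declare the return type; without it, udf() returns the results as strings
udf_Znormalize = udf(Znormalize, DoubleType())

dataDF2.withColumn("n1", udf_Znormalize("n1")).show(10)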
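The UDF above hard-codes the mean and standard deviation copied from the describe() output. A plain-Python sketch of the same Z-normalization, z = (x - mean) / sd, can compute both from the data instead. `z_normalize()` is a hypothetical helper, and it uses the sample standard deviation to match describe().

```python
import statistics

def z_normalize(values):
    """Return (x - mean) / sd for each value, with mean and sd computed from the data."""
    mean = statistics.mean(values)
    sd = statistics.stdev(values)  # sample standard deviation, as reported by describe()
    return [(v - mean) / sd for v in values]

print(z_normalize([70.0, 80.0, 90.0]))  # [-1.0, 0.0, 1.0]
```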

<h1> Row Operations </h1>

We've seen that column operations are done with the withColumn() method. Now let's look at how row operations are done in Spark. Row operations are done via the map() method, which applies a function to every row. On Spark 2.x and later, DataFrames no longer have a map() method, so it is called on the underlying RDD instead, for example dataDF2.rdd.map(). Unlike withColumn(), map() takes no row selection, so if only a subset of rows should be transformed, that subset must be filtered out beforehand. We will meet the sister function mapValues() later in this guide.
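The 'slice first, then map' pattern can be illustrated with a plain-Python analogy. The namedtuple only stands in for Spark's Row, and the data are hypothetical. On Spark, the equivalent would be a filter() followed by .rdd.map().

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark Row with the three columns used so far
Row = namedtuple("Row", ["adi_flight_record_number", "frame_offset", "n1"])

rows = [Row(1, 10, 50.0), Row(1, 20, 80.0), Row(2, 10, 60.0)]

# Select the subset first (flight 1 only) ...
flight1 = [r for r in rows if r.adi_flight_record_number == 1]
# ... then map a function over every remaining row (here: n1 as a fraction of 100)
scaled = list(map(lambda r: r._replace(n1=r.n1 / 100.0), flight1))

print(scaled)
```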

<h1>Plotting</h1>

**Basic Example**

Now that we know how to slice our data, manipulate it, and get basic descriptive statistics, the next logical step is visualization: a readable and visually interesting depiction of the patterns in our data. Since we only have engine fan speed and time, our options are somewhat limited, but we can still generate an interesting plot. We use the Pandas and Matplotlib libraries. These should already be installed in the Databricks environment, so we simply need to import them. We then clear the figure in case a previous one is stored, draw a basic line graph, add vertical lines to separate the flight phases, add labels and a title, and finally display the canvas.

At the time of writing, there is no way to plot in a distributed fashion. The data must be collected onto the main node, converted to a Pandas DataFrame, and then plotted with Matplotlib. This is not a major concern, because only a small, well-chosen subset of the data should be plotted anyway. Plotting too many points hinders readability.
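One simple way to pick that subset is to keep every k-th point so that roughly a fixed number of points remain. The sketch below is plain Python, and the `downsample()` helper is hypothetical. On a Spark DataFrame, a similar reduction could be made with DataFrame.sample() before calling toPandas(), although that sample is random rather than evenly spaced.

```python
def downsample(points, max_points):
    """Keep about max_points evenly spaced points, always starting with the first."""
    if max_points <= 0:
        raise ValueError("max_points must be positive")
    step = max(1, -(-len(points) // max_points))  # ceiling division
    return points[::step]

frame_offsets = list(range(0, 10000, 2))     # hypothetical 5000 time stamps
print(len(downsample(frame_offsets, 500)))   # 500
```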

Drawing a different kind of graph can be as simple as changing the 'kind' parameter in the Pandas plot() call. See the comprehensive (and very readable) [Pyplot tutorial](http://matplotlib.org/users/pyplot_tutorial.html) for more options.

\* Note: On certain clusters, a canvas for display() may need to be created first. This is beyond the scope of this tutorial.

In [21]:
import matplotlib.pyplot as plt
import pandas as pd

plt.clf() # Clear the canvas for a new plot

# Convert the Spark DataFrame to a Pandas DataFrame and make sure n1 and frame_offset are floats
pdf = dataDF2.toPandas()
pdf.n1 = pdf.n1.astype(float)
pdf.frame_offset = pdf.frame_offset.astype(float)

# Construct the plot
pdf.plot(x = 'frame_offset', y = 'n1', kind = 'line', color = 'blue', linewidth = 0.010)

# Draw vertical lines separating the flight phases
plt.plot([2000, 2000], [0, 100], '-', lw=2, color='red') # [x1, x2], [y1, y2]
plt.plot([9000, 9000], [0, 100], '-', lw=2, color='red') # [x1, x2], [y1, y2]
plt.ylabel("Max N1 (fan speed)")
plt.xlabel("Frame Offset")
plt.title("Time vs. Fan Speed (N1) For a Single Flight")

display()




**Advanced Example**

It is often necessary to integrate some basic analytic views into our plots. Since our plots are generated with Matplotlib, they work well with many kinds of Pandas objects, and we can use NumPy or SciPy to operate on those objects. Below, we use a [moving average](https://en.wikipedia.org/wiki/Moving_average) to smooth the fan speed values for the first 50 seconds of the flight's en route (cruise) phase, labeled 'G_ENROUT' in the data, and compute the rolling standard deviation of these values. Finally, we plot the original values (with transparency) overlaid with the moving average and a 'ribbon' of 2 standard deviations on either side, since values more than 2 standard deviations away may need to be examined as outliers. We wrap the whole process in a function to make it reusable.
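The moving average and the 2-standard-deviation ribbon can be written out in plain Python, which shows exactly what each point of the plot represents. This `rolling_band()` helper is a hypothetical sketch. Like a pandas rolling window, it leaves the first period - 1 positions empty (None here) and uses the sample standard deviation, so period must be at least 2.

```python
import statistics

def rolling_band(values, period=4, width=2.0):
    """Moving average with a +/- width * (rolling sample std) band.

    The first period - 1 positions have no full window and are returned as None.
    """
    if period < 2:
        raise ValueError("period must be at least 2 for a sample standard deviation")
    ma, lower, upper = [], [], []
    for i in range(len(values)):
        if i + 1 < period:
            ma.append(None)
            lower.append(None)
            upper.append(None)
            continue
        window = values[i + 1 - period:i + 1]
        mean = statistics.mean(window)
        sd = statistics.stdev(window)  # sample std, like pandas' default ddof=1
        ma.append(mean)
        lower.append(mean - width * sd)
        upper.append(mean + width * sd)
    return ma, lower, upper

ma, lower, upper = rolling_band([1.0, 2.0, 3.0, 4.0, 5.0], period=3)
print(ma)     # [None, None, 2.0, 3.0, 4.0]
print(lower)  # [None, None, 0.0, 1.0, 2.0]
print(upper)  # [None, None, 4.0, 5.0, 6.0]
```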

In [23]:
from matplotlib import patches
import matplotlib.lines as mlines

plt.style.use('ggplot')
plt.clf()

# Drop rows without a flight phase, list the phases present, and keep the en route (cruise) phase
dataDF = dataDF.filter(dataDF.flight_phase != 'null')
dataDF.select('flight_phase').distinct().show()
dataDF_Enrout = dataDF.filter(dataDF.flight_phase == 'G_ENROUT')

Enrout_Pandas = dataDF_Enrout.select('n1').toPandas()

def ribbonPlot(values, period = 20):
  price = pd.Series(values).astype(float)
  # pd.rolling_mean/pd.rolling_std were removed from newer pandas; use the .rolling() API
  ma = price.rolling(period).mean()
  mstd = price.rolling(period).std()
  plt.figure()
  plt.plot(price.index, price, 'k', alpha = 0.2)
  plt.plot(ma.index, ma, 'b')
  plt.fill_between(mstd.index, ma-2*mstd, ma+2*mstd, color='red', alpha=0.5)

  # Proxy artists so the legend describes all three layers
  blue_line = mlines.Line2D([], [], color='blue', label='Moving Average (%d)' % period)
  black_line = mlines.Line2D([], [], color='black', label='Values', alpha=0.2)
  red_line = mlines.Line2D([], [], color='red', label='2 Std. Deviations', alpha=0.5)
  plt.legend(handles=[black_line, blue_line, red_line])

  plt.ylabel('Fan Speed (n1)')
  plt.xlabel('Time into Phase (seconds)')
  plt.title('Time vs. Fan Speed (N1) For En Route Phase')

  display()

ribbonPlot(Enrout_Pandas['n1'][1:50], 4)

<h1> Writing Distributed Functions </h1>

**Data Prep**

A main use case for Spark is running algorithms on large datasets in parallel. The difference in computation time between a simple loop and a parallel loop grows dramatically as data sources become larger, so understanding how to define and run functions that take advantage of distribution is paramount. Before we begin writing distributed functions, we need to understand how to separate the data into chunks, each of which can run our algorithm independently. The illustration below (courtesy of MathWorks and MATLAB) sums up the process and the comparison nicely.

![Distributed Computing](http://www.mathworks.com/cmsimages/63635_wl_91710v00_po_fig2_wl.gif)
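The split, compute and combine pattern can be sketched in plain Python. `split_into_chunks()` is a hypothetical helper, and the thread pool only stands in for cluster workers. For CPU-bound Python code, threads will not actually speed things up because of the GIL.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_chunks(data, n_chunks):
    """Split a list into n_chunks contiguous pieces whose sizes differ by at most one."""
    size, extra = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks

data = list(range(1, 11))
chunks = split_into_chunks(data, 3)

# Each chunk is processed independently, then the partial results are combined
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_sums = list(pool.map(sum, chunks))
total = sum(partial_sums)

print(chunks)        # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
print(partial_sums)  # [10, 18, 27]
print(total)         # 55
```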

**The Pair RDD**

Separating data into computation chunks lends itself naturally to the idea of key-value pairs. The data under each key should be independent of the data under the other keys, so splitting our computation by key should not affect the result. In our example, the data contains two separate flight numbers, each with its own set of frame_offset (time) and n1 values. We will use these flight numbers as our keys, and the parameters frame_offset and n1 as our values. The best way to hold such an arrangement is the Pair RDD data structure, and to create it we must reshape the data in a very precise manner.

A pair RDD is built from records of the form (key, (value1, value2, ...)). Each key-value pair is a tuple, and the values for a key are grouped together in a tuple or list (we use a list below). The code below uses the map() method, which iterates through the rows of the DataFrame (including key and values) and applies an anonymous lambda function to reshape each row into this format.

The result is an object of type 'PipelinedRDD' (a special case of the RDD object discussed earlier), so we use the collect() method (analogous to show() on DataFrames) to view the result. Here we use the first column of our DataFrame (adi_flight_record_number) as the key. The second column (frame_offset, i.e. time) and the third (n1, i.e. fan speed) are the values. It is not necessary to convert the key to a string, but doing so may avoid some mistakes later on.

\* REMEMBER: Python list slicing excludes the endpoint, so 1:3 returns only the elements at index 1 and index 2.

In [26]:
# .rdd is required on Spark 2.x+, where DataFrames no longer have a map() method
dataDF3 = dataDF2.rdd.map(lambda x: (str(x[0]), list(x[1:3])))
dataDF3.collect()
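The same reshaping can be written in plain Python over a few hypothetical rows (the second flight number is made up). Each row (flight, frame_offset, n1) becomes (str(flight), [frame_offset, n1]).

```python
# Hypothetical rows: (adi_flight_record_number, frame_offset, n1)
rows = [
    (9861128, 10, 55.2),
    (9861128, 11, 56.0),
    (1234567, 10, 48.7),   # hypothetical second flight
]

# Same shape as the lambda above: key as a string, remaining columns as a list
pairs = [(str(row[0]), list(row[1:3])) for row in rows]
print(pairs)
# [('9861128', [10, 55.2]), ('9861128', [11, 56.0]), ('1234567', [10, 48.7])]
```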

Now that our data is in the proper form, we are ready to convert it into the Pair RDD data structure. We first use the groupByKey() method, which groups the records above by flight number (two groups, in our case). The result is a form that is very conducive to parallel computing: a collection of key-value pairs where the values for each key are stored in a 'ResultIterable' object. The ResultIterable is just what its name suggests. It is an iterable over all the values for that key, which a function can loop through.

In [28]:
dataDF3 = dataDF3.groupByKey()
dataDF3.collect()
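Conceptually, groupByKey() gathers all values that share a key into one collection. A plain-Python analogy uses a defaultdict of lists, with hypothetical pairs. Unlike this sketch, Spark does not guarantee the order of the values within a group.

```python
from collections import defaultdict

pairs = [('9861128', [10, 55.2]), ('9861128', [11, 56.0]), ('1234567', [10, 48.7])]

grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)   # every value for a key ends up in one list

print(dict(grouped))
# {'9861128': [[10, 55.2], [11, 56.0]], '1234567': [[10, 48.7]]}
```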


**Tips for Distributed Functions**

The key to writing good distributed functions is understanding what information is lost when we separate the data by key, and knowing exactly what the ResultIterable object above contains. In our case, very little information is lost when converting to key-value pairs, because each key corresponds to a full flight's worth of data. However, looking at fleet-wide or customer-wide trends becomes significantly harder in this paradigm. Either clever keys need to be created, or the results of many computations must be joined in clever ways (more on this later). Each element of our ResultIterable is a list of the form [frame_offset, n1].

**Loops and Groups**

Now that we understand how the data is split and what each split contains, let's start constructing our function. An easy way to guide function design is the idea of 'loops and groups': a function should contain a loop of some sort and operate on only one group of the data. Below is a simple function that loops through each ResultIterable and returns the sum of the n1 values, where the groups are flights (there are two in our example). Remember that each element is itself a list [frame_offset, n1], so we only want the second element (index 1 in Python) of each.

The function is passed to the mapValues() method (here wrapped in a lambda) because we are only operating on the values and not the keys. Note that this function is 'sent' to each group, and the groups can be processed *AT THE SAME TIME*, depending on how the keys are partitioned and how many cores are available. Each group reports its key and the output of the function. Here lies the benefit of parallel computing: scaling this to a hundred or even a thousand flights can be done much more quickly and naturally than iterating through each flight in turn.

Below we construct a simple function that returns the sum of the values passed to it for each given key (all computed in parallel). Note the function only adds the second column (hence the 1 indexing) because the values are a list of the format [frame_offset, n1] and we only want to get the sum of the n1 values.

In [31]:
def getSum(iterableObj):
  ''' This function will return the 
       sum of the numeric elements of 
       the iterable object passed to it'''
  
  sum = 0
  for i in iterableObj:
    sum += i[1]
  return(sum)

dataDF3.mapValues(lambda x: getSum(x)).collect()
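In plain Python, mapValues() behaves like applying a function to each value of a dictionary while keeping the keys unchanged. The grouped data below is hypothetical. `get_sum()` mirrors getSum() above, with the loop replaced by a generator expression.

```python
# Hypothetical grouped data: key -> list of [frame_offset, n1]
grouped = {
    '9861128': [[10, 55.0], [11, 56.0]],
    '1234567': [[10, 48.5]],
}

def get_sum(values):
    # Sum only the n1 element (index 1) of each [frame_offset, n1] pair
    return sum(v[1] for v in values)

# Equivalent of mapValues(get_sum): the function sees only the values, keys are kept
sums = {key: get_sum(values) for key, values in grouped.items()}
print(sums)  # {'9861128': 111.0, '1234567': 48.5}
```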

<h1> Best Practices </h1>

**The Importance of Error Handling**

*try-except*:
Parallel functions often deal with huge volumes of data, so it is often very difficult to anticipate the kinds of data issues that may arise. Because these functions process so much data, a single error can wreck the functionality of the entire script. Thus, general exception handling and 'defensive' coding practices become very important. The simplest and most essential mechanism for this in Python is the try/except statement. It can handle [exceptions](http://www.tutorialspoint.com/python/python_exceptions.htm) that occur inside the 'try' block by running the code in the 'except' block. In the frequently occurring case of TypeErrors, we can simply print the element that is not of the proper type and skip it in the operation, as follows:

In [33]:
def getSum(iterableObj):
  ''' This function will return the 
       sum of the numeric elements of 
       the iterable object passed to it'''
  
  sum = 0
  for i in iterableObj:
    try:
      sum += i
    except TypeError as t:
      print(str(i) + " is a " + str(type(i)) + ", not a numeric type")
      pass
  return(sum)

badList = [1,4,5, 'notANumber', 4]

getSum(badList)

**Error-handling via If-Else**:

There is often confusion between using if-else and try/except to handle a problem that may occur, and often the two are interchangeable. The code below produces the same output as the code above, but it handles incorrect types with an if-else. Which one should be used? In this case, if-else is the better option. TypeErrors may be commonplace with large amounts of data (so not really an exceptional case), and raising exceptions can be somewhat more computationally expensive than simply avoiding the case with if-else. Notice that we had to check for multiple numeric types in our if condition. This is another consideration when choosing your error-handling mechanism.

In [35]:
def getSum2(iterableObj):
  ''' This function will return the 
       sum of the numeric elements of 
       the iterable object passed to it'''
  
  sum = 0
  for i in iterableObj:
    if(isinstance(i, (int, float, complex))): # 'long' omitted: it exists only in Python 2
      sum += i
    else:
      print(str(i) + " is a " + str(type(i)) + ", not a numeric type")
      pass
  return(sum)

badList = [1,4,5, 'notANumber', 4]

getSum2(badList)

**Avoiding Data Shuffle**

Using reduceByKey() vs groupByKey(): There is a large discussion in the Spark community about replacing groupByKey() with other methods such as reduceByKey() or combineByKey(). This is mainly because groupByKey() 'shuffles' every record around the network during grouping, and when additional operations are performed this can lead to inefficiency or even memory overload. reduceByKey(), by contrast, first combines the values for each key within every partition, so far less data crosses the network. Explaining the computational differences in full is beyond the scope of this tutorial, but it is better to use reduceByKey() whenever the operation can be expressed as a reduction (many operations cannot be, or become excessively complicated). See this [post](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) for a full explanation.
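A plain-Python simulation can show why reduceByKey() shuffles less data. The partitions and helper names are hypothetical. Each partition first combines its own values per key, and only those partial results are 'shuffled' and merged.

```python
from collections import defaultdict
from functools import reduce

# Two hypothetical partitions of (key, value) pairs
partitions = [
    [("A", 1.0), ("A", 2.0), ("B", 5.0)],
    [("A", 3.0), ("B", 1.0), ("B", 4.0)],
]

def combine_locally(partition, func):
    """Map-side combine: reduce the values of each key within one partition."""
    acc = {}
    for key, value in partition:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

def reduce_by_key(partitions, func):
    local = [combine_locally(p, func) for p in partitions]
    # Shuffle: only one partial result per key per partition is sent
    shuffled = defaultdict(list)
    for acc in local:
        for key, value in acc.items():
            shuffled[key].append(value)
    result = {key: reduce(func, values) for key, values in shuffled.items()}
    records_sent = sum(len(acc) for acc in local)
    return result, records_sent

result, records_sent = reduce_by_key(partitions, lambda a, b: a + b)
print(result)        # {'A': 6.0, 'B': 10.0}
print(records_sent)  # 4 (a groupByKey-style shuffle would send all 6 pairs)
```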

Not doing collect() or show(): For large datasets, collect() should not be used before separating out a manageable chunk of data. It gathers all the data it is called on onto the main node and may cause memory issues (toPandas() behaves the same way). show(n) only brings back the first n rows, but each call still triggers a Spark job.

<h1>Use Case: iSax Indexing for Time Series</h1>

The data in this tutorial is an example of a time series: a series of (x, y) values where the x values are time measurements. In our case the y values are fan speed (n1) measurements, but they can be any value we wish to observe and draw conclusions about. Many traditional parametric statistical techniques do not work well on time-series data, because time-based data has varying distributions and is often non-normal. Time-series-specific algorithms and models can become very computationally complex, and they also require preliminary checks of the nature of the data and its susceptibility to certain kinds of models. Thus, taking the problem of comparing two time series out of the time-series domain and into a computationally simpler paradigm can be of great benefit. The iSax approach (described in the research paper linked below) converts a time series into a sequence of letters, so that each time series as a whole forms a 'word'. Comparing two time series then becomes a matter of comparing their words, a much simpler text-matching problem.
![iSax](http://images.slideplayer.com/21/6323537/slides/slide_61.jpg)

**Writing the iSax function for an individual time series**

The function defined below takes two required parameters, timeList (the time values) and valueList (the measured values), and returns the iSax 'word' for the corresponding time series. The algorithm has many aspects, and the general method is outlined in this [research paper](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.439.3743&rep=rep1&type=pdf). This implementation maps each time bucket's mean to a letter using equal-width value bins between min_vector and max_vector.
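The letter-assignment step can be isolated as a small sketch. `mean_to_letter()` is a hypothetical helper that uses the same defaults as iSax() below: 4 letters over the range 0 to 110, so each bin is 27.5 wide. Values outside the range are clamped to the first or last letter.

```python
def mean_to_letter(mean, alphabet_length=4, min_vector=0.0, max_vector=110.0):
    """Map a bucket mean to a letter using equal-width bins over [min_vector, max_vector)."""
    width = (max_vector - min_vector) / alphabet_length   # 27.5 with the defaults
    index = int((mean - min_vector) // width)
    # Clamp values outside the range to the first or last letter
    index = max(0, min(index, alphabet_length - 1))
    return chr(ord("A") + index)

print([mean_to_letter(m) for m in (0.0, 40.0, 82.8, 120.0)])  # ['A', 'B', 'D', 'D']
```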

In [39]:
def iSax(timeList, valueList, time_bucket_length = 14, alphabet_length = 4, max_vector = 110, min_vector = 0):
  
  '''
  This function implements the iSax indexing for a single variable time-series
  
  Input 1: timeList -- An ordered list of time observations from the time series
  Input 2: valueList -- An ordered list of value observations from the time series
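  Input 3: time_bucket_length -- The time span covered by each bucket (each bucket becomes one letter)
  Input 4: alphabet_length -- The number of letters (equal-width value bins) a bucket can map to
  Input 5: max_vector -- Upper bound of the value range mapped onto letters
  Input 6: min_vector -- Lower bound of the value range mapped onto letters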
  
  Returns: word -- The iSax 'word' corresponding to the input time series
  
  '''
  
  length_of_ts = len(timeList)
  index_pairs = []
  
  current_time = time_bucket_length
  temp_start = 0
  temp_end = 0
  
  endTime = 0
  for j in timeList:
    if j>endTime:
      endTime = j
  
  
  while current_time < (endTime + time_bucket_length): # While we haven't reached the end of our time series
    bucket = []
    index = 0
    for item in timeList:
      if (item <= current_time) and (item > (current_time - time_bucket_length)): # For time values in our current interval
        bucket.append(index) # Get the indexes of all time values in our current interval
      index += 1
    if len(bucket) == 0:
      # empty bucket - insert null pair
      index_pairs.append('blank')
    elif len(bucket) == 1:
      # only 1 value in bucket - time slice 1 value
      index_pairs.append((bucket[0], bucket[0] + 1))
    else:
      index_pairs.append((bucket[0], bucket[-1] + 1))
    current_time += time_bucket_length
  

  word = []
  for item in index_pairs:
    
  
    
    if item == 'blank':
      mean = min_vector   
      
    elif (item[1]-item[0])==1: # If there's only one value in range, make mean the one value
      mean = valueList[item[0]]
      if mean is None: # A missing value would break the letter comparison below
        mean = min_vector
      
    else: # For multiple values, take the mean of the non-missing values
      temp_list = valueList[item[0]:item[1]]
      count = 0
      total = 0
      for value in temp_list:
        if value is None: # Data from flight could cause value to be None, causes type mismatch error at total+=value
          pass
        else:
          count += 1
          total += value
      mean = float(total)/count if count > 0 else min_vector # All values missing: treat like an empty bucket
      
      
      
    vector_bucket_len = float(max_vector - min_vector) / float(alphabet_length)
    index = 0
    check_value = float(min_vector)+ float(vector_bucket_len)
      
      
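    for i in range(alphabet_length):
      if mean < check_value:
        word.append(chr(65+index))
        break
      check_value += vector_bucket_len
      index += 1
    else: # No break: mean >= max_vector, so use the last letter rather than dropping the bucket
      word.append(chr(65 + alphabet_length - 1))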
      
  word = ''.join(word)
  
  return(word)


**Applying iSax via loop list creation**

The inputs to the iSax function need to be lists, so it is more convenient to loop through the rows of our dataset (now one ResultIterable per flight number) and collect the columns back into lists. These lists can then be passed to iSax and the result returned, all in a distributed fashion via the mapValues() method. Notice how the output has a long string of B's, with more variation towards the ends. This is natural, as it corresponds to flight phases such as acceleration/takeoff, cruise (the longest phase), and descent/landing.

In [41]:
# i[0] is frame_offset
# i[1] is n1 (Engine 1)

def getiSax(iterableObj):

  '''
  This function takes a ResultIterable object (such as the values from a pair RDD) and builds
  lists of the values. Then, as defined in the functional block below, it operates on those lists.
  When the function is passed through a lambda to mapValues(), Spark distributes it to each of
  the keys in the RDD.

  The commented-out portions add the ability to separate results by engine, in which case the
  engine1 and engine2 columns would need to replace n1 in the Pair RDD creation above.
  '''
  ### Placeholder Lists
  frame_offset_list = []
  engine1_n1 = []
  #engine2_n1 = []

  ### Main Loop
  # Sort by frame_offset: groupByKey() does not guarantee the order of the values,
  # while iSax() assumes its inputs are in time order
  for i in sorted(iterableObj, key=lambda row: row[0]):
    frame_offset_list.append(i[0])
    engine1_n1.append(i[1])
    #engine2_n1.append(i[2])

  ### Functional Block
  iSax_Strings = iSax(frame_offset_list, engine1_n1)

  return(iSax_Strings)

dataDF3.mapValues(lambda x: getiSax(x)).collect()

**Comparison and Matching using iSax**

We can use the iSax-indexed flights to isolate concerning patterns or to compare a parameter (n1 in this case) between two flights. The [Levenshtein distance](https://en.wikipedia.org/wiki/Levenshtein_distance) between the two flights' strings gives a 'difference' score. In the future, we could build a distribution of these difference scores and examine the flights in the tails of this distribution.
By examining flights where a part failure or otherwise erroneous performance occurred, we can see the string patterns that preceded the failure and try to detect these patterns in current flights. Suppose the pattern "BAA" was detected in the N1 parameter of a flight right before a fan blade fracture. We can then search for this pattern in our current flights and perform maintenance before a further failure occurs.

The first output below is the Levenshtein distance between the two flights, and the second is the index in flight 1 where the pattern "BAA" first appears.

In [43]:
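def LevDistance(str1, str2):
  '''Levenshtein distance: the minimum number of single-character insertions,
  deletions, and substitutions needed to turn str1 into str2.'''
  previous = list(range(len(str2) + 1))  # distances from the empty prefix of str1
  for i, c1 in enumerate(str1, 1):
    current = [i]
    for j, c2 in enumerate(str2, 1):
      current.append(min(previous[j] + 1,                # deletion
                         current[j - 1] + 1,             # insertion
                         previous[j - 1] + (c1 != c2)))  # substitution (free if equal)
    previous = current
  return(previous[-1])

indexedSeries = dataDF3.mapValues(lambda x: getiSax(x)).collect()
flight1_word = indexedSeries[0][1]
flight2_word = indexedSeries[1][1]

print("Levenshtein Distance = " + str(LevDistance(flight1_word, flight2_word)))

pattern = "BAA"
foundIndex = flight1_word.find(pattern)

if foundIndex >= 0: # find() returns -1 when absent, and 0 if the word starts with the pattern
  print("Pattern " + pattern + " found at index " + str(foundIndex))
else:
  print("Pattern not found")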


<h1> Use Case - Sensory Anomaly Detection </h1>

Most aircraft have more than one engine, and each engine is equipped with its own set of sensors, so we would like to know whether either set of sensors is in error. We can check this by comparing the mean and variance of the values returned by each sensor. Values that are not very similar could also indicate a possible issue with one of the engines. We use the MLlib package from PySpark to compute these basic column statistics, and then compare them.
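The comparison logic can be sketched in plain Python on two hypothetical lists of n1 readings. The cutoffs match the code below: a mean difference above 1, or a variance ratio outside 0.5 to 2. `sensor_anomaly()` is a hypothetical helper that uses sample variances. It checks the absolute mean difference so that a gap in either direction is flagged.

```python
import statistics

def sensor_anomaly(engine1, engine2, max_mean_diff=1.0, max_var_ratio=2.0):
    """Compare two engines' n1 readings; return (mean_diff, var_ratio, anomaly_flag)."""
    mean_diff = statistics.mean(engine1) - statistics.mean(engine2)
    var_ratio = statistics.variance(engine1) / statistics.variance(engine2)  # sample variances
    anomaly = (abs(mean_diff) > max_mean_diff
               or var_ratio > max_var_ratio
               or var_ratio < 1.0 / max_var_ratio)
    return mean_diff, var_ratio, anomaly

# Hypothetical readings: equal means, but engine 1 varies four times as much
print(sensor_anomaly([79.0, 81.0, 83.0], [80.0, 81.0, 82.0]))  # (0.0, 4.0, True)
```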

In [45]:
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import col

def column_stats(dataframe, column):
  # Cast the requested column to double and drop missing/non-positive readings
  df = dataframe.select(col(column).cast("double").alias(column))
  df = df.filter(col(column) > 0)
  # colStats() expects an RDD of vectors, one per row
  rdd = df.rdd.map(lambda row: Vectors.dense([v for v in row]))
  summary = Statistics.colStats(rdd)
  return((summary.mean()[0], summary.variance()[0]))

def anomalyDetect(dataframe, flight_num):
  dataDF = dataframe.filter(dataframe.adi_flight_record_number == flight_num)
  Engine1_data = dataDF.filter(dataDF.engine_position == 1)
  Engine2_data = dataDF.filter(dataDF.engine_position == 2)

  Engine1_stats = column_stats(Engine1_data, 'n1')
  Engine2_stats = column_stats(Engine2_data, 'n1')

  mean_diff = Engine1_stats[0] - Engine2_stats[0]
  var_ratio = Engine1_stats[1] / Engine2_stats[1]

  print("Flight number " + str(flight_num))
  print("----------------------------------")
  print("Difference in Engine1 and Engine2 n1 Averages: ")
  print(mean_diff)
  print('')
  print("With a variance ratio of: ")
  print(var_ratio)
  print("----------------------------------")

  # These cutoffs need to be refined via discussion with engineers.
  # abs() catches a gap in either direction; 0.5 avoids 1/2 == 0 under Python 2 integer division.
  if(abs(mean_diff) > 1 or var_ratio > 2 or var_ratio < 0.5):
    print("Sensor Anomaly Detected")
  else:
    print("No Sensor Anomaly Detected")

anomalyDetect(dataDF, 9861128)
  

<h1> Use Case - Genetic Algorithms for Optimization </h1>

A genetic algorithm solves an optimization problem by mimicking evolution. In each generation (iteration) of the algorithm, we generate candidate solutions to the optimization problem and combine them (via a random combination operation called crossover) to produce 'offspring'. The best of these offspring are then combined and randomly mutated to produce the next generation of offspring, and so on. Eventually, when we reach a certain level of fitness (closeness to the optimal solution), we terminate and return the current offspring. Below is a sample genetic algorithm, credited to Will Larson at lethain.com, that searches for a partition of an integer X (a list of numbers that sum up to the target X). This is equivalent to minimizing the difference between the sum of an n-element list and the target integer X. After the following code, we will evaluate how such optimization can be used to refine our time-series models when run with the iSax-indexed data.
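To make the combination and mutation steps concrete, here is a minimal sketch of each operation in isolation, assuming individuals are lists of integers as in the partition example. `crossover` splits both parents at the midpoint, as the `evolve` function below does, and `mutate_gene` redraws one randomly chosen gene from an explicit `[low, high]` range (the `evolve` function instead draws from the individual's own min and max, which its comments note is a limitation). Both names are hypothetical helpers, not part of the original code.

```python
import random


def crossover(male, female):
    """Single-point crossover: first half of one parent, second half of the other."""
    half = len(male) // 2
    return male[:half] + female[half:]


def mutate_gene(individual, low, high, rng=random):
    """Return a copy of `individual` with one random position redrawn from [low, high]."""
    child = list(individual)  # copy so the parent is left unchanged
    pos = rng.randint(0, len(child) - 1)
    child[pos] = rng.randint(low, high)  # randint is inclusive at both ends
    return child


parent_a = [1, 2, 3, 4, 5, 6]
parent_b = [10, 20, 30, 40, 50, 60]
print(crossover(parent_a, parent_b))  # [1, 2, 3, 40, 50, 60]

rng = random.Random(42)  # seeded so the mutation is reproducible between runs
print(mutate_gene(parent_a, 0, 100, rng))
```

Passing an explicit range to the mutation keeps the full search space reachable, which matters when the best solution needs values outside the range currently present in an individual.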

In [47]:
"""
# Example usage
from genetic import *
target = 371
p_count = 100
i_length = 6
i_min = 0
i_max = 100
p = population(p_count, i_length, i_min, i_max)
fitness_history = [grade(p, target),]
for i in range(100):
    p = evolve(p, target)
    fitness_history.append(grade(p, target))

for datum in fitness_history:
    print(datum)
"""
from random import randint, random
from operator import add
from functools import reduce  # reduce is not a builtin in Python 3

def individual(length, min, max):
    'Create a member of the population.'
    return [ randint(min, max) for x in range(length) ]

def population(count, length, min, max):
    """
    Create a number of individuals (i.e. a population).

    count: the number of individuals in the population
    length: the number of values per individual
    min: the minimum possible value in an individual's list of values
    max: the maximum possible value in an individual's list of values

    """
    return [ individual(length, min, max) for x in range(count) ]

def fitness(individual, target):
    """
    Determine the fitness of an individual as its distance from the target.
    Lower is better (0 means the values sum exactly to the target).

    individual: the individual to evaluate
    target: the target number individuals are aiming for
    """
    sum = reduce(add, individual, 0)
    return abs(target-sum)

def grade(pop, target):
    'Find average fitness for a population.'
    summed = reduce(add, (fitness(x, target) for x in pop))
    return summed / (len(pop) * 1.0)

def evolve(pop, target, retain=0.2, random_select=0.05, mutate=0.01):
    graded = [ (fitness(x, target), x) for x in pop]
    graded = [ x[1] for x in sorted(graded)]
    retain_length = int(len(graded)*retain)
    parents = graded[:retain_length]
    # randomly add other individuals to
    # promote genetic diversity
    for individual in graded[retain_length:]:
        if random_select > random():
            parents.append(individual)
    # mutate some individuals
    for individual in parents:
        if mutate > random():
            pos_to_mutate = randint(0, len(individual)-1)
            # this mutation is not ideal, because it
            # restricts the range of possible values,
            # but the function is unaware of the min/max
            # values used to create the individuals.
            individual[pos_to_mutate] = randint(
                min(individual), max(individual))
    # crossover parents to create children
    parents_length = len(parents)
    desired_length = len(pop) - parents_length
    children = []
    while len(children) < desired_length:
        male = randint(0, parents_length-1)
        female = randint(0, parents_length-1)
        if male != female:
            male = parents[male]
            female = parents[female]
            half = len(male) // 2  # integer division so the slice index is an int
            child = male[:half] + female[half:]
            children.append(child)
    parents.extend(children)
    return parents
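The example usage at the top of the cell runs a fixed 100 generations, whereas the description above stops once a target fitness is reached. A minimal sketch of such a driver follows; it is an assumption, not part of Will Larson's original code. To stay self-contained it takes the population and the `evolve` and `fitness` functions as arguments, so it can be called with the functions defined above, e.g. `run_until_fit(population(100, 6, 0, 100), 371, evolve, fitness)`. Since `fitness` here returns the distance from the target, lower is better and 0 is a perfect score. `run_until_fit`, `toy_fitness`, and `toy_evolve` are hypothetical names.

```python
def run_until_fit(pop, target, evolve, fitness, max_generations=100, tolerance=0):
    """Evolve `pop` until some individual's fitness is <= tolerance.

    Returns (best_individual, generations_run). Lower fitness is better,
    matching the distance-to-target fitness used in the cell above.
    """
    for generation in range(max_generations):
        best = min(pop, key=lambda ind: fitness(ind, target))
        if fitness(best, target) <= tolerance:
            return best, generation
        pop = evolve(pop, target)
    # Budget exhausted: return the best individual of the final population
    best = min(pop, key=lambda ind: fitness(ind, target))
    return best, max_generations


# Toy stand-ins so the driver can be run on its own
def toy_fitness(individual, target):
    return abs(target - sum(individual))


def toy_evolve(pop, target):
    # Deterministic "evolution": add 1 to every gene
    return [[gene + 1 for gene in ind] for ind in pop]


print(run_until_fit([[0, 0]], 4, toy_evolve, toy_fitness))  # ([2, 2], 2)
```

Keeping a `max_generations` cap alongside the tolerance guarantees termination even when no individual ever reaches a perfect score.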

<h2> Additional Resources </h2>

<ul>
<li>
PySpark Master Documentation<br>
https://people.eecs.berkeley.edu/~jegonzal/pyspark/
</li>
<li>
Getting Started with Spark in Python (webpage)<br>
https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
</li>
<li>
PySpark Wordcount example (webpage)<br>
http://www.clouddatalab.com/spark/pyspark/wordcount.html
</li>
<li>
PySpark GitHub Repository (GitHub)<br>
https://github.com/apache/spark/tree/master/python/pyspark
</li>
<li>
Cloudera Spark Guide (webpage)<br>
https://www.cloudera.com/documentation/enterprise/5-7-x/topics/spark.html
</li>
<li>
Getting the Best Performance from PySpark (slides)<br>
http://www.slideshare.net/SparkSummit/getting-the-best-performance-with-pyspark
</li>
<li>
Genetic Algorithms: Cool Name and Damn Simple<br>
http://lethain.com/genetic-algorithms-cool-name-damn-simple/
</li>
<li>
Tips for Writing Better Spark Programs (slides)<br>
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
</li>
</ul>