In [None]:
#  Download the required Dataset for analysis
# ! wget http://stat-computing.org/dataexpo/2009/2007.csv.bz2
# ! wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2
# ! wget https://github.com/jayyanar/MachineLearning_Workbook/blob/master/2007-ord-weather-data.csv --no-check-certificate
# ! wget https://github.com/jayyanar/MachineLearning_Workbook/blob/master/2008-ord-weather-data.csv --no-check-certificate
# ! bzip2 -d 2007.csv.bz2
# ! bzip2 -d 2008.csv.bz2

In [None]:
! ls -lrt


In [1]:
# For SQL-type queries (Spark)
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *

# For regression and other possible ML tools (Spark)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics


# Important for managing features  (Spark)
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler

# For displaying and other related IPython tools...
from IPython.display import display
from IPython.html.widgets import interact

# Typical Python tools
import sys
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib.pyplot as plt
import os.path



In [None]:
# To show plots inline.
# run_line_magic() is the supported replacement for the deprecated InteractiveShell.magic().
get_ipython().run_line_magic('matplotlib', 'inline')

# function to read HDFS file into dataframe using PyDoop
import pydoop.hdfs as hdfs
def read_csv_from_hdfs(path, cols, col_types=None):
  """Read every file listed under an HDFS path into one pandas DataFrame.

  Args:
    path: HDFS path handed to ``hdfs.ls``; each entry it returns is opened
      and parsed as CSV.
    cols: column names passed to ``pd.read_csv`` as ``names``.
    col_types: optional dtype specification passed to ``pd.read_csv`` as
      ``dtype``.

  Returns:
    The per-file frames concatenated with a fresh integer index.

  Raises:
    ValueError: from ``pd.concat`` when ``hdfs.ls`` returns no entries.
  """
  pieces = []
  for f in hdfs.ls(path):
    fhandle = hdfs.open(f)
    try:
      pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
    finally:
      # Release the HDFS handle even when parsing fails part-way through.
      fhandle.close()
  return pd.concat(pieces, ignore_index=True)

# Read the 2007 file; column names are supplied explicitly to the CSV reader.
cols = [
    'year', 'month', 'day', 'dow',
    'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
    'Carrier', 'FlightNum', 'TailNum',
    'ActualElapsedTime', 'CRSElapsedTime', 'AirTime',
    'ArrDelay', 'DepDelay',
    'Origin', 'Dest', 'Distance',
    'TaxiIn', 'TaxiOut',
    'Cancelled', 'CancellationCode', 'Diverted',
    'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay',
]
flt_2007 = read_csv_from_hdfs('airline/delay/2007.csv', cols)

# (rows, columns) of the loaded frame
flt_2007.shape

In [None]:
# Load '2007.csv' through the SparkContext `sc`; the file is read as lines of text.
textFile = sc.textFile('2007.csv')

In [None]:
# Split each line on commas (a plain split: quoted commas are not honoured),
# remember the first row as the header, and keep every row that differs from it.
textFileRDD = textFile.map(lambda line: line.split(','))
header = textFileRDD.first()
textRDD = textFileRDD.filter(lambda fields: fields != header)

In [None]:
# Count data rows only: textRDD has the header row filtered out, whereas
# textFileRDD still contains it and would over-count by one.
num_records = textRDD.count()
print ('Number of records ' , num_records)

In [None]:
# First two rows of the split file: the header row and the first data row.
aux_ = textFileRDD.take(2)
feature_names, feature_example = aux_[0], aux_[1]

In [None]:
# Show the first row of the file (the column names).
print ('Feature Names ' , feature_names)

In [None]:
# Show the second row of the file as a sample record.
print ('Feature Example ' , feature_example)

In [None]:
# Number of comma-separated fields in the sample record.
print ("Number of features = " , len(feature_example))

In [None]:
# Python type of one parsed record (the result of str.split).
print (type(feature_example))

In [13]:
# ### Creating a SQL Dataframe from RDD
# 
# We now create a Spark SQL DataFrame: a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. We will use the Spark RDD created above and the Spark SQL context to create the desired data frame.

# We first create a function that parses a record of our RDD into the desired format. As a reference, look at feature_names and feature_example, which we created above.

def parse(r):
    """Convert one split CSV record into a Row, or None if it cannot be parsed.

    Args:
        r: sequence of string fields for one flight record, indexed by
           column position (0=year, 1=month, 2=day of month, 3=day of week,
           4=DepTime, 5=CRSDepTime, 15=DepDelay, 16=Origin, 17=Dest,
           18=Distance).

    Returns:
        A Row with typed fields, or None when a field is missing or is not
        numeric (for example the literal 'NA').
    """
    try:
        return Row(
            # Year is needed downstream (holiday distance, to_date) and is
            # present in the DataFrame output shown further down.
            Year=int(r[0]),
            Month=int(r[1]),
            DayofMonth=int(r[2]),
            DayOfWeek=int(r[3]),
            DepTime=int(float(r[4])),
            CRSDepTime=int(r[5]),
            DepDelay=int(float(r[15])),
            Origin=r[16],
            Dest=r[17],
            Distance=int(float(r[18])),
        )
    except (ValueError, IndexError, TypeError, OverflowError):
        # Only malformed records are skipped; programming errors such as a
        # missing import now surface instead of silently dropping every row.
        return None

# define hour function to obtain hour of day
def hour_ex(x):
    """Return the hour part of an HHMM clock value.

    The value is converted to int, zero-padded to four characters, and the
    first two characters are returned as an int (e.g. 945 -> '0945' -> 9).
    """
    hhmm = "{:04d}".format(int(x))
    return int(hhmm[:2])
# Wrap hour_ex as a Spark SQL UDF that returns an integer column.
f = udf(f=hour_ex, returnType=IntegerType())


In [28]:
def prepFlightDelay(infile, orgn):
    """Build the delay DataFrame for flights departing from one airport.

    Args:
        infile: path of the CSV file, read with ``sc.textFile``.
        orgn: value the ``Origin`` column must equal.

    Returns:
        A DataFrame of the parsed rows whose Origin equals ``orgn``, with an
        ``hour`` column (from CRSDepTime, via the ``f`` UDF) and a boolean
        ``DepDelayed`` column that is true when DepDelay > 15.

    Side effect: the unfiltered DataFrame is registered as temp table
    "airlineDF".
    """
    split_lines = sc.textFile(infile).map(lambda line: line.split(','))
    header_fields = split_lines.first()
    data_rows = split_lines.filter(lambda fields: fields != header_fields)

    # parse() returns None for records it cannot convert; drop those.
    parsed_rows = data_rows.map(lambda fields: parse(fields)).filter(lambda row: row is not None)

    flights = sqlContext.createDataFrame(parsed_rows)
    flights = flights.withColumn('hour', f(flights.CRSDepTime))
    flights.registerTempTable("airlineDF")

    from_origin = flights.filter(col("Origin") == orgn)
    return from_origin.withColumn('DepDelayed', from_origin['DepDelay'] > 15)

In [29]:
# Delay DataFrame for 2007 flights whose Origin is "ORD".
flight_2007_ORD = prepFlightDelay('2007.csv',"ORD")

In [30]:
# Peek at the first five rows.
flight_2007_ORD.take(5)

[Row(CRSDepTime=1100, DayOfWeek=4, DayofMonth=25, DepDelay=-8, DepTime=1052, Dest='EWR', Distance=719, Month=1, Origin='ORD', Year=2007, hour=11, DepDelayed=False),
 Row(CRSDepTime=1500, DayOfWeek=7, DayofMonth=28, DepDelay=41, DepTime=1541, Dest='IAH', Distance=925, Month=1, Origin='ORD', Year=2007, hour=15, DepDelayed=True),
 Row(CRSDepTime=2000, DayOfWeek=1, DayofMonth=29, DepDelay=45, DepTime=2045, Dest='CLE', Distance=316, Month=1, Origin='ORD', Year=2007, hour=20, DepDelayed=True),
 Row(CRSDepTime=1900, DayOfWeek=3, DayofMonth=17, DepDelay=-9, DepTime=1851, Dest='EWR', Distance=719, Month=1, Origin='ORD', Year=2007, hour=19, DepDelayed=False),
 Row(CRSDepTime=1745, DayOfWeek=5, DayofMonth=12, DepDelay=180, DepTime=2045, Dest='CLE', Distance=316, Month=1, Origin='ORD', Year=2007, hour=17, DepDelayed=True)]

In [21]:
#
# Python UDFs for our PIG script
#
from datetime import date

# Holiday dates for 2007 and 2008, used to measure distance to the nearest holiday.
holidays = [
    # 2007
    date(2007, 1, 1), date(2007, 1, 15), date(2007, 2, 19),
    date(2007, 5, 28), date(2007, 6, 7), date(2007, 7, 4),
    date(2007, 9, 3), date(2007, 10, 8), date(2007, 11, 11),
    date(2007, 11, 22), date(2007, 12, 25),
    # 2008
    date(2008, 1, 1), date(2008, 1, 21), date(2008, 2, 18),
    date(2008, 5, 22), date(2008, 5, 26), date(2008, 7, 4),
    date(2008, 9, 1), date(2008, 10, 13), date(2008, 11, 11),
    date(2008, 11, 27), date(2008, 12, 25),
]
# get number of days from nearest holiday
def days_from_nearest_holiday(year, month, day, holiday_dates=None):
    """Return the number of days between a date and the closest holiday.

    Args:
        year, month, day: calendar date to measure from.
        holiday_dates: optional iterable of ``datetime.date`` values; when
            omitted, the module-level ``holidays`` list is used.

    Returns:
        Non-negative int: the smallest absolute day difference.

    Raises:
        ValueError: if the date is invalid or ``holiday_dates`` is empty.
    """
    # `from pyspark.sql.functions import *` earlier in the notebook rebinds
    # `abs` and `min` to Spark column functions, so use the builtins explicitly.
    import builtins

    if holiday_dates is None:
        holiday_dates = holidays
    d = date(year, month, day)
    return builtins.min(builtins.abs(d - h).days for h in holiday_dates)
def to_date(year, month, day):
    """Build a ``datetime.date`` from its year, month and day components."""
    return date(year, month, day)

In [None]:
# We add a new column to our data frame, **DepDelayed**, a binary variable:
# - **True**, for flights that have > 15 minutes of delay
# - **False**, for flights that have <= 15 minutes of delay
# 
# We will later use **DepDelayed** as the target/label column in the classification process.


# NOTE: `airline_df` must already exist in the kernel. In the visible notebook it is
# only created in the last cell (sqlContext.createDataFrame(rowRDD)), so run that first.
# Keep flights whose Origin is ORD, then flag departures delayed by more than 15 minutes.
airline_df_ORD = airline_df.filter(col("Origin") == "ORD")
airline_df_ORD_15 = airline_df_ORD.withColumn('DepDelayed', airline_df_ORD.DepDelay > 15)

In [None]:
# Peek at the first ten ORD rows, including the new DepDelayed flag.
airline_df_ORD_15.take(10)

In [None]:
print ("Total flights Origin at ORD Chicago Airport: " + str(airline_df_ORD_15.count()))
# DataFrame.show() prints the table itself and returns None, so call it directly;
# wrapping it in print() only added a stray "None" line.
airline_df_ORD_15.groupby('DepDelayed').count().show()

In [None]:
# Number of delayed departures per month, in ascending month order.
delayed_only = airline_df_ORD_15.filter(airline_df_ORD_15.DepDelayed == True)
delayed_per_month = delayed_only.groupby(airline_df_ORD_15.Month).count()
delayed_per_month.sort(asc("Month")).show()

In [None]:
# Collect the ORD DataFrame to the driver as a pandas DataFrame for local plotting.
df_pandas = airline_df_ORD_15.toPandas()

In [None]:
# First rows of the pandas copy.
df_pandas.head()

In [None]:
# Fraction of flights delayed per scheduled departure hour
# (the mean of the boolean DepDelayed column within each hour).
scheduled_dep = df_pandas['CRSDepTime']
df_pandas['hour'] = scheduled_dep.map(lambda hhmm: int(str(int(hhmm)).zfill(4)[:2]))
delay_and_hour = df_pandas[['DepDelayed', 'hour']]
grouped = delay_and_hour.groupby('hour').mean()

# Bar chart of the delayed fraction by hour of day
grouped.plot(kind='bar')

In [None]:
# Fraction of flights delayed per carrier, ignoring carriers with 10 or fewer flights.
grouped1 = (
    df_pandas[['DepDelayed', 'UniqueCarrier']]
    .groupby('UniqueCarrier')
    .filter(lambda flights: len(flights) > 10)
)
grouped2 = grouped1.groupby('UniqueCarrier').mean()
carrier = grouped2.sort_values(by='DepDelayed', ascending=False)

# Bar chart of the (up to) 20 carriers with the highest delayed fraction from ORD
carrier.head(20).plot(kind='bar')

In [None]:
# Column dtypes, non-null counts and memory usage of the pandas copy.
df_pandas.info()

In [None]:
! wget http://www.java2s.com/Code/JarDownload/joda/joda-time-2.0.jar.zip

In [None]:
! mkdir joda

In [None]:
! mv joda-time-2.0.jar.zip joda

In [None]:
! ls -rtl 

In [None]:
# NOTE: each `!` command runs in its own subshell, so this `cd` does not change the
# notebook's working directory; the later `unzip joda/...` cell relies on that.
! cd joda

In [None]:
! ls -rtla

In [None]:
! unzip  joda/joda-time-2.0.jar.zip

In [None]:
%configure

In [None]:
import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader

import java.io._
import org.joda.time._
import org.joda.time.format._
import org.joda.time.format.DateTimeFormat
import org.joda.time.DateTime
import org.joda.time.Days


/** One flight record, holding the raw string fields needed to build model features. */
case class DelayRec(year: String,
                    month: String,
                    dayOfMonth: String,
                    dayOfWeek: String,
                    crsDepTime: String,
                    depDelay: String,
                    origin: String,
                    distance: String,
                    cancelled: String) {

    // Holiday dates for 2007 and 2008, written as MM/dd/yyyy.
    val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
      "09/03/2007", "10/08/2007" ,"11/11/2007", "11/22/2007", "12/25/2007",
      "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
      "09/01/2008", "10/13/2008" ,"11/11/2008", "11/27/2008", "12/25/2008")

    /** Returns (yyyyMMdd date key, feature values).
      *
      * Feature order: departure delay, month, day of month, day of week,
      * scheduled departure hour, distance, days from the nearest holiday.
      * Throws NumberFormatException if any numeric field is not parseable.
      */
    def gen_features: (String, Array[Double]) = {
      val values = Array(
        depDelay.toDouble,
        month.toDouble,
        dayOfMonth.toDouble,
        dayOfWeek.toDouble,
        get_hour(crsDepTime).toDouble,
        distance.toDouble,
        days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
      )
      new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)
    }

    // First two characters of the departure time after zero-padding it to four digits.
    def get_hour(depTime: String) : String = "%04d".format(depTime.toInt).take(2)
    // Date formatted as yyyyMMdd.
    def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

    /** Smallest absolute number of days between the given date and any entry in `holidays`. */
    def days_from_nearest_holiday(year:Int, month:Int, day:Int): Int = {
      val sampleDate = new DateTime(year, month, day, 0, 0)
      // Build the formatter once per call rather than once per holiday inside the fold.
      val holidayFormat = DateTimeFormat.forPattern("MM/dd/yyyy")

      // 3000 is the starting value of the running minimum.
      holidays.foldLeft(3000) { (r, c) =>
        val holiday = holidayFormat.parseDateTime(c)
        val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
        math.min(r, distance)
      }
    }
  }

// Preprocessing step for a given file: parse each CSV line into a DelayRec and keep
// only non-header, non-cancelled records whose origin is ORD.
def prepFlightDelays(infile: String): RDD[DelayRec] = {
    val data = sc.textFile(infile)

    data.flatMap { line =>
      val reader = new CSVReader(new StringReader(line))
      // Only the first CSV record on the line is used. Blank lines and lines with
      // fewer than 22 fields are skipped instead of throwing on list(0) / rec(21).
      reader.readAll().asScala.headOption
        .filter(rec => rec.length > 21)
        .map(rec => DelayRec(rec(0),rec(1),rec(2),rec(3),rec(5),rec(15),rec(16),rec(18),rec(21)))
        .toList
    }
    .filter(rec => rec.year != "Year")
    .filter(rec => rec.cancelled == "0")
    .filter(rec => rec.origin == "ORD")
}

In [None]:
def parse(r):
    """Convert one split CSV record into a Row with the full set of delay fields.

    Args:
        r: sequence of string fields for one flight record, indexed by
           column position in the file.

    Returns:
        A Row with typed fields, or None when a field is missing or is not
        numeric (for example the literal 'NA'), in which case the caller is
        expected to drop the record.
    """
    try:
        return Row(
            Year=int(r[0]),
            Month=int(r[1]),
            DayofMonth=int(r[2]),
            DayOfWeek=int(r[3]),
            DepTime=int(float(r[4])),
            CRSDepTime=int(r[5]),
            ArrTime=int(float(r[6])),
            CRSArrTime=int(r[7]),
            UniqueCarrier=r[8],
            DepDelay=int(float(r[15])),
            Origin=r[16],
            Dest=r[17],
            Distance=int(float(r[18])),
            CarrierDelay=int(float(r[24])),
            WeatherDelay=int(float(r[25])),
            NASDelay=int(float(r[26])),
            SecurityDelay=int(float(r[27])),
            LateAircraftDelay=int(float(r[28])),
        )
    except (ValueError, IndexError, TypeError, OverflowError):
        # Only malformed records are skipped; programming errors such as a
        # missing import now surface instead of silently dropping every row.
        return None

# Parse every data row, discard the ones parse() rejected (None), and build the DataFrame.
parsed_rows = textRDD.map(lambda fields: parse(fields))
rowRDD = parsed_rows.filter(lambda row: row is not None)
airline_df = sqlContext.createDataFrame(rowRDD)