In [1]:
# 1. magic to print version
# 2. magic so that the notebook will reload external python modules
%load_ext watermark
%load_ext autoreload
%autoreload 2

%watermark -a 'Ethen' -d -t -v -p pyspark

Ethen 2018-03-21 19:08:19 

CPython 3.6.4
IPython 6.2.1

pyspark 2.2.1


In [2]:
# https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/
# brew install mongodb
# mkdir -p /Users/mingyuliu/db
# mongod --dbpath /Users/mingyuliu/db

import pymongo
from subprocess import call, Popen

# launch mongodb specifying a custom path to the db
Popen('mongod --dbpath /Users/mingyuliu/db', shell = True)

# create a new database
# and a collection in the database
conn = pymongo.MongoClient()
db = conn['agile_data_science']
collection = db['on_time_performance']
collection

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'agile_data_science'), 'on_time_performance')

In [3]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# https://docs.mongodb.com/spark-connector/current/
# https://stackoverflow.com/questions/48145215/unable-to-connect-to-mongo-from-pyspark
# https://spark-packages.org/package/mongodb/mongo-spark
# https://stackoverflow.com/questions/46814260/spark-mongodb-connector-scala-missing-database-name
conf = SparkConf()
conf.setAppName('myApp')
conf.set('spark.jars.packages',
         'org.mongodb.spark:mongo-spark-connector_2.10:2.2.1,org.elasticsearch:elasticsearch-hadoop:6.2.1')
conf.set('spark.mongodb.input.uri', 'mongodb://127.0.0.1/agile_data_science.on_time_performance')
conf.set('spark.mongodb.input.readPreference.name', 'secondaryPreferred')
conf.set('spark.mongodb.output.uri', 'mongodb://127.0.0.1/agile_data_science.on_time_performance')

spark = (SparkSession.
    builder.
    config(conf = conf).
    getOrCreate())

In [4]:
# use test to determine whether the hadoop directory exists or not
# https://hadoop.apache.org/docs/r2.8.2/hadoop-project-dist/hadoop-common/FileSystemShell.html#test
filepath = 'On_Time_On_Time_Performance_2015.csv.bz2'
call('hdfs dfs -copyFromLocal /Users/mingyuliu/Agile_Data_Code_2/data/{} .'.format(filepath), shell = True)

df_on_time = (spark.read.
    format('csv').
    options(header = 'true', treatEmptyValuesAsNulls = 'true').
    load(filepath))
df_on_time.registerTempTable("on_time_performance")

In [5]:
# data dictionary
# https://www.transtats.bts.gov/Fields.asp?Table_ID=236

# when parsing types, be aware that mongodb does not recognize float type
trimmed_cast_performance = spark.sql("""
    SELECT
      Year, Quarter, Month, DayofMonth, DayOfWeek, FlightDate,
      Carrier, TailNum, FlightNum,
      Origin, OriginCityName, OriginState,
      Dest, DestCityName, DestState,
      DepTime, cast(DepDelay as double), cast(DepDelayMinutes as int),
      cast(TaxiOut as double), cast(TaxiIn as double),
      WheelsOff, WheelsOn,
      ArrTime, cast(ArrDelay as double), cast(ArrDelayMinutes as double),
      cast(Cancelled as int), cast(Diverted as int),
      cast(ActualElapsedTime as double), cast(AirTime as double),
      cast(Flights as int), cast(Distance as double),
      cast(CarrierDelay as double), cast(WeatherDelay as double), 
      cast(NASDelay as double),
      cast(SecurityDelay as double), 
      cast(LateAircraftDelay as double),
      CRSDepTime, CRSArrTime
    FROM
      on_time_performance
""")

# Replace on_time_performance table# with our new, trimmed table and show its contents
trimmed_cast_performance.registerTempTable("on_time_performance")
trimmed_cast_performance.limit(6).toPandas()

+----+-------+-----+----------+---------+----------+-------+-------+---------+------+--------------------+-----------+----+--------------------+---------+-------+--------+---------------+-------+------+---------+--------+-------+--------+---------------+---------+--------+-----------------+-------+-------+--------+------------+------------+--------+-------------+-----------------+----------+----------+
|Year|Quarter|Month|DayofMonth|DayOfWeek|FlightDate|Carrier|TailNum|FlightNum|Origin|      OriginCityName|OriginState|Dest|        DestCityName|DestState|DepTime|DepDelay|DepDelayMinutes|TaxiOut|TaxiIn|WheelsOff|WheelsOn|ArrTime|ArrDelay|ArrDelayMinutes|Cancelled|Diverted|ActualElapsedTime|AirTime|Flights|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|CRSDepTime|CRSArrTime|
+----+-------+-----+----------+---------+----------+-------+-------+---------+------+--------------------+-----------+----+--------------------+---------+-------+--------+---------------+-

In [6]:
trimmed_cast_performance.write.parquet('on_time_performance.parquet')

In [7]:
df_on_time = spark.read.parquet('on_time_performance.parquet')

# save to mongodb directly from spark
# there's also mode 'append'
(df_on_time.write.
 format('com.mongodb.spark.sql.DefaultSource').
 option('spark.mongodb.output.uri', 'mongodb://127.0.0.1/agile_data_science.on_time_performance').
 mode('overwrite').save())

In [8]:
# sample query
temp = collection.find_one({'FlightDate': '2015-01-01'})
temp

{'ActualElapsedTime': 82.0,
 'AirTime': 59.0,
 'ArrDelay': -6.0,
 'ArrDelayMinutes': 0.0,
 'ArrTime': '1504',
 'CRSArrTime': '1510',
 'CRSDepTime': '1345',
 'Cancelled': 0,
 'Carrier': 'AA',
 'DayOfWeek': '4',
 'DayofMonth': '1',
 'DepDelay': -3.0,
 'DepDelayMinutes': 0,
 'DepTime': '1342',
 'Dest': 'MEM',
 'DestCityName': 'Memphis, TN',
 'DestState': 'TN',
 'Distance': 432.0,
 'Diverted': 0,
 'FlightDate': '2015-01-01',
 'FlightNum': '1519',
 'Flights': 1,
 'Month': '1',
 'Origin': 'DFW',
 'OriginCityName': 'Dallas/Fort Worth, TX',
 'OriginState': 'TX',
 'Quarter': '1',
 'TailNum': 'N001AA',
 'TaxiIn': 7.0,
 'TaxiOut': 16.0,
 'WheelsOff': '1358',
 'WheelsOn': '1457',
 'Year': '2015',
 '_id': ObjectId('5ab31059050f547b4a90ed22')}

In [9]:
# index for faster retrieval
collection.create_index([
    ('Origin', pymongo.ASCENDING),
    ('Dest', pymongo.ASCENDING),
    ('FlightDate', pymongo.ASCENDING)
], background = True)

'Origin_1_Dest_1_FlightDate_1'

Similar way of doing it through command line

```bash

# launch mongodb repl
mongo agile_data_science

# these queries are all in mongodb repl
db.on_time_performance.findOne()

db.on_time_performance.findOne(
  {Carrier: 'DL', FlightDate: '2015-01-01', FlightNum: '478'})

# https://docs.mongodb.com/manual/reference/method/db.collection.createIndex/#ensureindex-options
# the 1 specifies ascending order, -1 specifies descending order, this is important
# when performing sorting
db.on_time_performance.createIndex({Carrier: 1, FlightDate: 1, FlightNum: 1})
```

In [10]:
import time

# https://www.elastic.co/guide/en/elasticsearch/reference/current/_installation.html
# elasticsearch running at http://localhost:9200
# kibana running at http://localhost:5601
Popen('/Users/mingyuliu/elasticsearch-6.2.2/bin/elasticsearch', shell = True)

# wait for a while for elasticsearch to launch before attempting to connect
time.sleep(5)
Popen('/Users/mingyuliu/kibana-6.2.2-darwin-x86_64/bin/kibana', shell = True)

<subprocess.Popen at 0x103e4cb00>

In [11]:
call('bash elastic_create.sh', shell = True)

7

In [13]:
# save the DataFrame to Elasticsearch
time.sleep(5)
(df_on_time.write.
 format('org.elasticsearch.spark.sql').
 option('es.resource', 'agile_data_science/on_time_performance').
 option('es.batch.size.entries', '100').
 mode('overwrite').
 save())

In [14]:
# introducing the query language
# https://www.elastic.co/guide/en/elasticsearch/reference/current/_introducing_the_query_language.html
# https://elasticsearch-py.readthedocs.io/en/master/
from elasticsearch import Elasticsearch


elastic_url = "agile_data_science"
elastic = Elasticsearch()

dest = 'SFO'
origin = 'ORD'
flight_date = '2015-07-31'
must_query = [
    {'match': {'Origin': origin}},
    {'match': {'Dest': dest}},
    {'match': {'FlightDate': flight_date}}]
query = {
    'query': {
        'bool': {
            'must': must_query
        }
    }
}
result = elastic.search(index = elastic_url, body = query)

# after processing the query, most of the stuff we need are
# under the hits key
print('available keys:', result['hits'].keys())
print('total number of records', result['hits']['total'])

# hits returns the list of record,
# where the actual content is under
# the _source key
hits = result['hits']['hits'][:2]
hits

available keys: dict_keys(['total', 'max_score', 'hits'])
total number of records 28


[{'_id': 'xRPSS2IB28xBEfc856l3',
  '_index': 'agile_data_science',
  '_score': 7.5014405,
  '_source': {'ActualElapsedTime': 251.0,
   'AirTime': 234.0,
   'ArrDelay': 2.0,
   'ArrDelayMinutes': 2.0,
   'ArrTime': '0126',
   'CRSArrTime': '0124',
   'CRSDepTime': '2245',
   'Cancelled': 0,
   'Carrier': 'UA',
   'DayOfWeek': '5',
   'DayofMonth': '31',
   'DepDelay': 30.0,
   'DepDelayMinutes': 30,
   'DepTime': '2315',
   'Dest': 'SFO',
   'DestCityName': 'San Francisco, CA',
   'DestState': 'CA',
   'Distance': 1846.0,
   'Diverted': 0,
   'FlightDate': '2015-07-31',
   'FlightNum': '1778',
   'Flights': 1,
   'Month': '7',
   'Origin': 'ORD',
   'OriginCityName': 'Chicago, IL',
   'OriginState': 'IL',
   'Quarter': '3',
   'TailNum': 'N37462',
   'TaxiIn': 7.0,
   'TaxiOut': 10.0,
   'WheelsOff': '2325',
   'WheelsOn': '0119',
   'Year': '2015'},
  '_type': 'on_time_performance'},
 {'_id': 'rBPSS2IB28xBEfc856-k',
  '_index': 'agile_data_science',
  '_score': 7.5014405,
  '_source': 

```bash
# or we can use curl to query the data
curl 'localhost:9200/agile_data_science/on_time_performance/_search?q=Origin:ATL&pretty'

```