Skip to content

Commit

Permalink
Adding airline demo to public repo
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie Barbetta committed Feb 25, 2016
1 parent b69382e commit 0dd5bcc
Show file tree
Hide file tree
Showing 15 changed files with 9,436 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -6,3 +6,4 @@ java/dependency-reduced-pom.xml
.__browserify_string_empty.js
.DS_Store
jupyter-js-services
examples/airline/public/data/2008bd.json/*
27 changes: 27 additions & 0 deletions examples/airline/README
@@ -0,0 +1,27 @@
Prereqs:
- If running local spark cluster:
- Check your system's `maxfiles` limit
- `launchctl limit maxfiles`
- If you need to bump it up (should be soft/hard limits of at least 65536)
- See http://docs.basho.com/riak/latest/ops/tuning/open-files-limit/#Mac-OS-X
- By default a local standalone Spark instance is used (e.g. "local[*]")
- To run a local Spark cluster (e.g. master and 1 slave)
- http://spark.apache.org/docs/latest/spark-standalone.html
- On startup note the URL for Spark master (or you can go to localhost:8080 to get it)
- Should be something like spark://<master_host>:7077
- `export SPARK_MASTER=<master_url>`
- Start your jupyter gateway (e.g. `jupyter notebook --no-browser`)

Setup:
- `cd <git_projects_root>/eclairjs-node/examples/airline`
- Run `npm install`
- If running local cluster get the airline data in JSON format into data dir
- `cd <git_projects_root>/eclairjs-node/examples/airline/public/data`
- ungzip/untar the datafile for the flight schedules 2008bd.json.tgz
- `gunzip 2008bd.json.tgz`
- `tar -xvf 2008bd.json.tar`
- By default the Kafka pump on SoftLayer is being used. To change it, set the `KAFKA_HOST` environment variable or edit the `kafkaHost` default near the top of airline.js.

Running the demo:
- From <git_projects_root>/eclairjs-node/examples/airline run `node --harmony index.js`
- Point browser to localhost:3000
109 changes: 109 additions & 0 deletions examples/airline/airline.js
@@ -0,0 +1,109 @@
var spark = require('eclairjs');

// Spark master URL: taken from the SPARK_MASTER environment variable, falling
// back to "local[*]" when it is not set.
var sparkMaster = process.env.SPARK_MASTER || "local[*]";
console.log("spark master = " + sparkMaster);
// Module-wide Spark handles shared by every function in this file.
var sc = new spark.SparkContext(sparkMaster, "Airline Demo");
var sqlContext = new spark.SQLContext(sc);
// Streaming context; left undefined here and assigned by startStream().
var ssc;

/**
 * Creates the streaming context, builds the Kafka-backed DStream pipeline and
 * starts streaming.
 *
 * Pipeline: Kafka stream -> 15 minute window -> one element per text line ->
 * one JSON string per line with the fields origin, carrier, flight_num,
 * destination and take_off_delay_mins (e.g. rdu,aa,234,sfo,3). Every batch is
 * read into a DataFrame and registered as the temp table "airlinedata".
 *
 * The Kafka/ZooKeeper address comes from KAFKA_HOST, with a hard-coded default.
 *
 * NOTE(review): the callbacks handed to flatMap/map/foreachRDD appear to run on
 * the Spark (Nashorn) side rather than in Node -- the original comment refers to
 * Nashorn and `print` is not a Node global. Keep them plain ES5; confirm before
 * relying on Node-only features inside them.
 */
function startStream() {
  ssc = new spark.StreamingContext(sc, new spark.streaming.Duration(2000));
  var kafkaHost = process.env.KAFKA_HOST || "169.54.140.107:2181";
  var dstream = spark.streaming.KafkaUtils
    .createStream(ssc, kafkaHost, "floyd", "airline")
    //.createStream(ssc, "10.11.19.101:2181", "floyd", "airline")
    .window(new spark.streaming.Duration(1000 * 60 * 15))
    .flatMap(function(chunk) {
      // chunk[1] is the message payload; it may hold several newline-separated records.
      return chunk[1].split('\n');
    })
    .map(function(line) {
      var lineArr = line.split(",");
      var str = JSON.stringify({
        "origin": lineArr[16],
        "carrier": lineArr[8],
        "flight_num": lineArr[9],
        "destination": lineArr[17],
        // Always parse as base 10 so the result never depends on a prefix in the field.
        "take_off_delay_mins": parseInt(lineArr[15], 10)
      });

      return str;
    });

  dstream.foreachRDD(function(rdd) {
    // RDD.isEmpty() doesn't exist anymore - Was this a mistake?? It still exists on Nashorn side.
    //if(!rdd.isEmpty()) {
    var df = sqlContext.read().json(rdd);
    //if (df.count() > 0) {
    print("got data from stream: "+ df.count());
    df.registerTempTable("airlinedata");
    //}
    //}
  }).then(function() {
    // Only start streaming once the foreachRDD registration has completed.
    ssc.start().catch(function(err) {
      console.log("error starting streaming context");
      console.log(err);
    });
  }).catch(function(err) {
    console.log("error sending print command");
    console.log(err);
  });
}

/**
 * Loads the static flight data set and registers the flights whose month/day
 * match today's date as the temp table "flightstoday".
 *
 * The data location comes from FLIGHT_DATA, defaulting to
 * public/data/2008bd.json next to this file. "Today" is evaluated once, when
 * this function runs; the table is not refreshed afterwards.
 *
 * All work is asynchronous; failures are logged rather than thrown, so the
 * function itself returns nothing.
 */
function getTodaysFlights() {
  var file = process.env.FLIGHT_DATA || 'file:' + __dirname + '/public/data/2008bd.json';
  //console.log('Getting static data from file: ',file);

  var dfAllFlights = sqlContext.read().json(file);
  dfAllFlights.count().then(function(count) {
    console.log('Num all US flights: ',count);
  }).catch(function(err) {
    // Previously a failed count was dropped silently.
    console.log('error counting all flights');
    console.log(err);
  });

  var today = new Date();
  var month = today.getMonth()+1; // 0 indexed e.g. 0-11
  var day = today.getDate(); // 1 indexed e.g. 1-31

  var dfFlightsForToday =
    dfAllFlights.filter("month='"+month+"' AND day='"+day+"'");
  dfFlightsForToday.count().then(function(count) {
    console.log('Num all flights for today '+month+'-'+day+': ',JSON.stringify(count));
    // Return the registration promise so the single catch below also covers it.
    return dfFlightsForToday.registerTempTable('flightstoday');
  }).then(function() {
    console.log('Temptable flightstoday registered');
  }).catch(function(err) {
    console.log('error registering temptable flightstoday');
    console.log(err);
  });
}

/**
 * Thin facade over the module's Spark contexts: start the demo pipelines, run
 * SQL against the registered temp tables, and shut everything down.
 * A single shared instance is exported.
 */
function AirportDemo() {
}

/**
 * Starts the streaming pipeline and loads today's static flight schedule.
 */
AirportDemo.prototype.start = function() {
  startStream();
  getTodaysFlights();
};

/**
 * Stops the streaming context (if one was created) and then the SparkContext.
 *
 * @param {Function} callback Invoked once shutdown has finished or failed. It
 *   receives whatever sc.stop() resolved or rejected with, or no arguments
 *   when there was no SparkContext to stop.
 */
AirportDemo.prototype.stop = function(callback) {
  // Single place that stops the SparkContext and reports the outcome.
  function stopSparkContext() {
    sc.stop().then(callback).catch(callback);
  }

  if (!sc) {
    // Nothing to shut down, but the caller (e.g. a signal handler waiting to
    // exit) must still be told; previously this path never called back.
    if (callback) {
      callback();
    }
    return;
  }

  console.log('stop - SparkContext exists');
  if (!ssc) {
    stopSparkContext();
    return;
  }

  console.log('stop - SparkStreamingContext exists');
  // NOTE(review): the result of ssc.stop() is not observed here; completion is
  // detected through awaitTerminationOrTimeout below.
  ssc.stop();
  ssc.awaitTerminationOrTimeout(5000).then(function() {
    stopSparkContext();
  }).catch(function(err) {
    console.log("error stopping stream");
    console.log(err);
    // Still try to stop the SparkContext so the caller is always notified.
    stopSparkContext();
  });
};

/**
 * Runs a SQL statement against the shared SQLContext.
 *
 * @param {string} sql SQL text to execute.
 * @returns The value returned by sqlContext.sql(sql).
 */
AirportDemo.prototype.query = function(sql) {
  return sqlContext.sql(sql);
};

module.exports = new AirportDemo();
114 changes: 114 additions & 0 deletions examples/airline/index.js
@@ -0,0 +1,114 @@
/*
* Copyright 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

var express = require('express');
// The Express application that serves both the static demo UI and the JSON endpoints below.
var app = express();

// Serve static assets from "public". The first mount uses a relative path
// (resolved against the directory node was launched from); the second uses
// this file's own directory so the assets are found from any working directory.
app.use(express.static('public'));
app.use(express.static(__dirname + '/public'));

var airlineDemo = require('./airline.js');

/**
 * GET /getFlights?airport=<code>
 *
 * Responds with the rows of the streaming temp table "airlinedata" whose origin
 * equals the given airport code, as produced by toJSON().toArray().
 *
 * Responds 400 when the airport parameter is missing or not a plain
 * alphanumeric string, and 500 when the query fails, so the client never waits
 * on a request that will not be answered.
 */
app.get('/getFlights', function (request, response) {
  var airportCode = request.query.airport;

  // The value is spliced into a SQL string below, so accept only letters and
  // digits; anything else (quotes, spaces, repeated parameters) is rejected.
  if (typeof airportCode !== 'string' || !/^[A-Za-z0-9]+$/.test(airportCode)) {
    response.status(400).json({error: 'invalid airport'});
    return;
  }

  var df;
  try {
    df = airlineDemo.query("SELECT * FROM airlinedata WHERE origin='"+airportCode+"'");
  } catch (e) {
    console.log("e", e);
    // Without a DataFrame there is nothing to stream back; fail the request now.
    response.status(500).json({error: 'query failed'});
    return;
  }

  df.toJSON().toArray().then(function(result) {
    response.json(result);
  }).catch(function(e) {
    console.log(e);
    // Guard against a failure that happened after the response was already written.
    if (!response.headersSent) {
      response.status(500).json({error: 'query failed'});
    }
  });
});

/**
 * GET /getCarriers?airport=<code>
 *
 * Responds with the distinct carriers found in the temp table "flightstoday"
 * for the given origin airport, as produced by toJSON().toArray().
 *
 * Responds 400 when the airport parameter is missing or not a plain
 * alphanumeric string, and 500 when the query fails.
 */
app.get('/getCarriers', function (request, response) {
  var airportCode = request.query.airport;

  // The value is spliced into a SQL string below, so accept only letters and digits.
  if (typeof airportCode !== 'string' || !/^[A-Za-z0-9]+$/.test(airportCode)) {
    response.status(400).json({error: 'invalid airport'});
    return;
  }

  var carriers;
  try {
    carriers = airlineDemo.query("SELECT DISTINCT carrier FROM flightstoday WHERE origin='"+airportCode+"'");
  } catch (e) {
    console.log("e", e);
    response.status(500).json({error: 'query failed'});
    return;
  }

  carriers.cache().toJSON().toArray().then(function(result) {
    console.log('distinct carriers for ',airportCode,': ',JSON.stringify(result));
    response.json(result);
  }).catch(function(e) {
    // Previously a rejected promise left the request unanswered.
    console.log("e", e);
    if (!response.headersSent) {
      response.status(500).json({error: 'query failed'});
    }
  });
});

/**
 * GET /getSchedule?airport=<code>&carrier=<code>
 *
 * Responds with flight_num and destination of every row in the temp table
 * "flightstoday" matching the given origin airport and carrier, as produced by
 * toJSON().toArray().
 *
 * Responds 400 when either parameter is missing or not a plain alphanumeric
 * string, and 500 when the query fails.
 */
app.get('/getSchedule', function (request, response) {
  var airportCode = request.query.airport;
  var carrier = request.query.carrier;

  // Both values are spliced into a SQL string below, so accept only letters and digits.
  var plainCode = /^[A-Za-z0-9]+$/;
  if (typeof airportCode !== 'string' || !plainCode.test(airportCode)) {
    response.status(400).json({error: 'invalid airport'});
    return;
  }
  if (typeof carrier !== 'string' || !plainCode.test(carrier)) {
    response.status(400).json({error: 'invalid carrier'});
    return;
  }

  var flightsToday;
  try {
    flightsToday = airlineDemo.query("SELECT flight_num,destination FROM flightstoday WHERE origin='" +
      airportCode + "' AND carrier='" + carrier + "'");
  } catch (e) {
    console.log("e", e);
    response.status(500).json({error: 'query failed'});
    return;
  }

  flightsToday.cache().toJSON().toArray().then(function(result) {
    console.log('schedule for carrier and airport ',airportCode,' ',carrier,': ',JSON.stringify(result));
    response.json(result);
  }).catch(function(e) {
    // Previously a rejected promise left the request unanswered.
    console.log("e", e);
    if (!response.headersSent) {
      response.status(500).json({error: 'query failed'});
    }
  });
});

// Port to listen on: VCAP_APP_PORT from the environment when set, otherwise 3000.
var port = process.env.VCAP_APP_PORT || 3000;
// Start accepting HTTP requests; the callback logs once the server is listening.
var server = app.listen(port, function () {
console.log('listening on *:'+port);
});

// Start the demo: kicks off the streaming pipeline and loads today's flight
// schedule (see airlineDemo.start in airline.js).
airlineDemo.start();

/**
 * Registers a handler for the given process signal that shuts the demo down
 * (Spark streaming included) and then exits the node program with status 0.
 *
 * @param {string} signalName Signal to listen for.
 * @param {string} stoppedMessage Line logged once the demo has been stopped.
 */
function exitAfterStopOn(signalName, stoppedMessage) {
  process.on(signalName, function () {
    airlineDemo.stop(function () {
      console.log(stoppedMessage);
      process.exit(0);
    });
  });
}

// Both termination and interrupt requests get the same graceful shutdown.
exitAfterStopOn('SIGTERM', 'SIGTERM - stream has been stopped');
exitAfterStopOn('SIGINT', 'SIGINT - stream has been stopped');
8 changes: 8 additions & 0 deletions examples/airline/manifest.yml
@@ -0,0 +1,8 @@
Applications:
- disk_quota: 1024M
host: airlinedemo
name: airlinedemo
path: .
domain: mybluemix.net
instances: 1
memory: 512M
30 changes: 30 additions & 0 deletions examples/airline/package.json
@@ -0,0 +1,30 @@
{
"name": "eclairjs-node-airlinedemo",
"version": "0.0.1",
"description": "IBM javascript analytics",
"author": "IBM Emerging Tech",
"contributors": [
],
"scripts": {
"start": "node --harmony index.js"
},
"main": "index.js",
"dependencies": {
"async": "*",
"express": "~4.12.0",
"kafka": "*",
"kafka-node": "~0.2.25",
"kafkaesque": "*",
"leaflet.heat": "^0.1.3",
"sleep": "*",
"socket.io": "^1.3.6",
"ws": "^0.7.1",
"node-rest-client": "^1.5.1",
"ws": "^0.8.0",
"eclairjs": "git+https://github.com/EclairJS/eclairjs-node.git"
},
"devDependencies": {
},
"preferGlobal": false,
"private": true
}
Binary file added examples/airline/public/data/2008bd.json.tgz
Binary file not shown.

0 comments on commit 0dd5bcc

Please sign in to comment.