Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
nyc taxi example checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
davidyan74 committed Oct 16, 2017
1 parent be57b9f commit 953135b
Show file tree
Hide file tree
Showing 12 changed files with 995 additions and 1 deletion.
87 changes: 87 additions & 0 deletions examples/nyctaxi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Apache Apex Example (NYC Taxi Data)

## Overview

This is an example that demonstrates how Apex can be used for processing ride service data, using the freely available
historical Yellow Cab trip data on New York City government's web site.

It uses concepts of event-time windowing, out-of-order processing and streaming windows.

## Instructions

### Data preparation
Download some Yellow Cab trip data CSV files from the nyc.gov website.

Let's say the data is saved as yellow_tripdata_2016-01.csv.

Because the trip data source is wildly unordered, sort the data with some random deviation.
```bash
bash> sort -t, -k2 yellow_tripdata_2016-01.csv > yellow_tripdata_sorted_2016-01.csv
```

Then add some random deviation to the sorted data:

```bash
bash> cat nyctaxidata/yellow_tripdata_sorted_2016-01.csv | perl -e '@lines = (); while (<>) { if (@lines && rand(10) < 1) { print shift @lines; } if (rand(50) < 1) { push @lines, $_; } else { print $_; } }' > yellow_tripdata_sorted_random_2016-01.csv
```

Then create an HDFS directory and copy the csv file there:

```bash
bash> hdfs dfs -mkdir nyctaxidata
bash> hdfs dfs -copyFromLocal yellow_tripdata_sorted_random_2016-01.csv nyctaxidata/
```

### Setting up pubsub server

bash> git clone https://github.com/atrato/pubsub-server

Then build and run the pubsub server (the message broker):

bash> cd pubsub-server; mvn compile exec:java

The pubsub server is now running, listening to the default port 8890 on localhost.

### Running the application

Open the Apex CLI command prompt and run the application:

```bash
bash> apex
apex> launch target/malhar-examples-nyc-taxi-3.8.0-SNAPSHOT.apa
```

After the application has been running for 5 minutes, we can start querying the data. The reason why we need to wait
5 minutes is because we need to wait for the first window to pass the watermark for the triggers to be fired by the
WindowedOperator. Subsequent triggers will be fired every one minute since the slideBy is one minute.

We can use the Simple WebSocket Client Google Chrome extension to query the data. Open the extension in Chrome and
connect to "ws://localhost:8890/pubsub". Subscribe to the query result topic first because results to any query will be
delivered to this topic by sending this to the websocket connection:

```json
{"type":"subscribe","topic":"nyctaxi.result"}
```

Issue a query with latitude/longitude somewhere in Manhattan:

```json
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
```

You should get back something like the following:

```json
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769034523}
```

The result to the same query changes as time goes by since we have "real-time" ride data coming in:
```json
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10003"},"timestamp":1500769158530}
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769827538}
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10012"},"timestamp":1500770540527}
```

50 changes: 50 additions & 0 deletions examples/nyctaxi/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>malhar-examples-nyc-taxi</artifactId>
<packaging>jar</packaging>

<name>NYC Taxi Data Example for Apache Apex</name>
<description>Apex example applications that processes NYC Taxi Data.</description>

<parent>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-examples</artifactId>
<version>3.8.0-SNAPSHOT</version>
</parent>

<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.1</version>
</dependency>
</dependencies>

</project>
59 changes: 59 additions & 0 deletions examples/nyctaxi/src/assemble/appPackage.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>appPackage</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${basedir}/target/</directory>
<outputDirectory>/app</outputDirectory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/target/deps</directory>
<outputDirectory>/lib</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/site/conf</directory>
<outputDirectory>/conf</outputDirectory>
<includes>
<include>*.xml</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/META-INF</directory>
<outputDirectory>/META-INF</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/app</directory>
<outputDirectory>/app</outputDirectory>
</fileSet>
</fileSets>

</assembly>

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.nyctaxi;


import java.net.URI;
import java.net.URISyntaxException;

import org.joda.time.Duration;

import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.accumulation.SumDouble;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Throwables;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;

/**
* Created by david on 7/2/17.
*/
@ApplicationAnnotation(name = "NycTaxiExample")
public class Application implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
NycTaxiDataReader inputOperator = new NycTaxiDataReader();
inputOperator.setDirectory("/user/" + System.getProperty("user.name") + "/nyctaxidata");
inputOperator.getScanner().setFilePatternRegexp(".*\\.csv$");
dag.addOperator("NycTaxiDataReader", inputOperator);
NycTaxiCsvParser parser = dag.addOperator("NycTaxiCsvParser", new NycTaxiCsvParser());
NycTaxiZipFareExtractor extractor = dag.addOperator("NycTaxiZipFareExtractor", new NycTaxiZipFareExtractor());

KeyedWindowedOperatorImpl<String, Double, MutableDouble, Double> windowedOperator = new KeyedWindowedOperatorImpl<>();

// 5-minute windows slide by 1 minute
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(5)).slideBy(Duration.standardMinutes(1)));

// Because we only care about the last 5 minutes, and the watermark is set at t-1 minutes, lateness horizon is set to 4 minutes.
windowedOperator.setAllowedLateness(Duration.standardMinutes(4));
windowedOperator.setAccumulation(new SumDouble());
windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableDouble>());
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());

dag.addOperator("WindowedOperator", windowedOperator);

NycTaxiDataServer dataServer = dag.addOperator("NycTaxiDataServer", new NycTaxiDataServer());
ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("input_to_parser", inputOperator.output, parser.input);
dag.addStream("parser_to_extractor", parser.output, extractor.input);
dag.addStream("extractor_to_windowed", extractor.output, windowedOperator.input);
dag.addStream("extractor_watermark", extractor.watermarkOutput, windowedOperator.controlInput);
dag.addStream("windowed_to_console", windowedOperator.output, dataServer.input, console.input);

PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
wsQuery.enableEmbeddedMode();
wsQuery.setTopic("nyctaxi.query");
try {
wsQuery.setUri(new URI("ws://localhost:8890/pubsub"));
} catch (URISyntaxException ex) {
throw Throwables.propagate(ex);
}
dataServer.setEmbeddableQueryInfoProvider(wsQuery);
PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
wsResult.setTopic("nyctaxi.result");
try {
wsResult.setUri(new URI("ws://localhost:8890/pubsub"));
} catch (URISyntaxException ex) {
throw Throwables.propagate(ex);
}
dag.addStream("server_to_query_output", dataServer.queryResult, wsResult.input);
}
}
Loading

0 comments on commit 953135b

Please sign in to comment.