diff --git a/examples/nyctaxi/README.md b/examples/nyctaxi/README.md new file mode 100644 index 0000000000..3aa6868e21 --- /dev/null +++ b/examples/nyctaxi/README.md @@ -0,0 +1,87 @@ +# Apache Apex Example (NYC Taxi Data) + +## Overview + +This is an example that demonstrates how Apex can be used for processing ride service data, using the freely available +historical Yellow Cab trip data on New York City government's web site. + +It uses concepts of event-time windowing, out-of-order processing and streaming windows. + +## Instructions + +### Data preparation +Download some Yellow Cab trip data CSV files from the nyc.gov website. + +Let's say the data is saved as yellow_tripdata_2016-01.csv. + +Because the trip data source is wildly unordered, sort the data with some random deviation. +```bash +bash> sort -t, -k2 yellow_tripdata_2016-01.csv > yellow_tripdata_sorted_2016-01.csv +``` + +Then add some random deviation to the sorted data: + +```bash +bash> cat nyctaxidata/yellow_tripdata_sorted_2016-01.csv | perl -e '@lines = (); while (<>) { if (@lines && rand(10) < 1) { print shift @lines; } if (rand(50) < 1) { push @lines, $_; } else { print $_; } }' > yellow_tripdata_sorted_random_2016-01.csv +``` + +Then create an HDFS directory and copy the csv file there: + +```bash +bash> hdfs dfs -mkdir nyctaxidata +bash> hdfs dfs -copyFromLocal yellow_tripdata_sorted_random_2016-01.csv nyctaxidata/ +``` + +### Setting up pubsub server + +bash> git clone https://github.com/atrato/pubsub-server + +Then build and run the pubsub server (the message broker): + +bash> cd pubsub-server; mvn compile exec:java + +The pubsub server is now running, listening to the default port 8890 on localhost. + +### Running the application + +Open the Apex CLI command prompt and run the application: + +```bash +bash> apex +apex> launch target/malhar-examples-nyc-taxi-3.8.0-SNAPSHOT.apa +``` + +After the application has been running for 5 minutes, we can start querying the data. The reason why we need to wait +5 minutes is because we need to wait for the first window to pass the watermark for the triggers to be fired by the +WindowedOperator. Subsequent triggers will be fired every one minute since the slideBy is one minute. + +We can use the Simple WebSocket Client Google Chrome extension to query the data. Open the extension in Chrome and +connect to "ws://localhost:8890/pubsub". Subscribe to the query result topic first because results to any query will be +delivered to this topic by sending this to the websocket connection: + +```json +{"type":"subscribe","topic":"nyctaxi.result"} +``` + +Issue a query with latitude/longitude somewhere in Manhattan: + +```json +{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} +``` + +You should get back something like the following: + +```json +{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769034523} +``` + +The result to the same query changes as time goes by since we have "real-time" ride data coming in: +```json +{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} +{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10003"},"timestamp":1500769158530} +{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} +{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769827538} +{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} +{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10012"},"timestamp":1500770540527} +``` + diff --git a/examples/nyctaxi/pom.xml b/examples/nyctaxi/pom.xml new file mode 100644 index 0000000000..990718aa4e --- /dev/null +++ b/examples/nyctaxi/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + malhar-examples-nyc-taxi + jar + + NYC Taxi Data Example for Apache Apex + Apex example applications that processes NYC Taxi Data. + + + org.apache.apex + malhar-examples + 3.8.0-SNAPSHOT + + + + + org.apache.commons + commons-lang3 + 3.1 + + + joda-time + joda-time + 2.9.1 + + + + diff --git a/examples/nyctaxi/src/assemble/appPackage.xml b/examples/nyctaxi/src/assemble/appPackage.xml new file mode 100644 index 0000000000..4138cf201e --- /dev/null +++ b/examples/nyctaxi/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ + + + appPackage + + jar + + false + + + ${basedir}/target/ + /app + + ${project.artifactId}-${project.version}.jar + + + + ${basedir}/target/deps + /lib + + + ${basedir}/src/site/conf + /conf + + *.xml + + + + ${basedir}/src/main/resources/META-INF + /META-INF + + + ${basedir}/src/main/resources/app + /app + + + + + diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java new file mode 100644 index 0000000000..926e9eefee --- /dev/null +++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.nyctaxi; + + +import java.net.URI; +import java.net.URISyntaxException; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.accumulation.SumDouble; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; +import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; +import org.apache.commons.lang3.mutable.MutableDouble; +import org.apache.hadoop.conf.Configuration; +import com.google.common.base.Throwables; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; + +/** + * Created by david on 7/2/17. + */ +@ApplicationAnnotation(name = "NycTaxiExample") +public class Application implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000); + NycTaxiDataReader inputOperator = new NycTaxiDataReader(); + inputOperator.setDirectory("/user/" + System.getProperty("user.name") + "/nyctaxidata"); + inputOperator.getScanner().setFilePatternRegexp(".*\\.csv$"); + dag.addOperator("NycTaxiDataReader", inputOperator); + NycTaxiCsvParser parser = dag.addOperator("NycTaxiCsvParser", new NycTaxiCsvParser()); + NycTaxiZipFareExtractor extractor = dag.addOperator("NycTaxiZipFareExtractor", new NycTaxiZipFareExtractor()); + + KeyedWindowedOperatorImpl windowedOperator = new KeyedWindowedOperatorImpl<>(); + + // 5-minute windows slide by 1 minute + windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(5)).slideBy(Duration.standardMinutes(1))); + + // Because we only care about the last 5 minutes, and the watermark is set at t-1 minutes, lateness horizon is set to 4 minutes. + windowedOperator.setAllowedLateness(Duration.standardMinutes(4)); + windowedOperator.setAccumulation(new SumDouble()); + windowedOperator.setTriggerOption(TriggerOption.AtWatermark()); + windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage()); + windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage()); + + dag.addOperator("WindowedOperator", windowedOperator); + + NycTaxiDataServer dataServer = dag.addOperator("NycTaxiDataServer", new NycTaxiDataServer()); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + dag.addStream("input_to_parser", inputOperator.output, parser.input); + dag.addStream("parser_to_extractor", parser.output, extractor.input); + dag.addStream("extractor_to_windowed", extractor.output, windowedOperator.input); + dag.addStream("extractor_watermark", extractor.watermarkOutput, windowedOperator.controlInput); + dag.addStream("windowed_to_console", windowedOperator.output, dataServer.input, console.input); + + PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery(); + wsQuery.enableEmbeddedMode(); + wsQuery.setTopic("nyctaxi.query"); + try { + wsQuery.setUri(new URI("ws://localhost:8890/pubsub")); + } catch (URISyntaxException ex) { + throw Throwables.propagate(ex); + } + dataServer.setEmbeddableQueryInfoProvider(wsQuery); + PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); + wsResult.setTopic("nyctaxi.result"); + try { + wsResult.setUri(new URI("ws://localhost:8890/pubsub")); + } catch (URISyntaxException ex) { + throw Throwables.propagate(ex); + } + dag.addStream("server_to_query_output", dataServer.queryResult, wsResult.input); + } +} diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java new file mode 100644 index 0000000000..08f59b48ce --- /dev/null +++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.nyctaxi; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Throwables; + +/** + * Provides utilities for zip codes and lat-lon coordinates in New York City. + */ +public class NycLocationUtils +{ + public static class ZipRecord + { + public final String zip; + public final double lat; + public final double lon; + public String[] neighboringZips; + + public ZipRecord(String zip, double lat, double lon) + { + this.zip = zip; + this.lat = lat; + this.lon = lon; + } + } + + private static Map zipRecords = new HashMap<>(); + + static { + // setup of NYC zip data. + try (BufferedReader br = new BufferedReader( + new InputStreamReader(NycLocationUtils.class.getResourceAsStream("/nyc_zip_codes.csv")))) { + String line; + while ((line = br.readLine()) != null) { + String[] s = line.split(","); + String zip = s[0].trim(); + double lat = Double.valueOf(s[1].trim()); + double lon = Double.valueOf(s[2].trim()); + zipRecords.put(zip, new ZipRecord(zip, lat, lon)); + } + } catch (IOException ex) { + throw Throwables.propagate(ex); + } + for (Map.Entry entry : zipRecords.entrySet()) { + final ZipRecord entryValue = entry.getValue(); + List zips = new ArrayList<>(zipRecords.keySet()); + + Collections.sort(zips, new Comparator() + { + @Override + public int compare(String s1, String s2) + { + ZipRecord z1 = zipRecords.get(s1); + ZipRecord z2 = zipRecords.get(s2); + double dist1 = Math.pow(z1.lat - entryValue.lat, 2) + Math.pow(z1.lon - entryValue.lon, 2); + double dist2 = Math.pow(z2.lat - entryValue.lat, 2) + Math.pow(z2.lon - entryValue.lon, 2); + return Double.compare(dist1, dist2); + } + }); + entryValue.neighboringZips = zips.subList(0, 8).toArray(new String[]{}); + } + } + + public static String getZip(double lat, double lon) + { + // Brute force to get the nearest zip centoid. Should be able to optimize this. + double minDist = Double.MAX_VALUE; + String zip = null; + for (Map.Entry entry : zipRecords.entrySet()) { + ZipRecord zipRecord = entry.getValue(); + double dist = Math.pow(zipRecord.lat - lat, 2) + Math.pow(zipRecord.lon - lon, 2); + if (dist < minDist) { + zip = entry.getKey(); + minDist = dist; + } + } + return zip; + } + + public static String[] getNeighboringZips(String zip) + { + ZipRecord zipRecord = zipRecords.get(zip); + if (zipRecord != null) { + return zipRecord.neighboringZips; + } else { + return null; + } + } +} diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java new file mode 100644 index 0000000000..f65e816058 --- /dev/null +++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.nyctaxi; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Operator that parses historical New York City Yellow Cab ride data + * from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml. + */ +public class NycTaxiCsvParser extends BaseOperator +{ + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void process(String tuple) + { + String[] values = tuple.split(","); + Map outputTuple = new HashMap<>(); + if (StringUtils.isNumeric(values[0])) { + outputTuple.put("pickup_time", values[1]); + outputTuple.put("pickup_lon", values[5]); + outputTuple.put("pickup_lat", values[6]); + outputTuple.put("total_fare", values[18]); + output.emit(outputTuple); + } + } + }; + + public final transient DefaultOutputPort> output = new DefaultOutputPort<>(); +} diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java new file mode 100644 index 0000000000..01313cad8f --- /dev/null +++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.nyctaxi; + +import java.io.IOException; + +import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; + +/** + * Operator that reads historical New York City Yellow Cab ride data + * from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml. + * + * Note that unlike the raw LineByLineFileInputOperator, we advance the streaming + * window whenever we see a difference in the timestamp in the data. + */ +public class NycTaxiDataReader extends LineByLineFileInputOperator +{ + private String currentTimestamp; + private transient boolean suspendEmit = false; + + public NycTaxiDataReader() + { + // Whether or not to advance the window does not depend on the size. It solely + // depends on the timestamp of the data. This is why we are setting this to Integer.MAX_VALUE. + // See below for "suspendEmit". + emitBatchSize = Integer.MAX_VALUE; + } + + @Override + protected boolean suspendEmit() + { + return suspendEmit; + } + + @Override + protected String readEntity() throws IOException + { + String line = super.readEntity(); + String[] fields = line.split(","); + String timestamp = fields[1]; + if (currentTimestamp == null) { + currentTimestamp = timestamp; + } else if (timestamp != currentTimestamp) { + // suspend emit until the next streaming window when timestamp is different from the current timestamp. + suspendEmit = true; + currentTimestamp = timestamp; + } + return line; + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + // Resume emit since we now have a new streaming window. + suspendEmit = false; + } +} diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataServer.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataServer.java new file mode 100644 index 0000000000..fd94e200da --- /dev/null +++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataServer.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.nyctaxi; + +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.Map; + + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.appdata.AbstractAppDataServer; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.experimental.AppData; +import com.datatorrent.lib.util.KeyValPair; + + +/** + * Operator that reads the KeyValPair tuples from the Windowed Operator and serves live queries. + * + * The KeyValPair input tuples are zip to total payment of the window. They are collected by an internal map so that + * the data can be served. + */ +public class NycTaxiDataServer extends AbstractAppDataServer +{ + public final transient DefaultInputPort>> input = new DefaultInputPort>>() + { + @Override + public void process(Tuple.WindowedTuple> tuple) + { + if (!currentWindowHasData) { + currentData = new HashMap<>(); + currentWindowHasData = true; + } + KeyValPair tupleValue = tuple.getValue(); + currentData.put(tupleValue.getKey(), tupleValue.getValue()); + } + }; + + @AppData.ResultPort + public final transient DefaultOutputPort queryResult = new DefaultOutputPort<>(); + + private Map servingData = new HashMap<>(); + private transient Map currentData = new HashMap<>(); + private transient ArrayDeque resultQueue = new ArrayDeque<>(); + private boolean currentWindowHasData = false; + + @Override + public void beginWindow(long l) + { + super.beginWindow(l); + currentWindowHasData = false; + } + + @Override + public void endWindow() + { + while (!resultQueue.isEmpty()) { + String result = resultQueue.remove(); + queryResult.emit(result); + } + servingData = currentData; + super.endWindow(); + } + + + @Override + protected void processQuery(String queryStr) + { + try { + JSONObject query = new JSONObject(queryStr); + JSONObject result = new JSONObject(); + double lat = query.getDouble("lat"); + double lon = query.getDouble("lon"); + Pair zips = recommendZip(lat, lon); + result.put("currentZip", zips.getLeft()); + result.put("driveToZip", zips.getRight()); + resultQueue.add(result.toString()); + } catch (JSONException e) { + LOG.error("Unrecognized query: {}", queryStr); + } + } + + public Pair recommendZip(double lat, double lon) + { + String currentZip = NycLocationUtils.getZip(lat, lon); + String zip = currentZip; + String[] neighboringZips = NycLocationUtils.getNeighboringZips(zip); + double dollars = servingData.containsKey(zip) ? servingData.get(zip) : 0; + LOG.info("Current zip: {}={}", zip, dollars); + for (String neigboringZip : neighboringZips) { + double tmpDollars = servingData.containsKey(neigboringZip) ? servingData.get(neigboringZip) : 0; + LOG.info("Neighboring zip: {}={}", neigboringZip, tmpDollars); + if (tmpDollars > dollars) { + dollars = tmpDollars; + zip = neigboringZip; + } + } + return new ImmutablePair<>(currentZip, zip); + } + + private static final Logger LOG = LoggerFactory.getLogger(NycTaxiDataServer.class); + +} diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiZipFareExtractor.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiZipFareExtractor.java new file mode 100644 index 0000000000..57b7d458e2 --- /dev/null +++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiZipFareExtractor.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.nyctaxi; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; +import java.util.TimeZone; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.Tuple; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Operator that fills in the zip code based on the lat-lon coordinates in the incoming tuples and prepares + * the tuples for the WindowedOperator downstream. It also generates a watermark that is t - 1 minute. + */ +public class NycTaxiZipFareExtractor extends BaseOperator +{ + public static class Watermark implements ControlTuple.Watermark + { + private long timestamp; + + private Watermark() + { + // for kryo + } + + public Watermark(long timestamp) + { + this.timestamp = timestamp; + } + + @Override + public long getTimestamp() + { + return this.timestamp; + } + } + + public final transient DefaultInputPort> input = new DefaultInputPort>() + { + @Override + public void process(Map tuple) + { + try { + String zip = NycLocationUtils.getZip(Double.valueOf(tuple.get("pickup_lat")), Double.valueOf(tuple.get("pickup_lon"))); + Date date = dateFormat.parse(tuple.get("pickup_time")); + long timestamp = date.getTime(); + double fare = Double.valueOf(tuple.get("total_fare")); + output.emit(new Tuple.TimestampedTuple<>(timestamp, new KeyValPair<>(zip, fare))); + if (timestamp > currentTimestamp) { + currentTimestamp = timestamp; + watermarkOutput.emit(new Watermark(timestamp - 60 * 1000)); + } + } catch (ParseException ex) { + LOG.warn("Ignoring tuple with bad timestamp {}", tuple.get("pickup_time")); + } + } + }; + + public final transient DefaultOutputPort>> output = new DefaultOutputPort<>(); + public final transient DefaultOutputPort watermarkOutput = new DefaultOutputPort<>(); + + private transient SimpleDateFormat dateFormat; + private long currentTimestamp = -1; + + private static final Logger LOG = LoggerFactory.getLogger(NycTaxiZipFareExtractor.class); + + @Override + public void setup(OperatorContext context) + { + dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); + dateFormat.setTimeZone(TimeZone.getTimeZone("America/New_York")); + } + +} diff --git a/examples/nyctaxi/src/main/resources/nyc_zip_codes.csv b/examples/nyctaxi/src/main/resources/nyc_zip_codes.csv new file mode 100644 index 0000000000..4eb72b006d --- /dev/null +++ b/examples/nyctaxi/src/main/resources/nyc_zip_codes.csv @@ -0,0 +1,223 @@ +10001,40.750633, -73.997177 +10002,40.715775, -73.986212 +10003,40.731829, -73.989181 +10004,40.688630, -74.018244 +10005,40.706027, -74.008835 +10006,40.709614, -74.012954 +10007,40.713848, -74.007755 +10009,40.726399, -73.978631 +10010,40.739065, -73.982255 +10011,40.742039, -74.000620 +10012,40.725581, -73.998078 +10013,40.720103, -74.004903 +10014,40.734012, -74.006746 +10016,40.745224, -73.978297 +10017,40.752360, -73.972493 +10018,40.755319, -73.993114 +10019,40.765823, -73.987169 +10020,40.758236, -73.978833 +10021,40.769258, -73.958751 +10022,40.758628, -73.967948 +10023,40.775921, -73.982607 +10024,40.798452, -73.974428 +10025,40.798601, -73.966622 +10026,40.802381, -73.952681 +10027,40.811407, -73.953060 +10028,40.776441, -73.953509 +10029,40.791763, -73.943970 +10030,40.818267, -73.942856 +10031,40.825288, -73.950045 +10032,40.838815, -73.942836 +10033,40.850545, -73.933983 +10034,40.867076, -73.924312 +10035,40.795455, -73.929655 +10036,40.759260, -73.989860 +10037,40.812957, -73.937376 +10038,40.709278, -74.002562 +10039,40.830867, -73.936218 +10040,40.858305, -73.930549 +10044,40.761915, -73.949962 +10065,40.764612, -73.963122 +10069,40.775906, -73.990358 +10075,40.773361, -73.956216 +10103,40.760780, -73.977670 +10110,40.754499, -73.982256 +10111,40.759114, -73.977596 +10112,40.759167, -73.979668 +10115,40.810852, -73.963744 +10119,40.750310, -73.992979 +10128,40.781432, -73.950013 +10152,40.758404, -73.972031 +10153,40.763622, -73.972439 +10154,40.757779, -73.972487 +10162,40.769300, -73.949915 +10165,40.752131, -73.978722 +10167,40.754648, -73.974771 +10168,40.751448, -73.977103 +10169,40.754391, -73.976098 +10170,40.752625, -73.975877 +10171,40.755899, -73.973858 +10172,40.755273, -73.974315 +10173,40.754131, -73.979364 +10174,40.751441, -73.975003 +10177,40.755139, -73.975934 +10199,40.751393, -73.997229 +10271,40.708205, -74.010504 +10278,40.715182, -74.003778 +10279,40.712626, -74.008669 +10280,40.708538, -74.016650 +10282,40.716921, -74.015066 +10301,40.627456, -74.094407 +10302,40.630688, -74.137776 +10303,40.629885, -74.174130 +10304,40.609227, -74.092575 +10305,40.596691, -74.074866 +10306,40.571768, -74.125950 +10307,40.509183, -74.237785 +10308,40.551884, -74.147646 +10309,40.531346, -74.219857 +10310,40.632648, -74.116148 +10311,40.605241, -74.179503 +10312,40.545237, -74.180443 +10314,40.599263, -74.165748 +10451,40.820479, -73.925084 +10452,40.837393, -73.923437 +10453,40.852779, -73.912332 +10454,40.805489, -73.916585 +10455,40.814710, -73.908593 +10456,40.829881, -73.908120 +10457,40.847150, -73.898680 +10458,40.862543, -73.888143 +10459,40.825867, -73.892942 +10460,40.841758, -73.879571 +10461,40.847381, -73.840584 +10462,40.843280, -73.860389 +10463,40.880678, -73.906540 +10464,40.867787, -73.799920 +10465,40.822615, -73.822239 +10466,40.890964, -73.846239 +10467,40.869953, -73.865746 +10468,40.868093, -73.899730 +10469,40.868607, -73.848133 +10470,40.889530, -73.872662 +10471,40.898868, -73.903328 +10472,40.829556, -73.869336 +10473,40.818690, -73.858474 +10474,40.810549, -73.884367 +10475,40.875169, -73.823817 +11001,40.723317, -73.704949 +11003,40.699176, -73.706166 +11004,40.746204, -73.711478 +11005,40.756596, -73.714178 +11010,40.700587, -73.675018 +11020,40.771442, -73.714819 +11021,40.784319, -73.731488 +11023,40.798909, -73.733653 +11024,40.816251, -73.742872 +11030,40.793409, -73.688549 +11040,40.745347, -73.680292 +11042,40.758534, -73.697235 +11050,40.839900, -73.693124 +11096,40.621346, -73.756990 +11101,40.747155, -73.939750 +11102,40.772884, -73.926295 +11103,40.762574, -73.913447 +11104,40.744634, -73.920201 +11105,40.778877, -73.906769 +11106,40.762211, -73.931528 +11109,40.745115, -73.956928 +11201,40.693682, -73.989693 +11203,40.649591, -73.934371 +11204,40.618779, -73.984826 +11205,40.694696, -73.966286 +11206,40.701954, -73.942358 +11207,40.670747, -73.894209 +11208,40.669769, -73.871372 +11209,40.621982, -74.030324 +11210,40.628147, -73.946324 +11211,40.712597, -73.953098 +11212,40.662936, -73.913029 +11213,40.671078, -73.936336 +11214,40.599148, -73.996090 +11215,40.662688, -73.986740 +11216,40.680768, -73.949316 +11217,40.682306, -73.978099 +11218,40.643468, -73.976046 +11219,40.632667, -73.996669 +11220,40.641221, -74.016862 +11221,40.691340, -73.927879 +11222,40.727790, -73.947605 +11223,40.597139, -73.973428 +11224,40.577372, -73.988706 +11225,40.663046, -73.954219 +11226,40.646448, -73.956649 +11228,40.616695, -74.013047 +11229,40.601293, -73.944493 +11230,40.622164, -73.965105 +11231,40.677916, -74.005154 +11232,40.656546, -74.007355 +11233,40.678308, -73.919936 +11234,40.605080, -73.911721 +11235,40.583949, -73.949096 +11236,40.639413, -73.900664 +11237,40.704160, -73.921139 +11238,40.679171, -73.963804 +11239,40.647735, -73.879477 +11351,40.780747, -73.825301 +11354,40.768208, -73.827403 +11355,40.751452, -73.821031 +11356,40.784850, -73.841279 +11357,40.786393, -73.810864 +11358,40.760471, -73.796371 +11359,40.791781, -73.776875 +11360,40.780379, -73.781230 +11361,40.764191, -73.772775 +11362,40.756574, -73.737845 +11363,40.772616, -73.746526 +11364,40.745289, -73.760586 +11365,40.739634, -73.794490 +11366,40.728152, -73.785019 +11367,40.730145, -73.827030 +11368,40.751718, -73.851822 +11369,40.763365, -73.872374 +11370,40.765393, -73.893243 +11371,40.773894, -73.873475 +11372,40.751690, -73.883638 +11373,40.738837, -73.878535 +11374,40.726418, -73.861526 +11375,40.720934, -73.846151 +11377,40.744819, -73.905156 +11378,40.724744, -73.909639 +11379,40.716748, -73.879601 +11385,40.700671, -73.889433 +11411,40.694021, -73.736216 +11412,40.698095, -73.758986 +11413,40.671659, -73.752568 +11414,40.657604, -73.844804 +11415,40.707917, -73.828212 +11416,40.684654, -73.849548 +11417,40.676446, -73.844443 +11418,40.700272, -73.835971 +11419,40.688673, -73.822918 +11420,40.673583, -73.817730 +11421,40.694062, -73.858626 +11422,40.660060, -73.736012 +11423,40.715606, -73.768471 +11424,40.714304, -73.827263 +11425,40.607754, -74.023937 +11426,40.736425, -73.722376 +11427,40.730904, -73.745661 +11428,40.721016, -73.742245 +11429,40.709766, -73.738653 +11430,40.646964, -73.784813 +11432,40.715359, -73.793071 +11433,40.698162, -73.786893 +11434,40.676808, -73.776425 +11435,40.701265, -73.809605 +11436,40.675807, -73.796622 +11451,40.701282, -73.795972 +11691,40.601278, -73.761651 +11692,40.594095, -73.792896 +11693,40.590692, -73.809749 +11694,40.578270, -73.844762 +11697,40.555688, -73.920663 diff --git a/examples/pom.xml b/examples/pom.xml index cfd8431f61..a0b126b45f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -202,6 +202,7 @@ s3 jdbc exactly-once + nyctaxi diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java index eae9e1231b..e06d411198 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java @@ -207,7 +207,6 @@ public Thread newThread(Runnable r) @Override public void onMessage(String string) { - LOG.debug("Got: " + string); try { T o = convertMessage(string); if (!(skipNull && o == null)) {