In [26]:
import numpy as np
import pandas as pd
import time
from kafka import KafkaProducer


In [27]:
# Explore the original data for Taxi in 2019-2
path = '../data/yellow_tripdata_2019-02.csv'
taxi_data = pd.read_csv(path, dtype=str)
taxi_data.columns


Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge'],
      dtype='object')

In [28]:
# Discard some unrelated columns to simplify the problem
# Some other attributes may be used later
simplified_taxi_data = taxi_data.drop(['VendorID',  'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'payment_type', 'fare_amount', 'extra', 'total_amount',
                                       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge'], axis=1)

simplified_taxi_data


Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,PULocationID,DOLocationID
0,2019-02-01 00:59:04,2019-02-01 01:07:27,48,234
1,2019-02-01 00:33:09,2019-02-01 01:03:58,230,93
2,2019-02-01 00:09:03,2019-02-01 00:09:16,145,145
3,2019-02-01 00:45:38,2019-02-01 00:51:10,95,95
4,2019-02-01 00:25:30,2019-02-01 00:28:14,140,263
...,...,...,...,...
3758761,2019-02-15 15:47:59,2019-02-15 16:05:38,229,100
3758762,2019-02-15 15:20:41,2019-02-15 15:26:20,246,68
3758763,2019-02-15 15:29:32,2019-02-15 15:35:07,186,170
3758764,2019-02-15 15:38:52,2019-02-15 15:50:05,137,90


In [54]:
# Filter out the data within the manhattan zones
simplified_taxi_data['DOLocationID'] = simplified_taxi_data['DOLocationID'].astype('int64')
simplified_taxi_data['PULocationID'] = simplified_taxi_data['PULocationID'].astype('int64')

manhattan_zones = pd.read_csv("../data-NYCZones/zones/manhattan_zones.csv")
manhattan_zones_id  = list(manhattan_zones["zone_id"])
manhattan_taxi_data = simplified_taxi_data[(simplified_taxi_data['DOLocationID'].isin(manhattan_zones_id)) & (simplified_taxi_data['PULocationID'].isin(manhattan_zones_id))]

# Discard data out of the time range
year = 2019
month = 2
manhattan_taxi_data = manhattan_taxi_data[manhattan_taxi_data["tpep_pickup_datetime"] > f"{year}-%02d-01 00:00:00" % month]
manhattan_taxi_data = manhattan_taxi_data[manhattan_taxi_data["tpep_dropoff_datetime"] > f"{year}-%02d-01 00:00:00" % month]
manhattan_taxi_data = manhattan_taxi_data[manhattan_taxi_data["tpep_pickup_datetime"] < f"{year}-%02d-31 23:59:59" % month]
manhattan_taxi_data = manhattan_taxi_data[manhattan_taxi_data["tpep_dropoff_datetime"] < f"{year}-%02d-31 23:59:59" % month]

# Align time into hour, maybe handled with flink later
manhattan_taxi_data['tpep_pickup_datetime'] = pd.to_datetime(manhattan_taxi_data['tpep_pickup_datetime'])
manhattan_taxi_data['tpep_dropoff_datetime'] = pd.to_datetime(manhattan_taxi_data['tpep_dropoff_datetime'])

In [55]:
# Sort by drop off time to simulate the real-time events
result = manhattan_taxi_data
result = result.sort_values(by=['tpep_dropoff_datetime'])

# Filter to get the data in 2019-02-01
result = result[result["tpep_dropoff_datetime"] < f"2019-02-01 23:59:59"]
result

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,PULocationID,DOLocationID
6033,2019-02-01 00:00:43,2019-02-01 00:00:50,238,239
4846,2019-02-01 00:01:00,2019-02-01 00:01:51,79,79
6747,2019-02-01 00:00:37,2019-02-01 00:02:20,140,262
77,2019-02-01 00:00:05,2019-02-01 00:02:32,164,164
1101,2019-02-01 00:01:44,2019-02-01 00:02:42,143,143
...,...,...,...,...
296356,2019-02-01 23:41:40,2019-02-01 23:59:57,170,41
293510,2019-02-01 23:47:37,2019-02-01 23:59:57,246,170
308823,2019-02-01 23:40:44,2019-02-01 23:59:58,186,79
296454,2019-02-01 23:50:46,2019-02-01 23:59:58,237,142


In [57]:
# Preload the data for 1 hour
pre_loaded_data = result[result["tpep_dropoff_datetime"] < f"2019-02-01 01:00:06"]
pre_loaded_data

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,PULocationID,DOLocationID
6033,2019-02-01 00:00:43,2019-02-01 00:00:50,238,239
4846,2019-02-01 00:01:00,2019-02-01 00:01:51,79,79
6747,2019-02-01 00:00:37,2019-02-01 00:02:20,140,262
77,2019-02-01 00:00:05,2019-02-01 00:02:32,164,164
1101,2019-02-01 00:01:44,2019-02-01 00:02:42,143,143
...,...,...,...,...
4973,2019-02-01 00:55:10,2019-02-01 01:00:01,79,79
1611,2019-02-01 00:52:03,2019-02-01 01:00:01,231,144
1377,2019-02-01 00:53:38,2019-02-01 01:00:03,79,234
1287,2019-02-01 00:52:41,2019-02-01 01:00:03,239,75


In [33]:
# The data in time range [1:00, 3:00] are loaded for testing
# The time scale is 1min -> 1sec, so lasting for 2 minutes
remaining_data = result[(result["tpep_dropoff_datetime"] > f"2019-02-01 01:00:00")
                        & (result["tpep_dropoff_datetime"] < f"2019-02-01 03:00:00")]
remaining_data


Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,PULocationID,DOLocationID,alignedtime
4973,2019-02-01 00:55:10,2019-02-01 01:00:01,79,79,2019-02-01 00:00:00
1611,2019-02-01 00:52:03,2019-02-01 01:00:01,231,144,2019-02-01 00:00:00
1377,2019-02-01 00:53:38,2019-02-01 01:00:03,79,234,2019-02-01 00:00:00
1287,2019-02-01 00:52:41,2019-02-01 01:00:03,239,75,2019-02-01 00:00:00
1991,2019-02-01 00:47:30,2019-02-01 01:00:05,79,246,2019-02-01 00:00:00
...,...,...,...,...,...
13427,2019-02-01 02:34:28,2019-02-01 02:59:48,148,166,2019-02-01 02:00:00
12173,2019-02-01 02:48:07,2019-02-01 02:59:50,148,68,2019-02-01 02:00:00
11281,2019-02-01 02:53:58,2019-02-01 02:59:51,162,186,2019-02-01 02:00:00
12553,2019-02-01 02:58:09,2019-02-01 02:59:52,230,186,2019-02-01 02:00:00


In [34]:
preloaded_taxi_inflow = pre_loaded_data.drop(['tpep_pickup_datetime', 'PULocationID', 'alignedtime'], axis=1)
preloaded_taxi_outflow = pre_loaded_data.drop(['tpep_dropoff_datetime', 'DOLocationID', 'alignedtime'], axis=1)

remaining_taxi_inflow = remaining_data.drop(['tpep_pickup_datetime', 'PULocationID', 'alignedtime'], axis=1)
remaining_taxi_outflow = remaining_data.drop(['tpep_dropoff_datetime', 'DOLocationID', 'alignedtime'], axis=1)

In [35]:
preloaded_taxi_inflow['tpep_dropoff_datetime'] = preloaded_taxi_inflow['tpep_dropoff_datetime'].astype(str)
preloaded_taxi_outflow['tpep_pickup_datetime'] = preloaded_taxi_outflow['tpep_pickup_datetime'].astype(str)

remaining_taxi_inflow['tpep_dropoff_datetime'] = remaining_taxi_inflow['tpep_dropoff_datetime'].astype(str)
remaining_taxi_outflow['tpep_pickup_datetime'] = remaining_taxi_outflow['tpep_pickup_datetime'].astype(str)

  preloaded_taxi_inflow = preloaded_taxi_inflow.append(remaining_taxi_inflow.iloc[0:10])
  preloaded_taxi_outflow = preloaded_taxi_outflow.append(remaining_taxi_outflow.iloc[0:10])


In [40]:
preloaded_taxi_inflow = preloaded_taxi_inflow.rename(columns={'tpep_dropoff_datetime': 'time', 'DOLocationID': 'region_id'})
preloaded_taxi_outflow = preloaded_taxi_outflow.rename(columns={'tpep_pickup_datetime': 'time', 'PULocationID': 'region_id'})

remaining_taxi_inflow = remaining_taxi_inflow.rename(columns={'tpep_dropoff_datetime': 'time', 'DOLocationID': 'region_id'})
remaining_taxi_outflow = remaining_taxi_outflow.rename(columns={'tpep_pickup_datetime': 'time', 'PULocationID': 'region_id'})

In [53]:
preloaded_taxi_inflow
preloaded_taxi_outflow = preloaded_taxi_outflow.sort_values('time')
preloaded_taxi_outflow

# remaining_taxi_inflow
# remaining_taxi_outflow = remaining_taxi_outflow.sort_values('time')
# remaining_taxi_outflow

# Add some data out of range to ensure the tumble window finished
# preloaded_taxi_inflow = preloaded_taxi_inflow.append(remaining_taxi_inflow.iloc[0:10])
preloaded_taxi_outflow = preloaded_taxi_outflow.append(remaining_taxi_outflow.iloc[0:10])
preloaded_taxi_outflow

  preloaded_taxi_outflow = preloaded_taxi_outflow.append(remaining_taxi_outflow.iloc[0:10])


Unnamed: 0,time,region_id
2207,2019-02-01 00:00:02,68
3245,2019-02-01 00:00:03,230
1014,2019-02-01 00:00:03,229
6212,2019-02-01 00:00:04,230
6006,2019-02-01 00:00:04,186
...,...,...
3390,2019-02-01 00:31:15,87
4171,2019-02-01 00:34:58,231
6716,2019-02-01 00:35:16,158
3444,2019-02-01 00:35:47,13


In [51]:
# Import data to kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Preload the data into Kafka
N = preloaded_taxi_inflow.shape[0]
for i in range(N):
    inflow_string = preloaded_taxi_inflow.iloc[i].to_json()
    outflow_string = preloaded_taxi_outflow.iloc[i].to_json()

    # To send a message
    producer.send('taxi_inflow', inflow_string.encode('utf-8'))
    producer.send('taxi_outflow', outflow_string.encode('utf-8'))

In [25]:
N = remaining_taxi_inflow.shape[0]
for i in range(N):
    inflow_string = remaining_taxi_inflow.iloc[i].to_json()
    outflow_string = remaining_taxi_outflow.iloc[i].to_json()

    # To send a message
    producer.send('taxi_inflow', inflow_string.encode('utf-8'))
    producer.send('taxi_outflow', outflow_string.encode('utf-8'))
    time.sleep(1)

producer.flush()  # Wait for any outstanding messages to be transmitted and delivery acknowledgments received
producer.close()

In [None]:
-- Problem encountered with Flink SQL
-- 1. Missing Kafka Connector: Download the jar and move it to lib folder of flink
-- 2. SQL client get stuck: The flink service is not opened
-- 3. Py4JJavaError: The version of Java matters 

CREATE TABLE taxi_inflow (
	`time` TIMESTAMP(3), 
	`region_id` BIGINT, 
    WATERMARK FOR `time` AS `time`
)WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'taxi_inflow',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

CREATE TABLE taxi_outflow (
	`time` TIMESTAMP(3), 
	`region_id` BIGINT, 
    WATERMARK FOR `time` AS `time`
)WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'taxi_outflow',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

In [None]:
-- Elasticsearch
CREATE TABLE taxi_inflow_es (
    interval_start TIMESTAMP, 
	`region_id` BIGINT, 
    inflow BIGINT
) WITH (
    'connector' = 'elasticsearch-7', -- using elasticsearch connector
    'hosts' = 'https://demo0.es.asia-east1.gcp.elastic-cloud.com:9243',  -- elasticsearch address
    'username' = 'elastic',
    'password' = '7FIVOml6LcwBxoN6cptivrCp',
    'index' = 'taxi_inflow_es'  -- elasticsearch index name, similar to database table name
);

INSERT INTO taxi_inflow_es
(SELECT TUMBLE_START(`time`, INTERVAL '5' MINUTE) AS interval_start, 
  `region_id`, 
  COUNT(*) AS inflow 
FROM taxi_inflow 
GROUP BY TUMBLE(`time`, INTERVAL '5' MINUTE), `region_id`);

CREATE TABLE taxi_outflow_es (
    interval_start TIMESTAMP, 
	`region_id` BIGINT, 
    outflow BIGINT
) WITH (
    'connector' = 'elasticsearch-7', -- using elasticsearch connector
    'hosts' = 'https://demo0.es.asia-east1.gcp.elastic-cloud.com:9243',  -- elasticsearch address
    'username' = 'elastic',
    'password' = '7FIVOml6LcwBxoN6cptivrCp',
    'index' = 'taxi_outflow_es'  -- elasticsearch index name, similar to database table name
);

INSERT INTO taxi_outflow_es
(SELECT TUMBLE_START(`time`, INTERVAL '5' MINUTE) AS interval_start, 
  `region_id`, 
  COUNT(*) AS outflow 
FROM taxi_outflow 
GROUP BY TUMBLE(`time`, INTERVAL '5' MINUTE), `region_id`);