In [1]:
import pandas as pd
import numpy as np
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSource
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
import os
import time
from datetime import datetime

In [2]:
# Environment and configuration setup
env = StreamExecutionEnvironment.get_execution_environment()
# env.set_parallelism(8)
t_env = StreamTableEnvironment.create(env)

# Both options are written through one handle on the table environment's configuration.
job_configuration = t_env.get_config().get_configuration()
job_configuration.set_string("taskmanager.memory.task.off-heap.size", '80m')
job_configuration.set_string("python.fn-execution.arrow.batch.size", '300000')

In [3]:
# Create the input table
# The CSV format and the table schema use the same six columns, so they are
# declared once and applied to both descriptors in order.
source_fields = [
    ('pickup_datetime', DataTypes.STRING()),
    ('dropoff_datetime', DataTypes.STRING()),
    ('pickup_longitude', DataTypes.FLOAT()),
    ('pickup_latitude', DataTypes.FLOAT()),
    ('dropoff_longitude', DataTypes.FLOAT()),
    ('dropoff_latitude', DataTypes.FLOAT()),
]

source_format = OldCsv()
source_schema = Schema()
for field_name, field_type in source_fields:
    source_format.field(field_name, field_type)
    source_schema.field(field_name, field_type)

# Register the file-backed table under the name the pipeline reads from.
t_env.connect(FileSystem().path('./data/use_yellow_tripdata_2014-01.csv')) \
    .with_format(source_format) \
    .with_schema(source_schema) \
    .create_temporary_table('mySource')

<pyflink.table.descriptors.StreamTableDescriptor at 0x203245c94c8>

In [4]:
# Create the output table
# The CSV format and the table schema use the same thirteen columns, so they
# are declared once and applied to both descriptors in order.
sink_fields = [
    ('pickup_datetime', DataTypes.STRING()),
    ('dropoff_datetime', DataTypes.STRING()),
    ('pickup_longitude', DataTypes.FLOAT()),
    ('pickup_latitude', DataTypes.FLOAT()),
    ('dropoff_longitude', DataTypes.FLOAT()),
    ('dropoff_latitude', DataTypes.FLOAT()),
    ('O', DataTypes.BIGINT()),
    ('D', DataTypes.BIGINT()),
    ('same_od', DataTypes.BIGINT()),
    ('duration', DataTypes.BIGINT()),
    ('weekday', DataTypes.BIGINT()),
    ('day', DataTypes.BIGINT()),
    ('hour', DataTypes.BIGINT()),
]

sink_format = OldCsv()
sink_schema = Schema()
for field_name, field_type in sink_fields:
    sink_format.field(field_name, field_type)
    sink_schema.field(field_name, field_type)

# Register the file-backed table under the name the pipeline inserts into.
t_env.connect(FileSystem().path('./data/map_matching')) \
    .with_format(sink_format) \
    .with_schema(sink_schema) \
    .create_temporary_table('mySink')

<pyflink.table.descriptors.StreamTableDescriptor at 0x203245f57c8>

In [5]:
# Load the intersection (node) longitude/latitude data
# The file is read without a header row, so every line becomes a data row.
nodes_csv_path = './data/nodes_lonlat.csv'
data = pd.read_csv(nodes_csv_path, header=None)
coor = data.to_numpy()
coor.shape

(4014, 2)

In [6]:
# UDF definition and registration
# Coordinate matching: returns the index of the nearest node, or -1 when nothing matches
# NOTE(review): input_types declares four arguments while the pipeline passes only
# two (the node arrays come from the defaults); left unchanged -- confirm against the
# PyFlink version in use.
@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
     DataTypes.ARRAY(DataTypes.FLOAT()), DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
    """Match a point to the nearest node by great-circle (haversine) distance.

    Args:
        lng1: Longitude of the query point, in degrees.
        lat1: Latitude of the query point, in degrees.
        lng2: Node longitudes in degrees; defaults to column 0 of ``coor``.
        lat2: Node latitudes in degrees; defaults to column 1 of ``coor``.

    Returns:
        The index of the closest node when at least one node lies within
        100 meters of the query point, otherwise -1. A missing (None)
        coordinate also yields -1.
    """
    # A null coordinate cannot be matched; report "no match" instead of raising.
    if lng1 is None or lat1 is None:
        return -1

    lng1_rad, lat1_rad = np.radians(lng1), np.radians(lat1)
    lng2_rad, lat2_rad = np.radians(lng2), np.radians(lat2)

    # Haversine term: the half-angle of the *latitude* difference stands alone,
    # and the cosines are taken of the *latitudes*. The earlier version had
    # latitude and longitude swapped here, which distorted every distance.
    haversine = (np.sin((lat2_rad - lat1_rad) / 2) ** 2
                 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin((lng2_rad - lng1_rad) / 2) ** 2)
    # Rounding can push the term marginally outside [0, 1]; clamp so sqrt stays real.
    haversine = np.clip(haversine, 0.0, 1.0)

    # Central angle scaled by the Earth radius (3958.8 miles, 1609.344 meters per mile).
    distance = 2 * np.arctan2(np.sqrt(haversine), np.sqrt(1 - haversine))
    distance = distance * 3958.8 * 1609.344

    buffer = 100  # matching radius in meters
    if np.any(distance <= buffer):
        # Return a plain Python int rather than a numpy integer for the BIGINT result.
        return int(np.argmin(distance))
    return -1

t_env.register_function("distance_meters", distance_meters)

In [7]:
# Same origin/destination check: returns 1 when they are equal, 0 otherwise
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def is_same_od(o, d):
    """Flag trips whose origin and destination values are equal.

    Args:
        o: Origin value.
        d: Destination value.

    Returns:
        1 when ``o == d``, otherwise 0.
    """
    if o == d:
        return 1
    return 0

t_env.register_function('is_same_od', is_same_od)

In [8]:
# Compute the trip duration: returns the duration in seconds, or -1 when a timestamp is unusable
@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.BIGINT())
def compute_duration_time(pickup_time, dropoff_time):
    """Return the number of seconds between pickup and dropoff.

    Args:
        pickup_time: Pickup timestamp formatted as ``%Y-%m-%d %H:%M:%S``.
        dropoff_time: Dropoff timestamp in the same format.

    Returns:
        The signed difference ``dropoff - pickup`` in whole seconds, or -1 when
        either value is missing or does not match the expected format. A
        dropoff earlier than the pickup yields a negative number.
    """
    try:
        pickup_time = datetime.strptime(pickup_time, '%Y-%m-%d %H:%M:%S')
        dropoff_time = datetime.strptime(dropoff_time, '%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        # TypeError: a null/non-string field; ValueError: text in another format.
        return -1

    # total_seconds() covers the whole interval. timedelta.seconds only holds the
    # within-day remainder, so it drops full days and wraps negative intervals
    # (one second "backwards" would read as 86399).
    return int((dropoff_time - pickup_time).total_seconds())

t_env.register_function('compute_duration_time', compute_duration_time)

In [9]:
# Extract the weekday, the day of month and the hour of the pickup time
def _parse_pickup_time(pickup_time):
    """Parse a ``%Y-%m-%d %H:%M:%S`` timestamp; return None when it is unusable."""
    try:
        return datetime.strptime(pickup_time, '%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        # TypeError: a null/non-string field; ValueError: text in another format.
        # Anything else is a genuine error and is allowed to propagate.
        return None


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.BIGINT())
def pickup_time_weekday(pickup_time):
    """Return the pickup weekday (``datetime.weekday()``: Monday is 0), or -1 if unparseable."""
    parsed = _parse_pickup_time(pickup_time)
    return -1 if parsed is None else parsed.weekday()


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.BIGINT())
def pickup_time_day(pickup_time):
    """Return the pickup day of the month, or -1 if the timestamp is unparseable."""
    parsed = _parse_pickup_time(pickup_time)
    return -1 if parsed is None else parsed.day


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.BIGINT())
def pickup_time_hour(pickup_time):
    """Return the pickup hour (0-23), or -1 if the timestamp is unparseable."""
    parsed = _parse_pickup_time(pickup_time)
    return -1 if parsed is None else parsed.hour


t_env.register_function('pickup_time_weekday', pickup_time_weekday)
t_env.register_function('pickup_time_day', pickup_time_day)
t_env.register_function('pickup_time_hour', pickup_time_hour)

In [10]:
# Processing pipeline
# Stage 1: keep the raw trip columns and derive the matched origin/destination
# nodes, the trip duration and the pickup-time parts.
stage1_select = (
    "pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, "
    "dropoff_longitude, dropoff_latitude, "
    "distance_meters(pickup_longitude, pickup_latitude) as O, "
    "distance_meters(dropoff_longitude, dropoff_latitude) as D, "
    "compute_duration_time(pickup_datetime, dropoff_datetime) as duration, "
    "pickup_time_weekday(pickup_datetime) as weekday, "
    "pickup_time_day(pickup_datetime) as day, "
    "pickup_time_hour(pickup_datetime) as hour"
)
# Stage 2: add the same-origin/destination flag, which needs O and D from stage 1,
# and order the columns the way the sink table declares them.
stage2_select = (
    "pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, "
    "dropoff_longitude, dropoff_latitude, "
    "O, D, is_same_od(O, D) as same_od, duration, weekday, day, hour"
)

trips = t_env.from_path('mySource')
trips_enriched = trips.select(stage1_select)
trips_enriched.select(stage2_select).insert_into('mySink')

In [11]:
# Run the job and time it
start_time = time.time()
t_env.execute("job")
end_time = time.time()

# Elapsed wall-clock time, printed in seconds and in minutes.
compute_time = end_time - start_time
print(compute_time, compute_time / 60)

807.1666331291199 13.452777218818664
