In [None]:
%flink.pyflink
!pip install pyproj

import math

from pyflink.table import DataTypes
from pyflink.table.types import Row
from pyflink.table.udf import udf
from pyproj import Transformer

# Coordinate transformer, built once at module level and shared by the UDF
# below (EPSG:2097 -> EPSG:4326). always_xy=True makes transform() take and
# return coordinates in (x, y) / (lon, lat) order.
# NOTE(review): the input columns are named GRS80TM_X / GRS80TM_Y, while
# EPSG:2097 is "Korean 1985 / Central Belt" -- confirm the source CRS.
transformer = Transformer.from_crs(
    crs_from="EPSG:2097",
    crs_to="EPSG:4326",
    always_xy=True,
)

# UDF: coordinate transformation
@udf(result_type=DataTypes.ROW([
    DataTypes.FIELD("lat", DataTypes.DOUBLE()),
    DataTypes.FIELD("lon", DataTypes.DOUBLE())
]))
def transform_coordinates(x, y):
    """Convert a projected (x, y) pair to latitude / longitude.

    Uses the module-level ``transformer`` (EPSG:2097 -> EPSG:4326).

    Args:
        x: X coordinate in the source CRS, or None.
        y: Y coordinate in the source CRS, or None.

    Returns:
        Row(lat, lon). Both fields are None when either input is None,
        when the transformation raises, or when it yields a non-finite
        value.
    """
    if x is None or y is None:
        # NULL in, NULL out: skip the transformer instead of relying on the
        # exception path below.
        return Row(lat=None, lon=None)

    try:
        lon, lat = transformer.transform(x, y)
        # With pyproj's default errcheck=False a failed projection returns
        # inf rather than raising, so the result has to be checked as well.
        usable = math.isfinite(lat) and math.isfinite(lon)
    except Exception:
        # Deliberate best-effort: one bad record must not fail the
        # streaming job, so any failure becomes a NULL coordinate pair.
        usable = False

    if not usable:
        return Row(lat=None, lon=None)
    return Row(lat=lat, lon=lon)

# Lookup tables for the accident-type UDF. They are defined once at module
# level so they are not rebuilt for every record the UDF processes.
_ACC_TYPE_MAP = {
    'A01': '교통사고',
    'A02': '차량고장',
    'A03': '보행사고',
    'A04': '공사',
    'A05': '낙하물',
    'A06': '버스사고',
    'A07': '지하철사고',
    'A08': '화재',
    'A09': '기상/재난',
    'A10': '집회및행사',
    'A11': '기타',
    'A12': '제보',
    'A13': '단순정보'
}

_ACC_DTYPE_MAP = {
    '05B01': '소형낙하물', '12B01': '제보', '11B01': '기타',
    '10B01': '훈련', '02B01': '차량고장', '06B01': '버스사고',
    '04B01': '시설물보수', '01B01': '추돌사고', '09B01': '폭우',
    '08B01': '화재', '07B01': '지하철사고', '03B01': '보행사고',
    '04B02': '청소작업', '01B03': '전복사고', '10B02': '집회/시위',
    '09B02': '호우주의보', '05B02': '대형낙하물', '04B03': '차선도색',
    '10B03': '행사', '09B03': '호우경보', '01B04': '차량화재',
    '01B05': '전도사고', '09B04': '태풍주의보', '04B04': '도로보수',
    '09B05': '태풍경보', '04B05': '제설작업', '09B06': '폭설',
    '04B06': '포장공사', '09B07': '대설주의보', '04B07': '가로수정비',
    '09B08': '대설경보', '09B09': '폭염', '09B10': '폭염주의보',
    '09B11': '한파', '09B12': '한파주의보', '09B13': '우박',
    '09B14': '노면미끄러움', '09B15': '도로침하', '09B16': '도로침수',
    '09B17': '도로결빙', '09B18': '노면패임', '13B01': '단순정보',
    '09B19': '강우통제'
}

# Description returned for any code (including None) missing from the tables.
_UNKNOWN_ACC_LABEL = '기타'


# UDF: map accident type / detailed type codes to their descriptions
@udf(result_type=DataTypes.ROW([
    DataTypes.FIELD("acc_type_desc", DataTypes.STRING()),
    DataTypes.FIELD("acc_dtype_desc", DataTypes.STRING())
]))
def map_accident_types(acc_type, acc_dtype):
    """Translate accident type codes into their Korean descriptions.

    Args:
        acc_type: Accident type code such as 'A01'.
        acc_dtype: Detailed accident type code such as '01B01'.

    Returns:
        Row(acc_type_desc, acc_dtype_desc). A code that is not in the
        corresponding lookup table maps to '기타'.
    """
    # The Row is built positionally so the values always follow the field
    # order declared in result_type.
    # NOTE(review): older PyFlink Row implementations appear to sort keyword
    # names alphabetically, which would swap these two fields -- confirm for
    # the deployed version.
    return Row(
        _ACC_TYPE_MAP.get(acc_type, _UNKNOWN_ACC_LABEL),
        _ACC_DTYPE_MAP.get(acc_dtype, _UNKNOWN_ACC_LABEL),
    )

# Register the UDFs. Each name is dropped before it is created, presumably
# so that re-running this paragraph does not collide with an earlier
# registration.
for _udf_name, _udf_fn in (
    ("transform_coordinates", transform_coordinates),
    ("map_accident_types", map_accident_types),
):
    st_env.drop_temporary_function(_udf_name)
    st_env.create_temporary_function(_udf_name, _udf_fn)


In [None]:
%flink.ssql

-- Source table: accident records read as JSON from the 'accident' Kinesis stream.
DROP TABLE IF EXISTS accident_raw;
CREATE TABLE accident_raw (
    ACC_ID STRING,                 -- accident ID
    OCCR_DATE STRING,              -- accident occurrence date
    OCCR_TIME STRING,              -- accident occurrence time
    OCCURRED_AT TIMESTAMP(3),      -- occurrence date + time combined (no watermark is declared on this table)
    EXP_CLR_DATE STRING,           -- expected clearance date
    EXP_CLR_TIME STRING,           -- expected clearance time
    CLEARED_AT TIMESTAMP(3),       -- expected clearance date + time combined
    ACC_TYPE STRING,               -- accident type code
    ACC_DTYPE STRING,              -- detailed accident type code
    LINK_ID STRING,                -- link ID
    GRS80TM_X DOUBLE,              -- GRS80 X coordinate
    GRS80TM_Y DOUBLE,              -- GRS80 Y coordinate
    ACC_INFO STRING,               -- accident information
    ACC_ROAD_CODE STRING           -- accident road code
) WITH (
  'connector' = 'kinesis',
  'stream' = 'accident',
  'aws.region' = 'ap-northeast-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'scan.shard.getrecords.maxretries' = '10'
);


-- Sink table: transformed accident records written as JSON files to S3,
-- partitioned by occr_date.
DROP TABLE IF EXISTS sink_accident;
CREATE TABLE sink_accident (
    acc_id STRING,                 -- accident ID
    occr_date STRING,              -- accident occurrence date (partition column)
    occr_time STRING,              -- accident occurrence time
    occurred_at TIMESTAMP(3),      -- occurrence date + time combined
    exp_clr_date STRING,           -- expected clearance date
    exp_clr_time STRING,           -- expected clearance time
    cleared_at TIMESTAMP(3),       -- expected clearance date + time combined
    acc_type STRING,               -- accident type (the INSERT below writes the mapped description here)
    acc_dtype STRING,              -- detailed accident type (the INSERT below writes the mapped description here)
    link_id STRING,                -- link ID
    latitude DOUBLE,               -- latitude
    longitude DOUBLE,              -- longitude
    acc_info STRING                -- accident information
) PARTITIONED BY (`occr_date`)
WITH (
    'connector' = 'filesystem',
    'path' = 's3://cloud9-transformed/accident_transformed',
    'format' = 'json',
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '0 s',
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.rolling-policy.file-size' = '128 mb',
    'sink.rolling-policy.check-interval' = '3 min',
    'sink.rolling-policy.rollover-interval' = '3 min'
    
);

SET 'table.dml-sync' = 'true'; -- run DML synchronously
SET 'execution.runtime-mode' = 'streaming'; -- streaming mode
SET execution.checkpointing.interval = 100000; -- 100000 ms = 100 s (a value without a unit is read as milliseconds)
-- SET 'pipeline.name' = 'Job1 - Sink Accident';


-- Copy every raw accident record into sink_accident, replacing the type
-- codes with their descriptions and the projected X/Y coordinates with
-- latitude / longitude. No target column list is given, so the SELECT
-- columns map to sink_accident by position; the aliases below are
-- informational only.
INSERT INTO sink_accident
SELECT 
    ACC_ID,
    OCCR_DATE,
    OCCR_TIME,
    OCCURRED_AT,
    EXP_CLR_DATE,
    EXP_CLR_TIME,
    CLEARED_AT,
    -- lands in sink_accident.acc_type / acc_dtype
    map_accident_types(ACC_TYPE, ACC_DTYPE).acc_type_desc AS ACC_TYPE_DESC,
    map_accident_types(ACC_TYPE, ACC_DTYPE).acc_dtype_desc AS ACC_DTYPE_DESC,
    LINK_ID,
    -- both fields are NULL when the coordinate UDF cannot convert the pair
    transform_coordinates(GRS80TM_X, GRS80TM_Y).lat AS latitude,
    transform_coordinates(GRS80TM_X, GRS80TM_Y).lon AS longitude,
    ACC_INFO
FROM 
    accident_raw;


In [None]:
%flink.ssql

-- Source table for windowed aggregation: same 'accident' Kinesis stream and
-- columns as accident_raw, plus an event-time watermark on OCCURRED_AT.
DROP TABLE IF EXISTS accident_tumbled;
CREATE TABLE accident_tumbled (
    ACC_ID STRING,                 -- accident ID
    OCCR_DATE STRING,              -- accident occurrence date
    OCCR_TIME STRING,              -- accident occurrence time
    OCCURRED_AT TIMESTAMP(3),      -- occurrence date + time combined (watermark column)
    EXP_CLR_DATE STRING,           -- expected clearance date
    EXP_CLR_TIME STRING,           -- expected clearance time
    CLEARED_AT TIMESTAMP(3),       -- expected clearance date + time combined
    ACC_TYPE STRING,               -- accident type code
    ACC_DTYPE STRING,              -- detailed accident type code
    LINK_ID STRING,                -- link ID
    GRS80TM_X DOUBLE,              -- GRS80 X coordinate
    GRS80TM_Y DOUBLE,              -- GRS80 Y coordinate
    ACC_INFO STRING,               -- accident information
    ACC_ROAD_CODE STRING,          -- accident road code
    -- the watermark trails OCCURRED_AT by 5 minutes
    WATERMARK FOR `OCCURRED_AT` AS `OCCURRED_AT` - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kinesis',
  'stream' = 'accident',
  'aws.region' = 'ap-northeast-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'scan.shard.getrecords.maxretries' = '10'
);

-- Sink table: windowed accident counts written as JSON to the
-- 'accident-to-sns' Kinesis stream.
DROP TABLE IF EXISTS accident_to_sns;
CREATE TABLE accident_to_sns (
    window_start TIMESTAMP(3),     -- start of the tumbling window
    window_end TIMESTAMP(3),       -- end of the tumbling window
    acc_type STRING,               -- accident type description
    acc_dtype STRING,              -- detailed accident type description
    acc_cts BIGINT                 -- number of distinct accident IDs in the group
) WITH (
  'connector' = 'kinesis',
  'stream' = 'accident-to-sns',
  'aws.region' = 'ap-northeast-2',
  'format' = 'json'
);

SET 'table.dml-sync' = 'true'; -- run DML synchronously
SET 'execution.runtime-mode' = 'streaming'; -- streaming mode
SET execution.checkpointing.interval = 100000; -- 100000 ms = 100 s

-- SNS feed: for each 5-minute tumbling window on OCCURRED_AT, count the
-- distinct accident IDs per (type description, detailed type description)
-- and emit only the groups that contain at least two accidents.
INSERT INTO accident_to_sns
SELECT
    TUMBLE_START(OCCURRED_AT, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(OCCURRED_AT, INTERVAL '5' MINUTE) AS window_end,
    map_accident_types(ACC_TYPE, ACC_DTYPE).acc_type_desc AS acc_type,
    map_accident_types(ACC_TYPE, ACC_DTYPE).acc_dtype_desc AS acc_dtype,
    COUNT(DISTINCT ACC_ID) AS acc_cts
FROM accident_tumbled
GROUP BY
    TUMBLE(OCCURRED_AT, INTERVAL '5' MINUTE),
    map_accident_types(ACC_TYPE, ACC_DTYPE).acc_type_desc,
    map_accident_types(ACC_TYPE, ACC_DTYPE).acc_dtype_desc
HAVING COUNT(DISTINCT ACC_ID) >= 2;