In [1]:
'''

### time line ###

1. 카프카 도커 이미지 생성
docker pull encore0906/spark_kafka_env01

2. 카프카 도커 이미지 런

./spark_kafka_start.sh

3. kafka 토픽 생성
kafka-topics --create --topic car_location_topic --bootstrap-server localhost:9092 &

4. 토픽 리스트 확인
kafka-topics --bootstrap-server localhost:9092 --list

5. 카프카 디비 시작
ksql -- http://localhost:8089

6. 생성된 토픽 확인 (ksql 환경)
ksql> show topics;

7. 확인 후
ksql> exit()

8. 토픽에 저장할 데이터 생성(ksql 환경)
ksql> CREATE STREAM car_location_topic (
    id BIGINT KEY,
    date STRING,
    route_id VARCHAR,
    vh_id VARCHAR,
    route_nm VARCHAR,
    now_latitude DECIMAL(10,6),
    now_longitude DECIMAL(10,6),
    now_station VARCHAR,
    now_arrive_time DECIMAL(2,0),
    distance DECIMAL(10,1),
    next_station VARCHAR,
    next_latitude DECIMAL(10,6),
    next_longitude DECIMAL(10,6),
    weekday DECIMAL(1,0)
) WITH (
  KAFKA_TOPIC = 'car_location_topic',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);


9. 첫번째 데이터부터 조회하도록 설정 (ksql 환경)
ksql> set 'auto.offset.reset' = 'earliest';

10. 불러오는 데이터를 확인하는 상태 (ksql 환경)
ksql> select * from car_location_topic
> emit changes;

11. jupyter notebook으로 실시간 데이터 전송 상태 확인


'''



"\n\n### time line ###\n\n1. 카프카 도커 이미지 생성\ndocker pull encore0906/spark_kafka_env01\n\n2. 카프카 도커 이미지 런\n\n./spark_kafka_start.sh\n\n3. kafka 토픽 생성\nkafka-topics --create --topic car_location_topic --bootstrap-server localhost:9092 &\n\n4. 토픽 리스트 확인\nkafka-topics --bootstrap-server localhost:9092 --list\n\n5. 카프카 디비 시작\nksql -- http://localhost:8089\n\n6. 생성된 토픽 확인 (ksql 환경)\nksql> show topics;\n\n7. 확인 후\nksql> exit()\n\n8. 토픽에 저장할 데이터 생성(ksql 환경)\nksql> CREATE STREAM car_location_topic (\n    id BIGINT KEY,\n    date STRING,\n    route_id VARCHAR,\n    vh_id VARCHAR,\n    route_nm VARCHAR,\n    now_latitude DECIMAL(10,6),\n    now_longitude DECIMAL(10,6),\n    now_station VARCHAR,\n    now_arrive_time DECIMAL(2,0),\n    distance DECIMAL(10,1),\n    next_station VARCHAR,\n    next_latitude DECIMAL(10,6),\n    next_longitude DECIMAL(10,6),\n    weekday DECIMAL(1,0)\n) WITH (\n  KAFKA_TOPIC = 'car_location_topic',\n  VALUE_FORMAT = 'JSON',\n  PARTITIONS = 1\n);\n\n\n9. 첫번째 데이터부터 조회하도록 설

In [2]:
# !pip install pandas

In [3]:
# !pip install ksql

In [4]:
import logging

from ksql import KSQLAPI
import pandas as pd
import time
import datetime
import random

In [5]:
client = KSQLAPI('http://localhost:8089', timeout=None)


In [6]:
result = client.ksql('show topics;')

In [7]:
result

[{'@type': 'kafka_topics',
  'statementText': 'show topics;',
  'topics': [{'name': 'car_location_topic', 'replicaInfo': [1]},
   {'name': 'default_ksql_processing_log', 'replicaInfo': [1]}],

In [8]:
location_df = pd.read_csv('location.csv')

In [9]:
location_df

Unnamed: 0,id,date,route_id,vh_id,route_nm,now_latitude,now_longitude,now_station,now_arrive_time,distance,next_station,next_latitude,next_longitude
0,210457,2019-10-29,405136001,7997025,360-1,33.457724,126.554014,제대마을,07시,333.0,제대아파트,33.458783,126.557353
1,210458,2019-10-29,405136001,7997025,360-1,33.458783,126.557353,제대아파트,07시,415.0,제주대학교,33.459893,126.561624
2,210459,2019-10-29,405136001,7997025,360-1,33.478867,126.483833,한라중학교/부영아파트,08시,417.0,대림2차아파트,33.478850,126.488350
3,210460,2019-10-29,405136001,7997025,360-1,33.478850,126.488350,대림2차아파트,08시,334.0,연동대림1차아파트,33.480700,126.489933
4,210461,2019-10-29,405136001,7997025,360-1,33.480700,126.489933,연동대림1차아파트,08시,550.0,케이티앤지,33.482077,126.485355
...,...,...,...,...,...,...,...,...,...,...,...,...,...
91769,302350,2019-11-05,405328102,7983486,281-2,33.493625,126.534764,제주지방법원(광양방면),20시,272.0,고산동산(광양방면),33.495540,126.532907
91770,302351,2019-11-05,405328102,7983486,281-2,33.495540,126.532907,고산동산(광양방면),20시,447.0,제주시청(광양방면),33.498925,126.530351
91771,302352,2019-11-05,405328102,7983486,281-2,33.498925,126.530351,제주시청(광양방면),20시,418.0,광양사거리,33.500473,126.527103
91772,302353,2019-11-05,405328102,7983486,281-2,33.500473,126.527103,광양사거리,20시,140.0,탐라장애인 종합복지관,33.500228,126.525625


In [None]:
location_df.sort_values(['date', 'now_arrive_time'], inplace=True)

In [None]:
location_df.reset_index(drop=True, inplace=True)

In [10]:
location_df.loc[0]

id                     210457
date               2019-10-29
route_id            405136001
vh_id                 7997025
route_nm                360-1
now_latitude        33.457724
now_longitude      126.554014
now_station              제대마을
now_arrive_time           07시
distance                333.0
next_station            제대아파트
next_latitude       33.458783
next_longitude     126.557353
Name: 0, dtype: object

In [11]:
count = len(location_df)
count

91774

In [12]:
index = 0

while True:
    if index >= count:
        index =0
    now = datetime.datetime.now()
    print('now = ', now)

    date = now.strftime('%Y-%m-%d')
    print('date = ', date)

    now_arrive_time = now.hour
    print('now_arrive_time =', now_arrive_time)


    weekday = now.weekday()
    print('weekday = ', weekday)

    location_row = location_df.loc[index]
    print('location_row = ', location_row)

    route_id = location_df.loc[index, 'route_id']
    print('route_id = ', route_id)

    vh_id = location_df.loc[index, 'vh_id']
    print('vh_id = ', vh_id)

    route_nm = location_df.loc[index, 'route_nm']
    print('route_nm = ', route_nm)

    now_latitude = round(location_df.loc[index, 'now_latitude'],6)
    print('now_latitude = ', now_latitude)

    now_longitude = round( location_df.loc[index, 'now_longitude'], 6)
    print('now_longitude = ', now_longitude)

    now_station = location_df.loc[index, 'now_station']
    print('now_station = ', now_station)

    distance = location_df.loc[index, 'distance']
    print('distance = ', distance)

    next_latitude = round(location_df.loc[index, 'next_latitude'], 6)
    print('next_latitude = ', next_latitude)

    next_longitude = round(location_df.loc[index, 'next_longitude'], 6)
    print('next_longitude = ', next_longitude)

    next_station = location_df.loc[index, 'next_station']
    print('next_station = ', next_station)


    id = int(now.timestamp())
    print('id = ', id)

    insert_query = f"""INSERT INTO car_location_topic (
                    id,
                    route_id,
                    vh_id,
                    route_nm,
                    now_latitude,
                    now_longitude,
                    now_station,
                    now_arrive_time,
                    distance,
                    next_station,
                    next_latitude,
                    next_longitude,
                    weekday,
                    date
                    ) VALUES ( {id}, '{route_id}', '{vh_id}',
                              '{route_nm}', {now_latitude},
                               {now_longitude}, '{now_station}',
                               {now_arrive_time}, {distance},
                              '{next_station}', {next_latitude},
                               {next_longitude}, {weekday},
                              '{date}' );"""
    
    print('insert_query = ', insert_query)

    try:
        client.ksql(insert_query)
    except Exception as e:
        print('it is okay , exception =', e)

    sleep_time = random.randrange(120,600)
    print('sleep_time = ', sleep_time)
    time.sleep(sleep_time)

    index = index + 1
    print('='*100)

now =  2023-10-16 16:18:06.965329
date =  2023-10-16
now_arrive_time = 16
weekday =  0
location_row =  id                     210457
date               2019-10-29
route_id            405136001
vh_id                 7997025
route_nm                360-1
now_latitude        33.457724
now_longitude      126.554014
now_station              제대마을
now_arrive_time           07시
distance                333.0
next_station            제대아파트
next_latitude       33.458783
next_longitude     126.557353
Name: 0, dtype: object
route_id =  405136001
vh_id =  7997025
route_nm =  360-1
now_latitude =  33.457724
now_longitude =  126.554014
now_station =  제대마을
distance =  333.0
next_latitude =  33.458783
next_longitude =  126.557353
next_station =  제대아파트
id =  1697440686
insert_query =  INSERT INTO car_location_topic (
                    id,
                    route_id,
                    vh_id,
                    route_nm,
                    now_latitude,
                    now_longitude,
           

KeyboardInterrupt: 