In [1]:
!pip install kafka-python



In [2]:
from kafka import KafkaProducer
from kafka import KafkaConsumer 
from json import dumps
import json
import pandas as pd
import time
import datetime
import threading

In [3]:
#버스 위치 정보가 저장된 파일 읽기
location_df = pd.read_csv("location.csv")

In [4]:
#위치 정보 조회
location_df

Unnamed: 0,id,date,route_id,vh_id,route_nm,now_latitude,now_longitude,now_station,now_arrive_time,distance,next_station,next_latitude,next_longitude
0,210457,2019-10-29,405136001,7997025,360-1,33.457724,126.554014,제대마을,07시,333.0,제대아파트,33.458783,126.557353
1,210458,2019-10-29,405136001,7997025,360-1,33.458783,126.557353,제대아파트,07시,415.0,제주대학교,33.459893,126.561624
2,210459,2019-10-29,405136001,7997025,360-1,33.478867,126.483833,한라중학교/부영아파트,08시,417.0,대림2차아파트,33.478850,126.488350
3,210460,2019-10-29,405136001,7997025,360-1,33.478850,126.488350,대림2차아파트,08시,334.0,연동대림1차아파트,33.480700,126.489933
4,210461,2019-10-29,405136001,7997025,360-1,33.480700,126.489933,연동대림1차아파트,08시,550.0,케이티앤지,33.482077,126.485355
...,...,...,...,...,...,...,...,...,...,...,...,...,...
91769,302350,2019-11-05,405328102,7983486,281-2,33.493625,126.534764,제주지방법원(광양방면),20시,272.0,고산동산(광양방면),33.495540,126.532907
91770,302351,2019-11-05,405328102,7983486,281-2,33.495540,126.532907,고산동산(광양방면),20시,447.0,제주시청(광양방면),33.498925,126.530351
91771,302352,2019-11-05,405328102,7983486,281-2,33.498925,126.530351,제주시청(광양방면),20시,418.0,광양사거리,33.500473,126.527103
91772,302353,2019-11-05,405328102,7983486,281-2,33.500473,126.527103,광양사거리,20시,140.0,탐라장애인 종합복지관,33.500228,126.525625


In [5]:
#0 번째 위치 정보 조회
location_df.loc[0]

id                     210457
date               2019-10-29
route_id            405136001
vh_id                 7997025
route_nm                360-1
now_latitude        33.457724
now_longitude      126.554014
now_station              제대마을
now_arrive_time           07시
distance                333.0
next_station            제대아파트
next_latitude       33.458783
next_longitude     126.557353
Name: 0, dtype: object

In [6]:
# location_df.loc[0] : 0 번째 위치 정보 조회
# to_json() : 0번째 위치 정보를 JSON 으로 변환
# force_ascii=False : ascii 형태로 변환 하지 않음
location_df.loc[0].to_json(force_ascii=False)

'{"id":210457,"date":"2019-10-29","route_id":405136001,"vh_id":7997025,"route_nm":"360-1","now_latitude":33.457724,"now_longitude":126.554014,"now_station":"제대마을","now_arrive_time":"07시","distance":333.0,"next_station":"제대아파트","next_latitude":33.458783,"next_longitude":126.557353}'

In [7]:
#Kafka로 버스 현재 위치를 전송하는객체
bus_producer=KafkaProducer(
            #메시지 받은 사람이 메시지를 잘 받았는지 체크하는 옵션 
            #(0은 그냥 보내기만 한다. 확인x)
            acks=0, 
            #메시지 전달할 때 압축
            compression_type='gzip', 
            #현재 위치를 저장할 카프카 아이피
            bootstrap_servers=['192.168.56.101:9092'], 
            #직렬화 : 데이터 전송을 위해 byte단위로 바꿔주는 작업 : 
            #dumps 함수이용. dump : json 값을 메모리에 올려준다.
            #encode를 통해서 올린다.x가 있으면, x를 dumps로 바꾸고
            #utf-8 로 설정한다
            value_serializer=lambda x: dumps(x).encode('utf-8') 
          )

In [8]:
#인공지능 서버에서 버스 도착예측 시간을 가져오는 객체
bus_consumer=KafkaConsumer( 
                        #읽어올 토픽의 이름  
                        "car_arrive_topic",
                        # 카프카 서버 아이피:포트
                          bootstrap_servers=['192.168.56.101:9092'], 
                        # 어디서부터 값을 읽어올지 설정 (earlest 가장 처음 latest는 가장 최근)
                        auto_offset_reset="latest",
                        # 완료되었을 떄 문자 전송
                        enable_auto_commit=True, 
                        # 그룹핑하여 토픽 지정할 수 있다 > 같은 컨슈머로 작업
                        group_id='my-group', 
                        # 역직렬화 ( 받을 떄 ) ; 메모리에서 읽어오므로 loads라는 함수를 이용한다. // 직렬화 (보낼 떄)
                        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                        # 10000000 밀리초 이후에 메시지가 오지 않으면 없는 것으로 취급.
                        consumer_timeout_ms=10000000
            )

In [9]:
#len(location_df) : 버스 도착정보의 개수
#버스 도착 정보의 개수를 location_count에 저장
location_count = len(location_df)
location_count

91774

In [10]:
#버스 위치 정보를 인공지능 서버로 전송하는 함수
def send_location() :
    #전송할 데이터의 인덱스
    index = 0
    
    while True:
        # location_count : 버스 도착정보의 개수
        # index : 전송할 데이터의 인덱스
        # 전송할 데이터의 개수가 버스 도착정보 개수 이상
        if index >= location_count :
            #인덱스 초기화
            index = 0
        #datetime.datetime.now() : 현재 시간 리턴    
        now = datetime.datetime.now()
        #location_df.loc[index] : index 번째 위치정보를 리턴
        location_row = location_df.loc[index]
        # location_row :index 번째 위치 정보
        # to_json() : 0번째 위치 정보를 JSON 으로 변환
        # force_ascii=False : ascii 형태로 변환 하지 않음
        data = location_row.to_json(force_ascii=False)
        #data를 JSON 형태 객체로 변환
        data = json.loads(data)
        # now.hour : 현재 시간을 리턴
        #현재 시간(now.hour)를 data에 hour 로 저장
        data["hour"] = now.hour
        #now.weekday() : 현재 요일 
        #현재 요일(now.weekday())를 data에 weekday 로 저장
        data["weekday"] = now.weekday()
        #car_location_topic으로 data 를 전송할 준비
        bus_producer.send('car_location_topic',value=data)
        #Kafka로 데이터 전송
        bus_producer.flush()
        #전송한 데이터 출력
        print("send location data =", data)
        #30초 대기
        time.sleep(30)
        #index 1증가
        index = index + 1
        print("=" * 100)

In [11]:
#인공지능 서버에서 도착 예정 시간을 출력하는 함수
def get_arrive():

    #bus_consumer : 인공지능 서버에서 전송한 
    #               버스 도착 예정 시간을 가져오는 객체
    
    # 가져온 버스 도착 예정 시간은 arrive에 저장
    for arrive in bus_consumer:
        print("get_arrive : arrive =",arrive)
        print("=" * 100)

In [None]:
#send_location 버스 현재 위치를 인공지능 서버로 전송 함수
#threading.Thread: send_location 함수를 멀티 쓰레드로 실행 할 객체
t1 = threading.Thread(target=send_location)
#get_arrive : 인공지능 서버의 도착 시간을 가져오는 함수
#threading.Thread: get_arrive 함수를 멀티 쓰레드로 실행 할 객체
t2 = threading.Thread(target=get_arrive)
#멀티 쓰레드 시작
t2.start()
#멀티 쓰레드 시작
t1.start()
#멀티 쓰레드 종료 할때까지 대기
t2.join()
t1.join()


send location data = {'id': 210457, 'date': '2019-10-29', 'route_id': 405136001, 'vh_id': 7997025, 'route_nm': '360-1', 'now_latitude': 33.457724, 'now_longitude': 126.554014, 'now_station': '제대마을', 'now_arrive_time': '07시', 'distance': 333.0, 'next_station': '제대아파트', 'next_latitude': 33.458783, 'next_longitude': 126.557353, 'hour': 16, 'weekday': 4}
get_arrive : arrive = ConsumerRecord(topic='car_arrive_topic', partition=0, offset=24, timestamp=1688106152896, timestamp_type=0, key=None, value={'id': 210577, 'date': '2019-10-29', 'route_id': 405136001, 'vh_id': 7997025, 'route_nm': '360-1', 'now_latitude': 33.485662, 'now_longitude': 126.494923, 'now_station': '도호동', 'now_arrive_time': '18시', 'distance': 321.0, 'next_station': '연동주민센터', 'next_latitude': 33.487317, 'next_longitude': 126.496617, 'hour': 14, 'weekday': 4, 'arrive_time': 89.01767764039674}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=415, serialized_header_size=-1)
get_arrive : arrive = Consume