# `Spark model`, 버스 도착 시각 예측 데이터 `MySQL` 에 저장

### 이전 `ai_spark_bus_arrive_prediction` 은 `Kafka topic`, `car_arrive_topic` 에 저장하였는데
### `car_location_topic` 도 사용 중이다 보니 잘 구동되지 않는 경우(컴퓨터 메모리 부족, CPU 과부하 등의 문제 발생)가 있어
### 보편적으로 잘 사용되기 위해 `MySQL` 에 데이터를 저장하는 파일도 생성한 것.

In [1]:
%sh
# pip uninstall pandas --y

In [2]:
%sh
# pip install -U pandas==1.5.3

In [3]:
%sh
# pip install ksql

In [4]:
%sh
# pip install PyMySQL

In [5]:
%sh
# pip install cffi

In [6]:
%spark.pyspark

from ksql import KSQLAPI
import pandas as pd
import time
import datetime
import numpy as np
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml.feature import VectorAssembler
import ast
import pymysql

In [7]:
%spark.pyspark

client = KSQLAPI("http://localhost:8089", timeout=None)

In [8]:
%spark.pyspark
# MySQL DB 연결

mysql_db = pymysql.connect(
        host = "192.168.0.171" # MySQL이 설치된 윈도우 IP 주소
        ,port=3306
        ,user='root'
        ,passwd='1234'
        ,db='bus_db'
        ,charset='utf8'
    )

In [9]:
%spark.pyspark

mysql_cursor = mysql_db.cursor()

In [10]:
%spark.pyspark
# 하둡에 저장된 모델 읽어서 저장

load_rf = RandomForestRegressionModel.load("/spark_rf_model/")

In [11]:
%spark.pyspark

load_rf

In [12]:
%spark.pyspark

feature_name_list = [
                    'now_latitude', 'now_longitude', 'now_arrive_time',
                    'distance','next_latitude','next_longitude','weekday'
                    ]

In [13]:
%spark.pyspark

feature_name_list

In [14]:
%spark.pyspark

get_change_table = client.query("""SELECT * FROM bus_location_topic EMIT CHANGES;""")

In [15]:
%spark.pyspark


In [16]:
%spark.pyspark


In [17]:
%spark.pyspark

for i in get_change_table:
    print(f"수정된 테이블 조회 : {i}")
    
    if "row" in i:
        location = ast.literal_eval(i)[0]["row"]["columns"]
        print(f"위치 : {location}")
        
        id = location[0]
        date = location[1]
        route_id = location[2]
        vh_id = location[3]
        route_nm = location[4]
        now_latitude = location[5]
        now_longitude = location[6]
        now_station = location[7]
        now_arrive_time = location[8]
        distance = location[9]
        next_station = location[10]
        next_latitude = location[11]
        next_longitude = location[12]
        weekday = location[13]
        
        # 예측을 위해 버스 위치 정보를 Pandas DataFrame 으로 생성
        df_location = pd.DataFrame({
            "now_latitude" : [now_latitude],
            "now_longitude" : [now_longitude],
            "now_arrive_time" : [now_arrive_time],
            "distance" : [distance],
            "next_latitude" : [next_latitude],
            "next_longitude" : [next_longitude],
            "weekday" : [weekday]
        })
        
        # Pandas DataFrame인 df_location을 Spark DataFrame 으로 변환
        # Spark RandomForest 모델에서 예측을 위해
        df_spark = spark.createDataFrame(df_location)
        
        assembler = VectorAssembler(inputCols=feature_name_list, outputCol="features")
        
        assembler_df = assembler.transform(df_spark)
        
        prediction = load_rf.transform(assembler_df)
        
        arrive_time = prediction.toPandas().loc[0,"prediction"]
        print("#"*100)

        # 학습시 예측을 로그 값으로 예측하기 때문에 np.exp(로그값) => 원래 값으로 변환 출력
        print(f"버스로 전송할 도착 예정 시간 : {np.exp(arrive_time)}")
        print("#"*100)
        
        # 다음 정류장에 도착 예정 시간을 소수 3번째 자리 반올림
        next_arrive_time = round(np.exp(arrive_time),3)
        
        # route_id : 노선아이디, vh_id : 버스 아이디, route_nm : 버스노선명
        count_query = f"""
                        select count(*) from bus_arrive_prediction
                        where route_id = '{route_id}'
                        and vh_id = '{vh_id}'
                        and route_nm = '{route_nm}'
                        """
        print(f"count_query : {count_query}")
        mysql_cursor.execute(count_query)
        
        count = mysql_cursor.fetchall()[0][0]
        print(f"count : {count}")
        
        if count < 1 : #=> 노선아이디, 버스아이디, 버스노선명이 일치하는 것이 없을 때 == 데이터가 없을 때
            
            insert_query = f"""INSERT INTO bus_arrive_prediction (
                            now_date
                            ,route_id
                            ,vh_id
                            ,route_nm
                            ,now_latitude
                            ,now_longitude
                            ,now_station
                            ,distance
                            ,next_station
                            ,next_latitude
                            ,next_longitude
                            ,next_arrive_time
                            ) VALUES ( now() ,'{route_id}' ,'{vh_id}'
                                    ,'{route_nm}',{now_latitude}
                                    ,{now_longitude},'{now_station}'
                                    ,{distance}
                                    ,'{next_station}',{next_latitude}
                                    ,{next_longitude},{next_arrive_time}
                        );"""
            print(f"insert query :\n{insert_query}")
            mysql_cursor.execute(insert_query)
            mysql_db.commit()
        
        else:
            
            update_query = f"""update bus_arrive_prediction 
                        set now_date=now()
                        ,now_latitude={now_latitude}
                        ,now_longitude={now_longitude}
                        ,now_station = '{now_station}'
                        ,distance={distance}
                        ,next_station = '{next_station}'
                        ,next_latitude={next_latitude}
                        ,next_longitude={next_longitude}
                        ,next_arrive_time = {next_arrive_time}
                        where route_id = '{route_id}'
                        and vh_id = '{vh_id}'
                        and route_nm = '{route_nm}'
                        """
            print(f"update query :\n{update_query}")
            mysql_cursor.execute(update_query)
            mysql_db.commit()                

            
    print("="*100)

In [18]:
%spark.pyspark

mysql_db.close()

In [19]:
%spark.pyspark


In [20]:
%spark.pyspark
