# In [None]:
# pyspark와 연동(이유: 데이터의 수가 엄청 많음)
import requests
import secret
from bs4 import BeautifulSoup
from pprint import pprint
import xmltodict

from datetime import datetime, date, timedelta
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

def create_spark_session(app_name="pyspark"):
    """Build (or reuse) a local SparkSession with the MySQL connector on the driver classpath.

    Args:
        app_name: Spark application name. Defaults to "pyspark", the name
            that used to be hard-coded, so zero-argument calls behave as
            before while callers that pass a name no longer fail.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    # NOTE(review): this classpath entry has no ".jar" suffix -- confirm it
    # points at the intended connector jar (or directory) on this machine.
    mysql_connector_path = "C:/spark/spark-3.1.2-bin-hadoop2.7/jars/mysql-connector-java-8.0.28"
    return (
        SparkSession.builder
        .master("local[*]")
        .config("spark.driver.extraClassPath", mysql_connector_path)
        .appName(app_name)
        .getOrCreate()
    )

# 1. xml데이터 파싱
def parse_xml_data(xml_content):
    """Extract the charger-station records from a citydata XML response.

    Args:
        xml_content: XML document (text or bytes) handed to ``xmltodict.parse``.

    Returns:
        A list of station dicts read from
        ``SeoulRtd.citydata -> CITYDATA -> CHARGER_STTS -> CHARGER_STTS``.
        The list is empty when the container element has no stations.

    Raises:
        KeyError: If the expected element path is missing from the document.
    """
    xml_obj = xmltodict.parse(xml_content)
    container = xml_obj['SeoulRtd.citydata']['CITYDATA']['CHARGER_STTS']
    # xmltodict maps an empty element to None.
    if container is None:
        return []

    charger_stations = container['CHARGER_STTS']
    if charger_stations is None:
        return []
    # xmltodict yields a bare dict (not a one-item list) when only a single
    # child element is present; always hand callers a list so they can iterate.
    if isinstance(charger_stations, list):
        return charger_stations
    return [charger_stations]

# 2. 충전소 및 충전기 스키마 정의 -> 데이터프레임 변환
def convert_to_dataframe(charger_stations):
    """Convert parsed charger-station records into two Spark DataFrames.

    Args:
        charger_stations: Station dicts as produced by ``parse_xml_data``.
            A single dict or ``None`` is also accepted and treated as a
            one-item or empty list respectively.

    Returns:
        A ``(station_df, charger_df)`` tuple. ``station_df`` has one row per
        station; ``charger_df`` has one row per charger, carrying the owning
        station's ``STAT_ID``.

    Raises:
        KeyError: If a station or charger record lacks an expected field.
        ValueError: If ``STAT_X`` / ``STAT_Y`` is not a parseable number.
    """
    # Station schema
    station_schema = StructType([
        StructField("STAT_NM", StringType(), True),  # station name
        StructField("STAT_ID", StringType(), True),  # station ID
        StructField("STAT_ADDR", StringType(), True),  # station address
        StructField("STAT_X", DoubleType(), True),  # X coordinate (longitude)
        StructField("STAT_Y", DoubleType(), True),  # Y coordinate (latitude)
        StructField("STAT_USETIME", StringType(), True),  # operating hours
        StructField("STAT_PARKPAY", StringType(), True),  # paid/free parking
        StructField("STAT_LIMITYN", StringType(), True),  # user restriction flag
        StructField("STAT_LIMITDETAIL", StringType(), True),  # restriction reason
        StructField("STAT_KINDDETAIL", StringType(), True),  # detailed station type
    ])

    # Charger schema. This must be a StructType; it was previously built with
    # StringType([...]), which is not a valid row schema.
    charger_schema = StructType([
        StructField("STAT_ID", StringType(), True),  # owning station ID
        StructField("CHARGER_ID", StringType(), True),  # charger ID
        StructField("CHARGER_TYPE", StringType(), True),  # charger type
        StructField("CHARGER_STAT", StringType(), True),  # charger status
        StructField("STATUPDDT", StringType(), True),  # status update timestamp
        StructField("LASTTSDT", StringType(), True),  # last charge start timestamp
        StructField("LASTTEDT", StringType(), True),  # last charge end timestamp
        StructField("NOWTSDT", StringType(), True),  # current charge start timestamp
        StructField("OUTPUT", StringType(), True),  # charging capacity
        StructField("METHOD", StringType(), True),  # charging method
    ])

    station_data = []
    charger_data = []

    for station in _as_list(charger_stations):
        station_data.append((
            station['STAT_NM'], station['STAT_ID'], station['STAT_ADDR'],
            # Parsed XML values are strings; DoubleType columns need floats.
            _to_float(station['STAT_X']), _to_float(station['STAT_Y']),
            station['STAT_USETIME'],
            station['STAT_PARKPAY'], station['STAT_LIMITYN'],
            station['STAT_LIMITDETAIL'], station['STAT_KINDDETAIL'],
        ))

        # The charger container may be absent or empty (None), and a station
        # with exactly one charger yields a dict instead of a list.
        detail_container = station.get('CHARGER_DETAIL')
        if isinstance(detail_container, dict):
            charger_details = _as_list(detail_container.get('CHARGER_DETAIL'))
        else:
            charger_details = []

        if not charger_details:
            print("충전기 정보가 없습니다.")
            continue

        for charger_detail in charger_details:
            if not isinstance(charger_detail, dict):
                print("충전기 정보가 없습니다.")
                continue
            charger_data.append((
                station['STAT_ID'],
                charger_detail['CHARGER_ID'], charger_detail['CHARGER_TYPE'],
                charger_detail['CHARGER_STAT'], charger_detail['STATUPDDT'],
                charger_detail['LASTTSDT'], charger_detail['LASTTEDT'],
                charger_detail['NOWTSDT'], charger_detail['OUTPUT'],
                charger_detail['METHOD'],
            ))

    # Called without arguments so it works with a zero-parameter
    # create_spark_session; getOrCreate() reuses any session already running.
    spark = create_spark_session()
    station_df = spark.createDataFrame(station_data, schema=station_schema)
    charger_df = spark.createDataFrame(charger_data, schema=charger_schema)

    return station_df, charger_df


def _as_list(value):
    """Return *value* as a list: None -> [], list -> itself, anything else -> [value]."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def _to_float(value):
    """Convert a coordinate value to float; None or blank text becomes None."""
    if value is None:
        return None
    text = str(value).strip()
    return float(text) if text else None

# 3. 해당 데이터프레임 SQL에 삽입



# 4. 데이터 가공해서 SQL에 삽입



# 0. Main
def main():
    """Fetch citydata XML from the Seoul open API and show it as Spark DataFrames.

    Requests records 1-3 for the "광화문·덕수궁" area, parses the charger
    stations out of the response, converts them to station and charger
    DataFrames, prints both, and stops the Spark session.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    api_key = secret.get_api_key()
    start_num = 1
    end_num = 3
    url = f"http://openapi.seoul.go.kr:8088/{api_key}/xml/citydata/{start_num}/{end_num}/광화문·덕수궁"

    # A timeout keeps the script from hanging forever on an unresponsive API.
    res = requests.get(url=url, timeout=30)
    res.raise_for_status()

    # Pass the raw bytes to the parser rather than res.text, so decoding does
    # not depend on the charset requests guesses from the HTTP headers.
    charger_stations = parse_xml_data(res.content)

    # Take a handle on the session here so it can be stopped below; the
    # original referenced an undefined `spark`. getOrCreate() hands
    # convert_to_dataframe this same session.
    spark = create_spark_session()
    try:
        station_df, charger_df = convert_to_dataframe(charger_stations)

        # Print both DataFrames
        station_df.show()
        charger_df.show()
    finally:
        # Stop the SparkSession even if conversion or display fails
        spark.stop()

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
    






