<h1>Saving to MySQL</h1>

<h5>Merging the CSV files: the data set is small, so loading it with pandas is preferable to Spark</h5>

In [None]:
import os
import pandas as pd
import chardet
import warnings
from pandas.errors import ParserWarning

warnings.simplefilter(action='ignore', category=ParserWarning)

folder_path = "CsvFile"
csv_files = [f for f in os.listdir(folder_path) if f.endswith(".csv")]

expected_columns = ["사용일자", "노선명", "지하철역", "승차총승객수", "하차총승객수"]
df_list = []

for file in csv_files:
    file_path = os.path.join(folder_path, file)

    try:
        # 1. Detect the encoding automatically from the first 10,000 bytes
        with open(file_path, 'rb') as f:
            raw_data = f.read(10000)
            encoding_detected = chardet.detect(raw_data)['encoding']
        
        # 2. Read the CSV with the detected encoding
        df = pd.read_csv(
            file_path,
            encoding=encoding_detected,
            names=expected_columns,
            header=0,
            index_col=False, 
            on_bad_lines='skip'  # skip malformed lines
        )
        
        df_list.append(df)

    except Exception as e:
        print(f"Error reading {file_path}: {e}")

df_pd = pd.concat(df_list, ignore_index=True)
df_pd.head()

Unnamed: 0,사용일자,노선명,지하철역,승차총승객수,하차총승객수
0,20150101,2호선,낙성대,14586.0,14889.0
1,20150101,2호선,사당,19233.0,20298.0
2,20150101,2호선,방배,5920.0,6065.0
3,20150101,2호선,서초,4379.0,4120.0
4,20150101,분당선,선정릉,1972.0,1828.0
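
The loop above relies on `chardet` to guess each file's encoding from a 10,000-byte sample. As a hedged alternative for cases where the guess is wrong or comes back as `None`, the sketch below simply tries a short list of candidate encodings in order. The candidate list and the helper name `pick_encoding` are assumptions for illustration, not part of the original notebook.

```python
import codecs

# Assumed candidates: UTF-8 (with or without BOM) first, then cp949,
# which is a superset of EUC-KR and therefore covers both legacy encodings.
CANDIDATES = ("utf-8-sig", "cp949")


def pick_encoding(raw: bytes, candidates=CANDIDATES):
    """Return the first candidate that decodes `raw` without error, else None."""
    for name in candidates:
        decoder = codecs.getincrementaldecoder(name)(errors="strict")
        try:
            # final=False tolerates a multi-byte character cut off at the
            # end of a fixed-size sample such as f.read(10000).
            decoder.decode(raw, final=False)
        except UnicodeDecodeError:
            continue
        return name
    return None


sample = "사용일자,노선명,지하철역".encode("cp949")
print(pick_encoding(sample))
```

The returned name could be passed to `pd.read_csv(..., encoding=...)` in place of, or as a fallback for, the `chardet` result.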


In [2]:
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2109398 entries, 0 to 2109397
Data columns (total 5 columns):
 #   Column  Dtype  
---  ------  -----  
 0   사용일자    int64  
 1   노선명     object 
 2   지하철역    object 
 3   승차총승객수  float64
 4   하차총승객수  float64
dtypes: float64(2), int64(1), object(2)
memory usage: 80.5+ MB
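
The `float64` dtype on the two passenger-count columns may indicate that some rows had missing or non-integer counts, and `on_bad_lines='skip'` drops malformed rows silently. A minimal sketch for counting such rows in one file before loading it is shown here. The helper name `count_suspect_rows` and the expected field count of 5 are assumptions; files that carry an extra trailing column would need a different value.

```python
import csv
import io

EXPECTED_FIELDS = 5  # 사용일자, 노선명, 지하철역, 승차총승객수, 하차총승객수 (assumed layout)


def count_suspect_rows(text_stream, expected_fields=EXPECTED_FIELDS):
    """Count data rows with an unexpected field count or a non-numeric count."""
    reader = csv.reader(text_stream)
    next(reader, None)  # skip the header row
    wrong_width = bad_number = 0
    for row in reader:
        if len(row) != expected_fields:
            wrong_width += 1
            continue
        try:
            # Columns 3 and 4 hold the boarding and alighting counts
            float(row[3]), float(row[4])
        except ValueError:
            bad_number += 1
    return {"wrong_width": wrong_width, "bad_number": bad_number}


# Small in-memory demo: one clean row, one empty count, one short row
demo = io.StringIO(
    "사용일자,노선명,지하철역,승차총승객수,하차총승객수\n"
    "20150101,2호선,사당,19233,20298\n"
    "20150101,2호선,방배,,6065\n"
    "20150101,2호선,서초,4379\n"
)
print(count_suspect_rows(demo))
```

For a real file, the stream would come from `open(file_path, encoding=..., newline="")` using the encoding chosen for that file.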


<h3>Preprocessing the data with Spark</h3>

In [3]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

In [17]:
os.environ['PYSPARK_PYTHON'] = r"C:\Python39\python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = r"C:\Python39\python.exe"

jar_path = os.path.abspath("C:/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar").replace("\\", "/")

spark = SparkSession.builder \
    .appName("MySQL Export") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.python.worker.memory", "2g") \
    .config("spark.local.ip", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.python.worker.reuse", "true") \
    .config("spark.jars", f"file:///{jar_path}") \
    .getOrCreate()


spark_df = spark.createDataFrame(df_pd)

In [None]:
print(spark.sparkContext._jsc.sc().listJars())

Vector(spark://127.0.0.1:62468/jars/mysql-connector-j-8.3.0.jar)
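
The session above builds the `spark.jars` value by hand, replacing backslashes and prefixing `file:///`. As an alternative sketch, `pathlib` can produce the same kind of file URI from a local path. The helper name `jar_uri` is an assumption for illustration.

```python
from pathlib import Path


def jar_uri(jar_path: str) -> str:
    """Return an absolute file:// URI for a local JAR path."""
    # absolute() anchors a relative path at the current working directory;
    # as_uri() adds the scheme and percent-encodes unsafe characters.
    return Path(jar_path).absolute().as_uri()


print(jar_uri("mysql-connector-j-8.3.0.jar"))
```

The result could then be passed as `.config("spark.jars", jar_uri(...))` when the session is built.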


In [19]:
spark_df.summary()

DataFrame[summary: string, 사용일자: string, 노선명: string, 지하철역: string, 승차총승객수: string, 하차총승객수: string]

In [20]:
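# 사용일자 arrives as a long (e.g. 20150101); cast it to string so the yyyyMMdd pattern is applied explicitly
spark_df = spark_df.withColumn('사용일자', to_date(col('사용일자').cast('string'), 'yyyyMMdd'))
# The passenger counts were loaded as doubles; store them as integers
spark_df = spark_df.withColumn('승차총승객수', col('승차총승객수').cast('integer'))
spark_df = spark_df.withColumn('하차총승객수', col('하차총승객수').cast('integer'))

# Rename the columns to English, keeping their current order
spark_df = spark_df.toDF('date', 'line', 'station', 'riding', 'dropped')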


In [21]:
spark_df = spark_df.select("line", "station", "riding", "dropped", "date")

In [22]:
spark_df.printSchema()

root
 |-- line: string (nullable = true)
 |-- station: string (nullable = true)
 |-- riding: integer (nullable = true)
 |-- dropped: integer (nullable = true)
 |-- date: date (nullable = true)



In [23]:
spark_df.show(10)

+------+-------+------+-------+----------+
|  line|station|riding|dropped|      date|
+------+-------+------+-------+----------+
| 2호선| 낙성대| 14586|  14889|2015-01-01|
| 2호선|   사당| 19233|  20298|2015-01-01|
| 2호선|   방배|  5920|   6065|2015-01-01|
| 2호선|   서초|  4379|   4120|2015-01-01|
|분당선| 선정릉|  1972|   1828|2015-01-01|
|분당선| 가천대|  2512|   3332|2015-01-01|
|분당선|   태평|  7637|   7899|2015-01-01|
|분당선|   모란| 11155|  11269|2015-01-01|
|분당선|   야탑| 13041|  13379|2015-01-01|
|분당선|   서현| 12399|  12379|2015-01-01|
+------+-------+------+-------+----------+
only showing top 10 rows



<h5>Saving the data</h5>

In [26]:
jdbc_url = "jdbc:mysql://localhost:3306/seoulsubway"
table_name = "information"
properties = {
    "user": "root",
    "password": "",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Adjust the number of partitions as appropriate; each partition writes over its own JDBC connection
spark_df = spark_df.repartition(4)

# The MySQL JDBC driver JAR must already be on the classpath when the Spark session is created

spark_df.write.format("jdbc").options(
    url=jdbc_url,
    driver="com.mysql.cj.jdbc.Driver",
    dbtable=table_name,
    user="root",
    password="",
    batchSize="1000"
).mode("append").save()
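
The write above repeats the connection settings that are also stored in `properties`, with the credentials hardcoded. One possible way to keep them in a single place is sketched below, reading overrides from environment variables. The variable names (`MYSQL_HOST`, `MYSQL_PORT`, `MYSQL_DATABASE`, `MYSQL_USER`, `MYSQL_PASSWORD`) and the helper `jdbc_write_options` are assumptions, and the defaults mirror the values used in this notebook.

```python
import os


def jdbc_write_options(table, env=os.environ):
    """Build the option dict for a Spark JDBC write from environment settings."""
    host = env.get("MYSQL_HOST", "localhost")
    port = env.get("MYSQL_PORT", "3306")
    database = env.get("MYSQL_DATABASE", "seoulsubway")
    return {
        "url": f"jdbc:mysql://{host}:{port}/{database}",
        "driver": "com.mysql.cj.jdbc.Driver",
        "dbtable": table,
        "user": env.get("MYSQL_USER", "root"),
        "password": env.get("MYSQL_PASSWORD", ""),
        "batchsize": "1000",  # rows sent per JDBC batch
    }


options = jdbc_write_options("information")
# Print everything except the password
print({k: v for k, v in options.items() if k != "password"})
```

The dict could be passed as `spark_df.write.format("jdbc").options(**options).mode("append").save()`. Note that `append` adds the rows again on every run, so re-executing the cell duplicates data in the table.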


In [31]:
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)
df.createOrReplaceTempView("information_view")

result = spark.sql("""
    select *
    from information_view
    where line = '2호선'""")

result.show(10)

+-----+----------+------+-------+----------+
| line|   station|riding|dropped|      date|
+-----+----------+------+-------+----------+
|2호선|      신림|  1000|   2000|2015-02-02|
|2호선|      사당| 52581|  57270|2015-05-20|
|2호선|      사당| 48177|  55576|2015-03-28|
|2호선|  건대입구| 36312|  37886|2015-01-18|
|2호선|      삼성| 60477|  61752|2015-08-05|
|2호선|신정네거리| 12739|  12650|2015-10-15|
|2호선|      잠실| 65187|  57843|2015-10-25|
|2호선|신정네거리|  7232|   7140|2015-10-11|
|2호선|    낙성대| 36059|  35395|2015-04-02|
|2호선|      서초| 25457|  26631|2015-07-02|
+-----+----------+------+-------+----------+
only showing top 10 rows



In [1]:
spark.stop()

NameError: name 'spark' is not defined