In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, expr, from_json, schema_of_json, to_json
import config
from config import DB_CONFIG, HDFS_CONFIG

# Path to the MySQL JDBC driver jar, taken from the project config.
mysql_driver_path = config.MYSQL_JDBC

# Create (or reuse) the SparkSession: point Hadoop's default filesystem at the
# configured HDFS and put the MySQL driver jar on Spark's jar list so the JDBC
# write later in the notebook can load the driver class.
spark = (
    SparkSession.builder
    .appName("HDFS_JSON_Car_Driving")
    .config("spark.hadoop.fs.defaultFS", HDFS_CONFIG["defaultFS"])
    .config("spark.jars", mysql_driver_path)
    .getOrCreate()
)

25/02/27 14:19:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/27 14:19:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/02/27 14:19:08 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Read the JSON label files from HDFS.
# multiLine is enabled so that a JSON document may span several lines
# instead of being required to fit on a single line.
file_path = f"{config.HDFS_BASE_PATH}/label_data/1.Car/2.siren_of_car"
df = spark.read.option("multiLine", True).json(file_path)

                                                                                

In [3]:
# Number of top-level records parsed from the JSON input (triggers a Spark job).
df.count()

                                                                                

1990

In [4]:
# Print the inferred schema as a tree to inspect the nested field layout
# before flattening.
df.printSchema()

root
 |-- annotations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- area: struct (nullable = true)
 |    |    |    |-- end: double (nullable = true)
 |    |    |    |-- start: double (nullable = true)
 |    |    |-- categories: struct (nullable = true)
 |    |    |    |-- category_01: string (nullable = true)
 |    |    |    |-- category_02: string (nullable = true)
 |    |    |    |-- category_03: string (nullable = true)
 |    |    |-- decibel: long (nullable = true)
 |    |    |-- labelName: string (nullable = true)
 |    |    |-- soundQuality: string (nullable = true)
 |    |    |-- subCategory: string (nullable = true)
 |-- audio: struct (nullable = true)
 |    |-- bitRate: string (nullable = true)
 |    |-- duration: double (nullable = true)
 |    |-- fileFormat: string (nullable = true)
 |    |-- fileName: string (nullable = true)
 |    |-- fileSize: long (nullable = true)
 |    |-- recodingType: string (nullable = true)
 |    |-- sample

In [6]:

# Source field paths, listed in output column order. `annotation.*` refers to
# the per-row struct produced by explode() below; `audio.*` and `environment.*`
# are structs on the input frame.
flatten_paths = [
    # fields of each annotation
    "annotation.area.start",
    "annotation.area.end",
    "annotation.categories.category_01",
    "annotation.categories.category_02",
    "annotation.categories.category_03",
    "annotation.decibel",
    "annotation.labelName",
    "annotation.soundQuality",
    "annotation.subCategory",
    # fields of the audio struct
    "audio.bitRate",
    "audio.duration",
    "audio.fileFormat",
    "audio.fileName",
    "audio.fileSize",
    "audio.recodingType",
    "audio.sampleRate",
    # fields of the environment struct
    "environment.acqDevice",
    "environment.acqMethod",
    "environment.acqType",
    "environment.areaUse",
    "environment.dayNight",
    "environment.direction",
    "environment.distance",
    "environment.district",
    "environment.gps.latitude",
    "environment.gps.longitude",
    "environment.micClass",
    "environment.obstacle",
    "environment.place",
    "environment.recordingTime",
    "environment.urban",
    "environment.weather",
]

# Each output column is named after the last component of its path, except the
# two area bounds, which carry an `area_` prefix.
alias_overrides = {
    "annotation.area.start": "area_start",
    "annotation.area.end": "area_end",
}

# Turn each element of the `annotations` array into its own row, then project
# the nested struct fields into flat, individually named columns.
# NOTE(review): explode() emits no row for a record whose `annotations` is null
# or empty; if such files must be kept, explode_outer() is the variant to use —
# confirm the intended behaviour.
df_flattened = df.withColumn("annotation", explode(col("annotations"))).select(
    *[
        col(path).alias(alias_overrides.get(path, path.rsplit(".", 1)[-1]))
        for path in flatten_paths
    ]
)

# Show the result without truncating cell contents.
df_flattened.show(truncate=False)

25/02/27 13:25:31 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+--------+-----------+-----------+-----------+-------+--------------------+------------+-----------+--------+--------+----------+------------------+--------+------------+----------+--------------+---------+-------+------------+--------+---------+--------+--------+--------+---------+--------+--------+--------+-------------+----------+-------+
|area_start|area_end|category_01|category_02|category_03|decibel|labelName           |soundQuality|subCategory|bitRate |duration|fileFormat|fileName          |fileSize|recodingType|sampleRate|acqDevice     |acqMethod|acqType|areaUse     |dayNight|direction|distance|district|latitude|longitude|micClass|obstacle|place   |recordingTime|urban     |weather|
+----------+--------+-----------+-----------+-----------+-------+--------------------+------------+-----------+--------+--------+----------+------------------+--------+------------+----------+--------------+---------+-------+------------+--------+---------+--------+--------+--------+------

In [7]:
# Expose the flattened frame to Spark SQL under the view name `json_table`.
df_flattened.createOrReplaceTempView("json_table")

# Select every column of the view unchanged; the result replaces `df_flattened`.
# This query is the place to add filters or projections on the flattened
# annotation data.
select_all_sql = """
    SELECT *
    FROM json_table
"""
df_flattened = spark.sql(select_all_sql)

In [8]:
# Assemble the MySQL JDBC connection settings from DB_CONFIG (loaded from the
# project config module). The URL requests Unicode handling with UTF-8 encoding.
mysql_url = "jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=UTF-8".format(
    host=DB_CONFIG["host"],
    port=DB_CONFIG["port"],
    database=DB_CONFIG["database"],
)

# Connection properties handed to the JDBC writer: credentials and driver class.
mysql_properties = dict(
    user=DB_CONFIG["user"],
    password=DB_CONFIG["password"],
    driver="com.mysql.cj.jdbc.Driver",
)

In [9]:
# Load the flattened DataFrame into the MySQL table `car_siren_data`.
#
# `spark.sql` always returns a DataFrame, so an `is not None` test on its own
# can never reach the failure branch. The guard therefore also requires at
# least one row: take(1) runs a small Spark job and returns a list that is empty
# when the frame has no rows. Without this, an empty extraction would still
# replace the existing table (mode="overwrite") and report success.
if df_flattened is not None and df_flattened.take(1):
    df_flattened.write.jdbc(url=mysql_url, table="car_siren_data", mode="overwrite", properties=mysql_properties)
    print("데이터가 MySQL로 성공적으로 적재되었습니다!")
else:
    # Nothing to load: leave the existing MySQL table untouched.
    print("쿼리 결과가 없습니다. 데이터 추출이 실패했습니다.")



데이터가 MySQL로 성공적으로 적재되었습니다!


                                                                                

In [5]:
# Stop the SparkSession created at the top of the notebook. Run this last:
# cells that use `spark` or its DataFrames will not work once it is stopped.
spark.stop()