# WAV_car_MySQL_적재

## 0. Spark Session 생성

In [1]:
from pyspark.sql import SparkSession

# MySQL JDBC 드라이버 경로 (압축 푼 드라이버 JAR 파일 경로)
mysql_driver_path = "/home/ubuntu/mysql-connector-j-9.2.0/mysql-connector-j-9.2.0.jar"


# SparkSession 생성
spark = SparkSession.builder \
    .appName("WAV_test") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.jars", mysql_driver_path) \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .getOrCreate()

In [2]:
import sys
print("Python version:", sys.version)

Python version: 3.6.13 |Anaconda, Inc.| (default, Jun  4 2021, 14:25:59) 
[GCC 7.5.0]


## 1. wav_car_horn_data

### 1.1 WAV -> MFCC 변환 데이터프레임

In [3]:
import pyspark
from pyspark.sql import SparkSession
import io
from scipy.io import wavfile
import librosa
import numpy as np
import os
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType

# HDFS에서 모든 WAV 파일 읽기
hdfs_dir = "hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car"
binary_df = spark.read.format("binaryFile").load(hdfs_dir)

# 🔹 UDF (User Defined Function) 정의: WAV → MFCC 변환
def extract_mfcc(binary_data):
    try:
        audio_bytes = io.BytesIO(binary_data)  # 바이너리 데이터를 메모리 파일로 변환
        sr, audio = wavfile.read(audio_bytes)  # scipy로 샘플링 레이트 확인
        audio_librosa, sr_librosa = librosa.load(audio_bytes, sr=None)  # librosa로 리샘플링
        mfcc = librosa.feature.mfcc(y=audio_librosa, sr=sr_librosa, n_mfcc=13)  # MFCC 추출
        mfcc_mean = np.mean(mfcc, axis=1).astype(float)  # 평균 계산
        return mfcc_mean.tolist()  # 리스트로 반환
    except Exception as e:
        return None  # 에러 발생 시 None 반환

# UDF 등록
mfcc_udf = udf(extract_mfcc, ArrayType(FloatType()))

# 🔹 파일 이름 추출 UDF 정의
def extract_filename(path):
    return os.path.basename(path)

filename_udf = udf(extract_filename, StringType())

# 🔹 변환 적용
df_mfcc = binary_df \
    .withColumn("fileName", filename_udf(binary_df["path"])) \
    .withColumn("mfcc_features", mfcc_udf(binary_df["content"]))

# 🔹 배열 데이터를 개별 컬럼으로 변환
mfcc_columns = [f"mfcc_{i+1}" for i in range(13)]
for i in range(13):
    df_mfcc = df_mfcc.withColumn(mfcc_columns[i], df_mfcc["mfcc_features"][i])

# 🔹 불필요한 컬럼 정리
df_mfcc = df_mfcc.select(["fileName"] + mfcc_columns)

# 🔹 결과 저장 (HDFS)
# output_path = "hdfs://localhost:9000/shared_data/mfcc_features/"
# df_mfcc.write.csv(output_path, header=True, mode="overwrite")

# print(f"✅ MFCC 데이터가 HDFS에 저장됨: {output_path}")

In [4]:
df_mfcc.show()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 473, in main
    raise Exception(("Python in worker has different version %s than that in " +
Exception: Python in worker has different version 3.8 than that in driver 3.6, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.


In [9]:
display(df_mfcc.toPandas())

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,file_name,mfcc_1,mfcc_2,mfcc_3,mfcc_4,mfcc_5,mfcc_6,mfcc_7,mfcc_8,mfcc_9,mfcc_10,mfcc_11,mfcc_12,mfcc_13
0,1.car_horn_87719_1.wav,-303.535339,24.502613,19.504515,36.805042,14.415648,24.194254,-15.078179,9.494457,3.428701,4.340698,-15.809431,4.194707,1.669172
1,1.car_horn_87688_1.wav,-275.247253,74.094482,-7.014953,35.502041,20.998775,17.419048,-4.690052,19.882788,7.529735,4.112433,5.293692,-0.301400,-13.501287
2,1.car_horn_87964_1.wav,-233.662842,87.917618,45.434906,19.435001,9.098841,32.400368,5.817845,9.204194,14.611247,20.294502,-11.113198,0.092820,13.061584
3,1.car_horn_88422_1.wav,-405.863525,138.927872,48.664036,-7.739197,-1.499425,22.140759,2.871114,3.680598,21.160671,5.716446,10.310057,0.368055,5.270670
4,1.car_horn_87987_1.wav,-318.161530,126.123177,43.729805,-11.429483,-5.223235,13.667998,0.934844,10.497743,2.401143,22.281258,-2.912201,14.498095,-1.191311
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3184,1.car_horn_11394_1.wav,-186.245071,206.334549,-90.508675,0.126264,-12.169629,-12.740691,0.159179,-0.684558,6.783070,-21.042934,3.623416,0.958702,-15.837363
3185,1.car_horn_11326_1.wav,-187.140152,211.360199,-117.426407,8.978872,-16.096888,-14.844014,-2.302679,-7.231889,12.456937,-20.212053,2.541519,-1.406763,-18.378464
3186,1.car_horn_11041_1.wav,-220.496429,186.976395,-89.486778,0.602775,-16.530031,-12.816244,0.596911,-9.399220,4.694217,-13.281899,-0.215779,-3.420600,-10.971604
3187,1.car_horn_11349_1.wav,-181.978867,185.205734,-103.468681,1.288591,-14.240864,-7.906539,-2.105582,-5.007140,8.129882,-19.788095,7.852820,-1.051348,-12.499610


### 1.2 wav_car_horn_data 데이터 MySQL에 적재

In [4]:
df_mfcc.createOrReplaceTempView("wav_car_horn")

In [5]:
# SQL 쿼리 실행하여 데이터 추출
df_mfcc = spark.sql("""
    SELECT *
    FROM wav_car_horn
""")

In [6]:
# MySQL연결
mysql_url = "jdbc:mysql://15.168.145.74:3306/my_db?useUnicode=true&characterEncoding=UTF-8"
mysql_properties = {
    "user": "root",
    "password": "root",
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [7]:
# MySQL로 DataFrame 적재 (쿼리 결과가 None이 아닌 경우에만)
if df_mfcc is not None:
    df_mfcc.write.jdbc(url=mysql_url, table="wav_car_horn_data", mode="overwrite", properties=mysql_properties)
    print("데이터가 MySQL로 성공적으로 적재되었습니다!")
else:
    print("쿼리 결과가 없습니다. 데이터 추출이 실패했습니다.")


데이터가 MySQL로 성공적으로 적재되었습니다!


## 2. wav_car_siren_data

### 2.1 WAV -> MFCC 변환 데이터프레임

In [8]:
import pyspark
from pyspark.sql import SparkSession
import io
from scipy.io import wavfile
import librosa
import numpy as np
import os
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType

# HDFS에서 모든 WAV 파일 읽기
hdfs_dir = "hdfs://localhost:9000/shared_data/raw_data/1.Car/2.siren_of_car"
binary_df = spark.read.format("binaryFile").load(hdfs_dir)

# 🔹 UDF (User Defined Function) 정의: WAV → MFCC 변환
def extract_mfcc(binary_data):
    try:
        audio_bytes = io.BytesIO(binary_data)  # 바이너리 데이터를 메모리 파일로 변환
        sr, audio = wavfile.read(audio_bytes)  # scipy로 샘플링 레이트 확인
        audio_librosa, sr_librosa = librosa.load(audio_bytes, sr=None)  # librosa로 리샘플링
        mfcc = librosa.feature.mfcc(y=audio_librosa, sr=sr_librosa, n_mfcc=13)  # MFCC 추출
        mfcc_mean = np.mean(mfcc, axis=1).astype(float)  # 평균 계산
        return mfcc_mean.tolist()  # 리스트로 반환
    except Exception as e:
        return None  # 에러 발생 시 None 반환

# UDF 등록
mfcc_udf = udf(extract_mfcc, ArrayType(FloatType()))

# 🔹 파일 이름 추출 UDF 정의
def extract_filename(path):
    return os.path.basename(path)

filename_udf = udf(extract_filename, StringType())

# 🔹 변환 적용
df_mfcc_siren_car = binary_df \
    .withColumn("fileName", filename_udf(binary_df["path"])) \
    .withColumn("mfcc_features", mfcc_udf(binary_df["content"]))

# 🔹 배열 데이터를 개별 컬럼으로 변환
mfcc_columns = [f"mfcc_{i+1}" for i in range(13)]
for i in range(13):
    df_mfcc_siren_car = df_mfcc_siren_car.withColumn(mfcc_columns[i], df_mfcc_siren_car["mfcc_features"][i])

# 🔹 불필요한 컬럼 정리
df_mfcc_siren_car = df_mfcc_siren_car.select(["fileName"] + mfcc_columns)

# 🔹 결과 저장 (HDFS)
# output_path = "hdfs://localhost:9000/shared_data/mfcc_features/"
# df_mfcc.write.csv(output_path, header=True, mode="overwrite")

# print(f"✅ MFCC 데이터가 HDFS에 저장됨: {output_path}")

In [7]:
display(df_mfcc_siren_car.toPandas().head(5))

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,file_name,mfcc_1,mfcc_2,mfcc_3,mfcc_4,mfcc_5,mfcc_6,mfcc_7,mfcc_8,mfcc_9,mfcc_10,mfcc_11,mfcc_12,mfcc_13
0,1.car_siren_303_1.wav,-331.622131,194.139023,-19.950441,23.301525,-9.281691,4.838598,8.544705,-0.232366,18.394283,13.557921,13.304634,7.358499,4.700673
1,1.car_siren_493_1.wav,-287.294861,202.257141,-7.542941,14.949334,0.315071,13.062037,5.098646,3.266062,16.367357,14.25031,7.647503,3.553201,6.276691
2,1.car_siren_288_1.wav,-239.801895,153.763153,-23.571865,27.403536,-8.118983,12.162045,-7.386656,-1.804924,10.035617,18.910063,17.573338,10.941156,8.292388
3,1.car_siren_499_1.wav,-264.13681,225.186554,-12.084494,-22.841236,8.205717,-0.783459,4.596014,1.768313,0.112573,11.339332,11.931373,5.466546,-0.937017
4,1.car_siren_409_1.wav,-280.859741,202.961441,-18.432289,15.585643,4.13628,12.429647,5.971637,-0.23055,6.385309,5.937667,5.668032,1.751872,2.468761


In [9]:
df_mfcc_siren_car.show()

+--------------------+----------+---------+----------+----------+-----------+-----------+----------+-----------+----------+---------+---------+----------+-----------+
|            fileName|    mfcc_1|   mfcc_2|    mfcc_3|    mfcc_4|     mfcc_5|     mfcc_6|    mfcc_7|     mfcc_8|    mfcc_9|  mfcc_10|  mfcc_11|   mfcc_12|    mfcc_13|
+--------------------+----------+---------+----------+----------+-----------+-----------+----------+-----------+----------+---------+---------+----------+-----------+
|1.car_siren_303_1...|-331.62213|194.13902|-19.950441| 23.301525|  -9.281691|  4.8385983|  8.544705|  -0.232366| 18.394283|13.557921|13.304634| 7.3584986|   4.700673|
|1.car_siren_493_1...|-287.29486|202.25714| -7.542941| 14.949334|  0.3150707|  13.062037| 5.0986457|  3.2660618| 16.367357| 14.25031| 7.647503|  3.553201|  6.2766914|
|1.car_siren_288_1...| -239.8019|153.76315|-23.571865| 27.403536|  -8.118983|  12.162045| -7.386656| -1.8049235| 10.035617|18.910063|17.573338| 10.941156|   8.292388

### 2.2 wav_car_siren_data 데이터 MySQL에 적재

In [10]:
df_mfcc_siren_car.createOrReplaceTempView("wav_car_siren")

# SQL 쿼리 실행하여 데이터 추출
df_mfcc_siren_car = spark.sql("""
    SELECT *
    FROM wav_car_siren
""")

mysql_url = "jdbc:mysql://15.168.145.74:3306/my_db?useUnicode=true&characterEncoding=UTF-8"
mysql_properties = {
    "user": "root",
    "password": "root",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# MySQL로 DataFrame 적재 (쿼리 결과가 None이 아닌 경우에만)
if df_mfcc_siren_car is not None:
    df_mfcc_siren_car.write.jdbc(url=mysql_url, table="wav_car_siren_data", mode="overwrite", properties=mysql_properties)
    print("데이터가 MySQL로 성공적으로 적재되었습니다!")
else:
    print("쿼리 결과가 없습니다. 데이터 추출이 실패했습니다.")


데이터가 MySQL로 성공적으로 적재되었습니다!


In [None]:
spark.stop()