In [2]:
import pandas as pd
from pyspark.sql import SparkSession, DataFrame

# Extract

In [70]:
def create_session(appName:str):
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()


# mysql 설정 변수
db_domain = "***"
db_port = "***"
database = "***"
table = "***"
sql = f"""
    SELECT DISTINCT mem_email, mem_age, mem_gen, fin_prtn_time, cat_nm
    FROM mem_detail
        JOIN prtn_setting ON mem_detail.mem_email = prtn_setting.prtn_mem
        RIGHT JOIN prtn_fin ON prtn_setting.prtn_id = prtn_fin.prtn_id
        LEFT JOIN pill_prod ON prtn_setting.prtn_nm = pill_prod.pill_cd
        JOIN pill_cmb ON pill_cmb.cmb_pill = pill_prod.pill_cd
        JOIN pill_cat ON pill_cmb.cmb_cat = pill_cat.cat_cd
"""
user = "***"
password = "***"
write_mode = "append"

# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-------------------+-------+-------+----------------+------------------+
|          mem_email|mem_age|mem_gen|   fin_prtn_time|            cat_nm|
+-------------------+-------+-------+----------------+------------------+
|       aaa@xxxx.com|  30~39|   male|2023-10-30 07:00|아미노산 및 단백질|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 07:00|            지방산|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 07:00|    신규기능성식품|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 07:00|            페놀류|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 07:00|  비타민 및 무기질|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 08:00|아미노산 및 단백질|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 08:00|            지방산|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 08:00|    신규기능성식품|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 08:00|            페놀류|
|       aaa@xxxx.com|  30~39|   male|2023-10-30 08:00|  비타민 및 무기질|
|qwert0175@naver.com|  20~29|   male|2023-11-06 16:04|아미노산 및 단백질|
|qwert0175@naver.com|  20~29

# LOAD

In [7]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    

# 판다스로 데이터 조회
pandas_df = pd.read_excel('./rule.xlsx')
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "***"
db_port = "***"
database = "***"
table = "rule_data"
sql = f"""
    SELECT * 
    FROM rule_data
"""
user = "***"
password = "***"
write_mode = "append"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+----------+-----+------+------------------------------+
|    upload|  age|   gen|                          rule|
+----------+-----+------+------------------------------+
|2023-11-06|10~19|  male| ['비타민 및 무기질', '발효...|
|2023-11-06|20~29|female|['발효미생물류', '비타민 및...|
|2023-11-06|40~49|female| ['비타민 및 무기질', '발효...|
|2023-11-06|60~69|  male| ['비타민 및 무기질', '인삼...|
|2023-11-06|30~39|  male| ['비타민 및 무기질', '발효...|
|2023-11-06|50~59|female| ['비타민 및 무기질', '발효...|
|2023-11-06|50~59|  male| ['비타민 및 무기질', '인삼...|
|2023-11-06|20~29|  male| ['비타민 및 무기질', '발효...|
|2023-11-06|10~19|female| ['비타민 및 무기질', '발효...|
|2023-11-06|30~39|female| ['비타민 및 무기질', '발효...|
|2023-11-06|70~79|  male| ['인삼류', '비타민 및 무기...|
|2023-11-06|70~79|female| ['비타민 및 무기질', '인삼...|
|2023-11-06|40~49|  male| ['비타민 및 무기질', '발효...|
|2023-11-06|60~69|female| ['비타민 및 무기질', '지방...|
+----------+-----+------+------------------------------+

