In [1]:
# !pip install pyspark
# !pip install findspark

Collecting pyspark
  Using cached pyspark-3.5.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.0
Collecting findspark
  Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [35]:
import findspark
import pandas as pd
from pyspark.sql import SparkSession, DataFrame

### pill_prod 데이터 입력

In [31]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv('./data/pill_prod.csv', dtype=str)
pandas_df["pill_rv"] = pandas_df["pill_rv"].astype(float)
pandas_df["pill_rvnum"] = pandas_df["pill_rvnum"].astype(int)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_prod"
sql = f"""
    SELECT * 
    FROM pill_prod
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-------+---------------------------+------------------+-------+----------+----------------------------+
|pill_cd|                    pill_nm|          pill_mnf|pill_rv|pill_rvnum|                   pill_info|
+-------+---------------------------+------------------+-------+----------+----------------------------+
|  05121|    엔트리 에스론 우먼 골드|          하이리빙|    0.0|         0|*아침/저녁*   하루에 2회,...|
|  05122|         세노메가 키즈 구미|          대웅제약|    0.0|         0|*아침/저녁*   하루에 2회,...|
|  05123|       헬리오케어 오랄 캡슐|    칸타브리아랩스|    0.0|         0|                         NaN|
|  05124|                     쿨캡슐|          스마티민|    0.0|         0| *언제든*   하루에 1회, 1...|
|  05125|                 더헤모리버|           THE하다|    0.0|         0|*언제든, 식전*   하루에 1...|
|  05126|            블루그램 비오틴|      제이미파커스|    0.0|         0| *언제든*   하루에 1회, 1...|
|  05127|               라파테인 105|더블유에이치글로벌|    0.0|         0|*오전중, 식후*   하루에 1...|
|  05128|          눈케어 베타카로틴|      내추럴플러스|    0.0|         0| *언제든*   하루에 1회, 

### pill_nutr 데이터 입력

In [34]:
pandas_df = pd.read_csv("./data/pill_nutr.csv", encoding='euc-kr')
pandas_df.drop("cat_nm", axis=1,inplace=True)
pandas_df

Unnamed: 0,nutr_cd,nutr_nm
0,aa,비타민C
1,ab,비타민B1
2,ac,비타민A
3,ad,비타민D
4,ae,비타민B6
...,...,...
99,dv,헛개나무열매
100,dw,헤모힘
101,dx,프리바이오틱스
102,dy,세라마이드


In [36]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv("./data/pill_nutr.csv", encoding='euc-kr')
pandas_df.drop("cat_nm", axis=1,inplace=True)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_nutr"
sql = f"""
    SELECT * 
    FROM pill_nutr
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-------+------------------+
|nutr_cd|           nutr_nm|
+-------+------------------+
|     ce|커큐민(강황추출물)|
|     cm|        그린커피빈|
|     aa|           비타민C|
|     cu|          아르기닌|
|     bg|        글루코사민|
|     ai|         비타민B12|
|     bw|              감태|
|     ay|              구리|
|     aq|              크롬|
|     dc|회화나무열매추출물|
|     cv|              UDCA|
|     az|           오메가3|
|     cf|          시네트롤|
|     ar|              아연|
|     ab|          비타민B1|
|     bx|        아스타잔틴|
|     bh|        옥타코사놀|
|     cn|            키토산|
|     dd|          매스틱검|
|     aj|        베타카로틴|
+-------+------------------+
only showing top 20 rows



### pill_prod 데이터 입력

In [37]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv('./data/pill_func.csv', dtype=str)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_func"
sql = f"""
    SELECT * 
    FROM pill_func
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-------+------------------+----------+
|func_cd|           func_nm|func_emoji|
+-------+------------------+----------+
|     01|            피로감|        🔋|
|     05|   혈관 & 혈액순환|        🩸|
|     11|           뼈 건강|        🦴|
|     09|          면역기능|        ⚔️|
|     23|  탈모 & 손톱 건강|        💅|
|     15|         남성 건강|        🚹|
|     03|         피부 건강|        🧼|
|     17|운동 능력 & 근육량|        💪|
|     02|           눈 건강|        👀|
|     12|     노화 & 항산화|        👼|
|     16|              혈압|        💓|
|     04|            체지방|        💣|
|     10|   혈중 콜레스테롤|        🩸|
|     13|         여성 건강|        🚺|
|     14|소화 & 위식도 건강|        💨|
|     18|          두뇌활동|        🧠|
|     21|       치아 & 잇몸|        🦷|
|     22|임산부 & 태아 건강|        🍼|
|     25|       여성 갱년기|     ❤️‍🔥|
|     26|       호흡기 건강|        🫁|
+-------+------------------+----------+
only showing top 20 rows



### pill_cmb 데이터 입력

In [38]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv("./data/pill_cmb.csv", dtype=str)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_cmb"
sql = f"""
    SELECT * 
    FROM pill_cmb
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-------+--------+--------+--------+
|cmb_cat|cmb_nutr|cmb_func|cmb_pill|
+-------+--------+--------+--------+
|     nf|      bq|      24|   01579|
|     nf|      ch|      24|   01579|
|     nf|      aa|      24|   01579|
|     nf|      at|      24|   01579|
|     vm|      bg|      24|   01579|
|     vm|      an|      24|   01579|
|     vm|      aw|      24|   01579|
|     vm|      dc|      25|   10614|
|     vm|      dc|      10|   10614|
|     vm|      da|      24|   01579|
|     vm|      ah|      02|   10615|
|     vm|      bq|      24|   01579|
|     vm|      ah|      24|   10616|
|     vm|      ch|      24|   01579|
|     vm|      aa|      24|   01579|
|     nf|      bx|      12|   02875|
|     vm|      ah|      12|   10617|
|     vm|      at|      24|   01579|
|     nf|      bx|      05|   02875|
|     vm|      ah|      12|   10618|
+-------+--------+--------+--------+
only showing top 20 rows



### pill_sideeff 데이터 입력

In [39]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv('./data/pill_sideeff.csv')
pandas_df.drop(["sideeff_nm1","sideeff_nm2"], axis=1,inplace=True)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_sideeff"
sql = f"""
    SELECT * 
    FROM pill_sideeff
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-----------+-----------+----------------------------------+----------------------------------+
|sideeff_cd1|sideeff_cd2|                       sideeff_txt|                   sideeff_caution|
+-----------+-----------+----------------------------------+----------------------------------+
|         be|         ac|  루테인은 비타민A의 일종으로, ...|  고함량 루테인 섭취 시, 비타민...|
|         ar|         ay|                    체내 흡수 방해|     둘 다 섭취 시, 따로 복용 권장|
|         az|         br|감마리놀렌산에 오메가6 지방산이...|  오메가3 지방산과 오메가6 지방...|
|         ao|         au|                    체내 흡수 방해|  철분을 식전 복용, 칼슘제 식사...|
|         ad|         ao| 칼슘 과다 흡수로 고칼슘혈증 유...|동맥경화나 이상지질혈증이 있는 ...|
|         ah|         ag|    비타민E 과잉 섭취 시, 혈액 ...|    비타민 E 중복 섭취 시, 하루...|
|         br|         az|감마리놀렌산에 오메가6 지방산이...|  오메가3 지방산과 오메가6 지방...|
|         au|         ar|                    체내 흡수 방해|     둘 다 섭취 시, 따로 복용 권장|
|         ar|         ao|    고용량 칼슘은 아연의 흡수 방해|  장기간 칼슘제 섭취 시 아연을 ...|
|         ac|         aj|베타카로틴은 비타민A의 일종으로...|  

### pill_cat 데이터 입력

In [31]:
pd.read_csv('./data/pill_cat.csv')

Unnamed: 0,cat_cd,cat_nm,cat_emoji
0,vm,비타민 및 무기질,🔋
1,fs,지방산,🐟
2,fm,발효미생물류,🚽
3,nf,신규기능성식품,🌟
4,df,식이섬유,🌾
5,ph,페놀류,🍇
6,gr,인삼류,🌱
7,ch,엽록소,🍃
8,ap,아미노산 및 단백질,💪🏻
9,cs,당 및 탄수화물,🍚


In [40]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv('./data/pill_cat.csv')
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_cat"
sql = f"""
    SELECT * 
    FROM pill_cat
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+------+------------------+---------+
|cat_cd|            cat_nm|cat_emoji|
+------+------------------+---------+
|    ph|            페놀류|       🍇|
|    ap|아미노산 및 단백질|     💪🏻|
|    nf|    신규기능성식품|       🌟|
|    fm|      발효미생물류|       🚽|
|    vm|  비타민 및 무기질|       🔋|
|    ch|            엽록소|       🍃|
|    gr|            인삼류|       🌱|
|    fs|            지방산|       🐟|
|    df|          식이섬유|       🌾|
|    cs|    당 및 탄수화물|       🍚|
+------+------------------+---------+



### health 데이터 입력

In [20]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_excel('./data/health.xlsx')
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "health"
sql = f"""
    SELECT * 
    FROM health
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-----------------------+----------+------------+
|              health_nm|health_tag|health_emoji|
+-----------------------+----------+------------+
|          아크 트레이너|      하체|          🦵|
|              힙 어덕션|      하체|          🦵|
|               업도미널|      코어|          🏋|
|  딥 & 레그 레이즈 체어|      코어|          🏋|
|        크로스 트레이너|    유산소|          🏃|
|            시티드 로우|      상체|          💪|
|          레그 익스텐션|      하체|          🦵|
| 인클라인 체스트 프레스|      상체|          💪|
|            벤치 프레스|      상체|          💪|
|       친 & 딥 어시스트|      상체|          💪|
|          레터럴 레이즈|      상체|          💪|
|          아크 트레이너|    유산소|          🏃|
|       힙 트러스트 벤치|      하체|          🦵|
|각도 조절 벤치(INCILNE)|      기타|           ❓|
|            드롭 스쿼트|      하체|          🦵|
|                  암 컬|      상체|          💪|
|    디클라인 벤치프레스|      상체|          💪|
|            스미스 머신|      상체|          💪|
|벤트 오버 레터럴 레이즈|      상체|          💪|
|   인클라인 벤치 프레스|      상체|          💪|
+-----------------------+----------+-

In [None]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv('./data/pill_cmb.csv', dtype=str)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "dw"
table = "pill_cmb"
sql = f"""
    SELECT * 
    FROM pill_cmb
"""
user = "batch"
password = "Data1q2w3e4r!!"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

In [11]:
def create_session(appName:str):
    '''
    # Spark 세션 만드는 함수
    # appName : 스파크 세션의 이름 지정 변수
    # return : pyspark.sql.SparkSession
    '''
    return SparkSession \
            .builder \
            .appName(appName) \
            .getOrCreate()

def mysql_select(spark:SparkSession, domain:str, port:str, schema:str, query:str, user:str, password:str):
    '''
    # mysql 조회 함수
    # spark : SparkSession
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # query : 조회 SQL
    # user : DB 유저
    # password : DB 비밀번호
    # return : pyspark.sql.DataFrame
    '''
    return spark \
        .read \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC") \
        .option("query", query) \
        .option("user", user) \
        .option("password", password) \
        .load()
def mysql_write(df:DataFrame, mode:str, domain:str, port:str, schema:str, table:str, user:str, password:str):
    '''
    # mysql 데이터 저장 함수
    # df : pyspark.sql.DataFrame
    # mode : 데이터 저장 방식 지정 'overwrite', 'append', 'ignore'
    # domain : DB 서버 IP
    # port : DB 서버 포트
    # schema : MySQL Database 명
    # table : 테이블 명
    # user : DB 유저
    # password : DB 비밀번호
    '''
    df.write \
        .format("jdbc") \
        .mode(mode) \
        .option("encoding", "UTF-8") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", f"jdbc:mysql://{domain}:{port}/{schema}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8") \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .save()
    
from pyspark.sql.functions import col, cast
from pyspark.sql.types import FloatType

    
# spark 경로 설정
findspark.init()
# 판다스로 데이터 조회
pandas_df = pd.read_csv('./pill_prod.csv', dtype=str)
# 스파크 세션 생성
spark = create_session("spark_test")
# 판다스 데이터 Spark 데이터프레임으로 변경
df = spark.createDataFrame(pandas_df)
# mysql 설정 변수
db_domain = "54.180.91.68"
db_port = "3306"
database = "km"
table = "pill_prod"
sql = f"""
    SELECT * 
    FROM pill_prod
"""
user = "lkm"
password = "dbdb"
write_mode = "overwrite"

# mysql 데이터 저장
mysql_write(df, write_mode, db_domain, db_port, database, table, user, password)
# mysql 데이터 조회
mysql_df = mysql_select(spark, db_domain, db_port, database, sql, user, password)
mysql_df.show()

+-------+--------------------------------+----------------------+-------+---------+-----------------------------+
|pill_cd|                         pill_nm|              pill_mnf|pill_rv|pill_rvnm|                    pill_info|
+-------+--------------------------------+----------------------+-------+---------+-----------------------------+
|ak60417|                 프리미엄 화려화|              헬스피플|    NaN|        0|  *언제든*   하루에 1회, 1...|
|ak60418|                        리즈슬림|            셀피케이션|    NaN|        0|  *언제든*   하루에 1회, 1...|
|ak60419|          아드레날 코티솔 서포트|프로토콜포라이프발란스|    NaN|        0|*아침/점심/저녁*   하루에 ...|
|ak60420|                        리버코민|            이너네이쳐|    NaN|        0|  *오전중*   하루에 1회, 1...|
|ak60421|                      맨타우르스|            바이탈리쉬|    NaN|        0| *언제든, 식후*   하루에 1...|
|ak60422|        바디코드 모로실에 빠지다|            메디포스트|    NaN|        0|  *오전중*   하루에 1회, 1...|
|ak60423|      뉴트리스토리 비오틴 7000 +|                어댑트|    NaN|        0|  *언제든*   하루에 1회, 1...|
