In [14]:
import pandas as pd
import re
import numpy as np
import sqlite3
import os


## 10 스파크를 사용해서 데이터베이스 처리해보기

### 파이스파크 설치
`pip install pyspark` (노트북 셀 안에서는 `%pip install pyspark`)

### 10-1 데이터베이스 접근을 위한 스파크 세션 처리

In [3]:
from pyspark.sql import SparkSession

# Local Spark session for the SQLite examples:
# - `spark.jars.packages` resolves the SQLite JDBC driver
#   (org.xerial:sqlite-jdbc) from Maven when the session starts.
# - the driver host and bind address are pinned to the loopback interface.
spark = (
    SparkSession.builder
    .appName("PySpark_test")
    .master("local")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.jars.packages", "org.xerial:sqlite-jdbc:3.34.0")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/Users/renee/anaconda3/envs/pycaret_env/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/renee/.ivy2/cache
The jars for the packages stored in: /Users/renee/.ivy2/jars
org.xerial#sqlite-jdbc added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10e56845-f896-45dd-8415-855b82f6c5e0;1.0
	confs: [default]
	found org.xerial#sqlite-jdbc;3.34.0 in central
downloading https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar ...
	[SUCCESSFUL ] org.xerial#sqlite-jdbc;3.34.0!sqlite-jdbc.jar (1619ms)
:: resolution report :: resolve 3089ms :: artifacts dl 1623ms
	:: modules in use:
	org.xerial#sqlite-jdbc;3.34.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   1   |   0   ||   1   |   1   |
	-----

In [4]:
# Turn on eager evaluation so a DataFrame left as a cell's last expression is rendered as a table.
spark.conf.set(key="spark.sql.repl.eagerEval.enabled", value=True)

### 10-2 데이터베이스 처리

In [None]:
### 데이터베이스 접근

In [7]:
# SQLite database file, relative to the notebook's working directory.
path_db1 = ("./lending_club_data/sql_db/"
            "lending_club_2012_hw2.db")

In [8]:
### 특정 테이블을 직접 읽기

In [9]:
# DataFrame over the whole `borrower` table, read through the SQLite JDBC driver.
df = (
    spark.read.format("jdbc")
    .option("driver", "org.sqlite.JDBC")
    .option("url", "jdbc:sqlite:" + path_db1)
    .option("dbtable", "borrower")
    .load()
)

In [10]:
# Total number of rows in the `borrower` table. count() is an action, so this
# runs a Spark job that reads the table over JDBC.
df.count()

53367

In [11]:
# Column names as stored in SQLite. Note that '근무연차 ' carries a trailing
# space, so it must always be referenced with that exact spelling.
df.columns

['id', '직업명', '근무연차 ', '주택소유', '우편번호', '거주국가', '회원번호']

In [12]:
### 스파크 세션을 다시 만들기 (이미 세션이 있으면 getOrCreate()는 기존 세션을 그대로 돌려준다)

In [15]:
# Build (or fetch) a session that loads the SQLite JDBC driver from a local jar
# in the current working directory instead of downloading it from Maven.
#
# NOTE: when a SparkSession already exists in this process (as it does after the
# session-setup cell earlier in this notebook), `getOrCreate()` returns that
# existing session. The static settings below (master, appName, spark.jars,
# spark.driver.extraClassPath) are then ignored, and Spark logs
# "Using an existing Spark session; only runtime SQL configurations will take effect."
# They only take effect when no session has been created yet.
#
# The former `spark.sql.encoding` option was dropped: it is not a Spark
# configuration key, so it never changed how text columns are decoded.
SQLITE_JDBC_JAR = os.path.join(os.getcwd(), "sqlite-jdbc-3.34.0.jar")

spark1 = (SparkSession.builder
    .master("local")
    .appName("SQLite JDBC")
    .config("spark.jars", SQLITE_JDBC_JAR)
    .config("spark.driver.extraClassPath", SQLITE_JDBC_JAR)
    .getOrCreate())

24/09/05 15:51:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [17]:
# Display the session object. When a session already existed, the previous
# cell's getOrCreate() returned it, so this is the same session as `spark`.
spark1

In [18]:
# JDBC connection settings reused by the query-based reads below.
driver = "org.sqlite.JDBC"        # SQLite JDBC driver class
url = "jdbc:sqlite:" + path_db1   # JDBC URL of the local SQLite database file

In [19]:
### 10-3 데이터베이스 내의 테이블 확인하기

In [22]:
# List the user-defined tables and views of the SQLite database.
# SQLite reserves names starting with "sqlite_" for internal objects
# (e.g. sqlite_sequence). In LIKE, `_` is a single-character wildcard, so it is
# escaped here; unescaped, user objects named like "sqliteX..." would be hidden too.
# (Raw string: the backslash must reach SQLite unchanged.)
sub_query = r"""SELECT name
           FROM sqlite_master
           WHERE type IN ('table', 'view')
             AND name NOT LIKE 'sqlite\_%' ESCAPE '\' """

In [23]:
# Run `sub_query` through the SQLite JDBC source. The `query` option reads the
# result of an arbitrary SELECT instead of a whole table (`dbtable`).
df2 = (
    spark1.read.format("jdbc")
    .options(driver=driver, url=url, query=sub_query)
    .load()
)

In [24]:
# Print the table/view names returned by `sub_query` (internal sqlite_* objects excluded).
df2.show()

+-------------------+
|               name|
+-------------------+
|           borrower|
|             credit|
|      credit_rating|
|           hardship|
|             income|
|            inquiry|
|        installment|
|       loan_account|
|            payment|
|          revolving|
|secondary_applicant|
|         settlement|
|              trade|
+-------------------+



In [25]:
### 10-4 테이블 내의 데이터 확인하기

In [26]:
# Sample of 100 `borrower` rows. There is no ORDER BY, so which rows come back is up to SQLite.
query1 = ("select * "
          "from borrower "
          "limit 100")

In [27]:
# Load the borrower sample defined by `query1` through the JDBC `query` option.
df1 = (
    spark1.read.format("jdbc")
    .options(driver=driver, url=url, query=query1)
    .load()
)

In [28]:
# Column names and the Spark types the JDBC source mapped them to for the borrower sample.
df1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- 직업명: string (nullable = true)
 |-- 근무연차 : string (nullable = true)
 |-- 주택소유: string (nullable = true)
 |-- 우편번호: string (nullable = true)
 |-- 거주국가: string (nullable = true)
 |-- 회원번호: double (nullable = true)



In [29]:
# At most 100 rows, because `query1` ends with "limit 100".
df1.count()

100

In [30]:
# Sample of 100 `credit` rows. There is no ORDER BY, so which rows come back is up to SQLite.
query2 = ("select * "
          "from credit "
          "limit 100")

In [31]:
# Load the credit sample defined by `query2` (this rebinds `df2`).
df2 = (
    spark1.read.format("jdbc")
    .options(driver=driver, url=url, query=query2)
    .load()
)

In [32]:
# Column names and the Spark types the JDBC source mapped them to for the credit sample.
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- 세금체납금액: double (nullable = true)
 |-- 주택담도대출계좌수: double (nullable = true)
 |-- 연체계좌수: double (nullable = true)
 |-- 은행카드활동계좌개수: double (nullable = true)
 |-- 은행카드정상계좌개수: double (nullable = true)
 |-- 은행카드개수: double (nullable = true)
 |-- 은행카드정상개수: double (nullable = true)
 |-- 중기연체계좌개수: double (nullable = true)
 |-- 단기연체계좌개수: double (nullable = true)
 |-- 장기연체계좌개수: double (nullable = true)
 |-- 최근개설계좌개수: double (nullable = true)
 |-- 특정한도이상소진계좌비율: double (nullable = true)
 |-- 연체발생회수 : double (nullable = true)
 |-- 신용한도시작일자: string (nullable = true)
 |-- 신용한도개설기록횟수: double (nullable = true)
 |-- 신용이상기록횟수: double (nullable = true)
 |-- 리볼링잔액: double (nullable = true)
 |-- 리볼링사용비율: double (nullable = true)
 |-- 신용한도: double (nullable = true)
 |-- 신용한도총기록회수: string (nullable = true)
 |-- 신용한도잔액: double (nullable = true)
 |-- 총신용한도금액: double (nullable = true)
 |-- 신용한도금액: double (nullable = true)
 |-- 은행카드신요한도: double (nullable = true)
 |-- 추심횟수

In [33]:
### 10-5 스파크 코딩으로 조인처리 하기

In [None]:
### 각 테이블 읽어오기

In [34]:
# Full `borrower` table, to be joined with `credit` below.
df_bw = (
    spark.read.format("jdbc")
    .option("driver", "org.sqlite.JDBC")
    .option("url", "jdbc:sqlite:" + path_db1)
    .option("dbtable", "borrower")
    .load()
)

In [35]:
# Full `credit` table, to be joined with `borrower`.
df_ct = (
    spark.read.format("jdbc")
    .option("driver", "org.sqlite.JDBC")
    .option("url", "jdbc:sqlite:" + path_db1)
    .option("dbtable", "credit")
    .load()
)

In [36]:
# Inner join of borrower and credit on `id` using the DataFrame API.
# Joining on an expression keeps both `id` columns, so later selections must
# say which side's `id` they mean.
joined_df = df_bw.join(df_ct, on=df_bw["id"] == df_ct["id"], how="inner")

In [37]:
# Keep only the needed columns: borrower id, employment length and tax-arrears amount.
# Bracket access is required for '근무연차 ' because of its trailing space.
selected_df = joined_df.select(
    df_bw["id"],
    df_bw["근무연차 "],
    df_ct["세금체납금액"],
)

In [38]:
# Print the first 20 rows (show()'s default) of the projected join.
selected_df.show()

+-------+---------+------------+
|     id|근무연차 |세금체납금액|
+-------+---------+------------+
|1422503|  2 years|         0.0|
|2054650|  3 years|         0.0|
|1283607|  4 years|         0.0|
|1441362|10+ years|         0.0|
|1465450|  5 years|         0.0|
|2298372|  3 years|         0.0|
|1207506|  3 years|         0.0|
|1281854|10+ years|         0.0|
|1435514| < 1 year|         0.0|
|1075932|  2 years|         0.0|
|1303972|  6 years|         0.0|
|1677468|10+ years|         0.0|
|2375142|10+ years|         0.0|
|1565433|     NULL|         0.0|
|2366398|     NULL|         0.0|
|1549604|     NULL|         0.0|
|1107801|  6 years|         0.0|
|1955212|10+ years|         0.0|
|1544732|  9 years|         0.0|
|1076509|  6 years|         0.0|
+-------+---------+------------+
only showing top 20 rows



In [39]:
### 10-6 내부 뷰테이블을 만들고 조인하기

In [None]:
### 스파크 내부 뷰 만들기

In [41]:
# Register both DataFrames as temporary views of their SparkSession so spark.sql()
# can query them by name; any existing view with the same name is replaced.
df_ct.createOrReplaceTempView("credit")
df_bw.createOrReplaceTempView("borrower")

In [42]:
### 스파크 세션에서 바로 쿼리로 처리하기

- 한글 칼럼명이나 공백이 들어간 칼럼명(예: 끝에 공백이 있는 '근무연차 ')은 backtick(`)으로 감싸서 처리하기

In [43]:
# Join borrower and credit with Spark SQL over the temporary views.
# Backticks quote the Korean column names, including the trailing space
# in `근무연차 `.
result_df = spark.sql(sqlQuery="""
    SELECT b.id, b.`근무연차 `, c.`세금체납금액`
    FROM borrower b
    JOIN credit c ON b.id = c.id
""")

# Print the first rows of the joined result.
result_df.show()

+-------+---------+------------+
|     id|근무연차 |세금체납금액|
+-------+---------+------------+
|1422503|  2 years|         0.0|
|2054650|  3 years|         0.0|
|1283607|  4 years|         0.0|
|1441362|10+ years|         0.0|
|1465450|  5 years|         0.0|
|2298372|  3 years|         0.0|
|1207506|  3 years|         0.0|
|1281854|10+ years|         0.0|
|1435514| < 1 year|         0.0|
|1075932|  2 years|         0.0|
|1303972|  6 years|         0.0|
|1677468|10+ years|         0.0|
|2375142|10+ years|         0.0|
|1565433|     NULL|         0.0|
|2366398|     NULL|         0.0|
|1549604|     NULL|         0.0|
|1107801|  6 years|         0.0|
|1955212|10+ years|         0.0|
|1544732|  9 years|         0.0|
|1076509|  6 years|         0.0|
+-------+---------+------------+
only showing top 20 rows

