In [2]:
import snowflake.connector
from dotenv import load_dotenv
import pandas as pd
import os 


In [3]:
load_dotenv()

user_id = os.getenv("SNOWFLAKE_USERNAME")
password = os.getenv("SNOWFLAKE_PASSWORD")
account = os.getenv("SNOWFLAKE_ACCOUNT_ID")

def snowflake_connection():
    conn = snowflake.connector.connect(
        user=user_id,
        password=password,
        account=account,
        warehouse='compute_wh', 
        database='dev' 
    )

    return conn.cursor()

# print(f"✅ USER: [{repr(user_id)}]")
# print(f"✅ PASSWORD length: {len(password) if password else '❌'}")
# print(f"✅ ACCOUNT: [{account}]")


In [4]:
cursor = snowflake_connection()
cursor.execute("SELECT * FROM dev.raw_data.nps LIMIT 10")
columns = [col[0] for col in cursor.description]


print(columns)
rows = cursor.fetchall()

target_df = pd.DataFrame(rows, columns=columns)
target_df

['ID', 'CREATED', 'SCORE']


Unnamed: 0,ID,CREATED,SCORE
0,1,2019-01-01 22:04:21,1
1,2,2019-01-02 03:47:26,9
2,3,2019-01-03 19:31:41,10
3,4,2019-01-04 05:02:11,10
4,5,2019-01-04 18:02:53,1
5,6,2019-01-04 23:06:18,2
6,7,2019-01-05 13:45:47,0
7,8,2019-01-05 09:26:13,2
8,9,2019-01-05 22:36:38,6
9,10,2019-01-06 13:22:47,10


In [5]:
import requests 

def extract(url):
    f = requests.get(url) 
    return (f.text) 

def transform(text):
    lines = text.strip().split("\n") 
    records = [] 
    for l in lines:
        (country, capital) = l.split(",")
        records.append([country, capital])
    return records 

def load(cur, records):
    target_table = 'dev.raw_data.country_capital'
    cur.execute(f"""
    CREATE TABLE IF NOT EXISTS {target_table} (
        country varchar primary key,
        capital varchar
    )""")

    for r in records:
        country = r[0].replace("'", "''")
        capital = r[1].replace("'", "''")
        sql = f"INSERT INTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
        print(country, "-", capital, "->", sql)
        cur.execute(sql)


In [6]:
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"
data = extract(link)
data

"country,capital\nAbkhazia,Sukhumi\nAfghanistan,Kabul\nAkrotiri and Dhekelia,Episkopi Cantonment\nAlbania,Tirana\nAlgeria,Algiers\nAmerican Samoa,Pago Pago\nAndorra,Andorra la Vella\nAngola,Luanda\nAnguilla,The Valley\nAntigua and Barbuda,St. John's\nArgentina,Buenos Aires\nArmenia,Yerevan\nAruba,Oranjestad\nAscension Island,Georgetown\nAustralia,Canberra\nAustria,Vienna\nAzerbaijan,Baku\nBahamas,Nassau\nBahrain,Manama\nBangladesh,Dhaka\nBarbados,Bridgetown\nBelarus,Minsk\nBelgium,Brussels\nBelize,Belmopan\nBenin,Porto-Novo\nBermuda,Hamilton\nBhutan,Thimphu\nBolivia,La Paz\nBosnia and Herzegovina,Sarajevo\nBotswana,Gaborone\nBrazil,Brasâ\x88\x9aâ\x89\xa0lia\nBritish Virgin Islands,Road Town\nBrunei,Bandar Seri Begawan\nBulgaria,Sofia\nBurkina Faso,Ouagadougou\nBurundi,Bujumbura\nCambodia,Phnom Penh\nCameroon,Yaoundâ\x88\x9aÂ©\nCanada,Ottawa\nCape Verde,Praia\nCayman Islands,George Town\nCentral African Republic,Bangui\nChad,N'Djamena\nChile,Santiago\nChina,Beijing\nChristmas Island,Fly

In [7]:
len(data)

5069

In [8]:
lines = transform(data)
lines

[['country', 'capital'],
 ['Abkhazia', 'Sukhumi'],
 ['Afghanistan', 'Kabul'],
 ['Akrotiri and Dhekelia', 'Episkopi Cantonment'],
 ['Albania', 'Tirana'],
 ['Algeria', 'Algiers'],
 ['American Samoa', 'Pago Pago'],
 ['Andorra', 'Andorra la Vella'],
 ['Angola', 'Luanda'],
 ['Anguilla', 'The Valley'],
 ['Antigua and Barbuda', "St. John's"],
 ['Argentina', 'Buenos Aires'],
 ['Armenia', 'Yerevan'],
 ['Aruba', 'Oranjestad'],
 ['Ascension Island', 'Georgetown'],
 ['Australia', 'Canberra'],
 ['Austria', 'Vienna'],
 ['Azerbaijan', 'Baku'],
 ['Bahamas', 'Nassau'],
 ['Bahrain', 'Manama'],
 ['Bangladesh', 'Dhaka'],
 ['Barbados', 'Bridgetown'],
 ['Belarus', 'Minsk'],
 ['Belgium', 'Brussels'],
 ['Belize', 'Belmopan'],
 ['Benin', 'Porto-Novo'],
 ['Bermuda', 'Hamilton'],
 ['Bhutan', 'Thimphu'],
 ['Bolivia', 'La Paz'],
 ['Bosnia and Herzegovina', 'Sarajevo'],
 ['Botswana', 'Gaborone'],
 ['Brazil', 'Brasâ\x88\x9aâ\x89\xa0lia'],
 ['British Virgin Islands', 'Road Town'],
 ['Brunei', 'Bandar Seri Begawan'],


In [9]:
cur = snowflake_connection() 
# load(cur, lines)

In [10]:
# table를 읽어서 판다스 데이터프레임으로 변환하고 처음 5개의 레코드와 레코드 수를 표시
def check_table_stats(cur, table, key):
    result = cur.execute(f"SELECT * FROM {table} ORDER BY {key}")
    df = cur.fetch_pandas_all()
    print(len(df))
    print(df.head())
    return df

In [11]:
check_table_stats(cur, "dev.raw_data.country_capital", "country")

248
                 COUNTRY              CAPITAL
0               Abkhazia              Sukhumi
1            Afghanistan                Kabul
2  Akrotiri and Dhekelia  Episkopi Cantonment
3                Albania               Tirana
4                Algeria              Algiers


Unnamed: 0,COUNTRY,CAPITAL
0,Abkhazia,Sukhumi
1,Afghanistan,Kabul
2,Akrotiri and Dhekelia,Episkopi Cantonment
3,Albania,Tirana
4,Algeria,Algiers
...,...,...
243,Western Sahara,El Aaiââ«n
244,Yemen,SanaâÂ°
245,Zambia,Lusaka
246,Zimbabwe,Harare


## 위 ETL 코드를 다시 실행하고 멱등성(idempotency)이 지켜지는지 확인

In [14]:
data = extract(link)
lines = transform(data)
cur = snowflake_connection()
load(cur, lines)

country - capital -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('country', 'capital')
Abkhazia - Sukhumi -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('Abkhazia', 'Sukhumi')
Afghanistan - Kabul -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('Afghanistan', 'Kabul')
Akrotiri and Dhekelia - Episkopi Cantonment -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('Akrotiri and Dhekelia', 'Episkopi Cantonment')
Albania - Tirana -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('Albania', 'Tirana')
Algeria - Algiers -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('Algeria', 'Algiers')
American Samoa - Pago Pago -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('American Samoa', 'Pago Pago')
Andorra - Andorra la Vella -> INSERT INTO dev.raw_data.country_capital (country, capital) VALUES ('Andorra', 'Andorra la Vella')
Angola - Luanda -> I

In [15]:
check_table_stats(cur, "dev.raw_data.country_capital", "country")

496
                 COUNTRY              CAPITAL
0               Abkhazia              Sukhumi
1               Abkhazia              Sukhumi
2            Afghanistan                Kabul
3            Afghanistan                Kabul
4  Akrotiri and Dhekelia  Episkopi Cantonment


Unnamed: 0,COUNTRY,CAPITAL
0,Abkhazia,Sukhumi
1,Abkhazia,Sukhumi
2,Afghanistan,Kabul
3,Afghanistan,Kabul
4,Akrotiri and Dhekelia,Episkopi Cantonment
...,...,...
491,Zambia,Lusaka
492,Zimbabwe,Harare
493,Zimbabwe,Harare
494,country,capital


In [16]:
# transform 함수를 개선해서 헤더를 빼고 리턴하게 만들자
def transform_v2(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines:  # remove the first row
        (country, capital) = l.split(",")
        records.append([country, capital])
    return records[1:]

In [17]:
# load 함수를 개선해서 제대로 된 "Full refresh"를 구현하자
#  - 매번 테이블을 새로 만들고 레코드를 새로 적재하는 방식으로 구현
#  - SQL transaction을 사용하는 것이 일반적이지만 Snowflake은 SQL transaction을 지원하지
#    않기 때문에 다른 방식으로 구현
def load_v2(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        # 여기서 CREATE OR REPLACE TABLE이 사용되면 안됨. DDL이라 Snowflake에서는 transaction을 지키지 않기 때문
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
        con.execute("BEGIN;")
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            print(country, "-", capital)

            sql = f"INSERT INTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e

In [18]:
def country_capital_data_pipeline_v2(link):
    cur = snowflake_connection()
    data = extract(link)
    lines = transform_v2(data)
    load_v2(cur, lines)
    check_table_stats(cur, "dev.raw_data.country_capital", "country")
    cur.close()

In [19]:
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"
country_capital_data_pipeline_v2(link)

Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - Santiago
China - Beijing
Christmas

In [20]:
country_capital_data_pipeline_v2(link)

Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - Santiago
China - Beijing
Christmas

In [21]:
# 이번에는 Transaction이 제대로 동작하는지 load_v3를 만들고 고의로 SQL 문법 에러를 내보기
def load_v3(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        con.execute("BEGIN;")
        # 여기서 CREATE OR REPLACE TABLE이 사용되면 안됨. DDL이라 Snowflake에서는 transaction을 지키지 않기 때문
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            print(country, "-", capital)

            # 아래 고의로 에러를 냈음
            sql = f"INSERT INTTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e

In [22]:
load_v3(cur, lines)

country - capital
001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.


ProgrammingError: 001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.

In [23]:
# transaction이 동작하지 않았다면 앞서 DELETE가 실행되어서 테이블의 내용이 없어야함.
# 248개의 레코드가 있음을 확인해보자
cur.execute("SELECT COUNT(1) FROM dev.raw_data.country_capital")
results = cur.fetchone()
print(results)

(247,)
