In [1]:
import os
from dotenv import load_dotenv

user_id = os.getenv("snowflake_userid")
password = os.getenv("snowflake_password")
account = os.getenv("snowflake_account")


In [17]:
import requests
import snowflake.connector

def return_snowflake_conn():
    # Snowflake 연결 객체 생성
    conn = snowflake.connector.connect(
        user=user_id,
        password=password,
        account=account,
        warehouse = "compute_wh",
        database = "dev",
        
    )
    # Create a cursor object
    return conn.cursor()

def extract(url):
    f = requests.get(url)
    return (f.text)

def transform(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines:
        (country, capital) = l.split(",")
        records.append([country, capital])
    
    return records[1:]

def check_table_stats(cur, table, key):
    result = cur.execute(f"SELECT * FROM {table} ORDER BY {key} ")
    df = cur.fetch_pandas_all()
    print(len(df))
    print(df.head())
    return df


In [18]:
def load(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar)")
        con.execute("BEGIN;")
        con.execute(f"DELETE FROM {target_table}")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            sql = f"INSERT INTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")

    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e
            

In [19]:
def country_capital_data_pipeline(link):
    cur = return_snowflake_conn()
    data = extract(link)
    lines = transform(data)
    load(cur, lines)
    check_table_stats(cur, "dev.raw_data.country_capital", "country")
    cur.close()    
    

In [20]:
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"

country_capital_data_pipeline(link)

247
                 COUNTRY              CAPITAL
0               Abkhazia              Sukhumi
1            Afghanistan                Kabul
2  Akrotiri and Dhekelia  Episkopi Cantonment
3                Albania               Tirana
4                Algeria              Algiers


In [21]:
country_capital_data_pipeline(link)

247
                 COUNTRY              CAPITAL
0               Abkhazia              Sukhumi
1            Afghanistan                Kabul
2  Akrotiri and Dhekelia  Episkopi Cantonment
3                Albania               Tirana
4                Algeria              Algiers


# Transaction Test

In [22]:
def load_v2 (con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        con.execute("BEGIN;")
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar)")
        con.execute(f"DELETE FROM {target_table}")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            # 고의 에러
            sql = f"INSERT INTTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")

    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e
        

In [27]:
cur = return_snowflake_conn()
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"

data = extract(link)
lines = transform(data)
load_v2(cur, lines)

001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.


ProgrammingError: 001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.

In [28]:
cur.execute("SELECT COUNT(1) FROM dev.raw_data.country_capital")
results = cur.fetchone()
print(results)






(247,)
