<a href="https://colab.research.google.com/github/ShindeShwetaK/dev/blob/main/Data_Pipeline_Example_from_Google_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Configure Snowflake Python Client

In [None]:
from google.colab import userdata

# Read the Snowflake login details through google.colab's `userdata` helper
# instead of writing them into the notebook. These three module-level names
# are what return_snowflake_conn() passes to the connector.
user_id = userdata.get('snowflake_userid')
password = userdata.get('snowflake_password')
account = userdata.get('snowflake_account')

In [None]:
!pip install snowflake-connector-python

Collecting snowflake-connector-python
  Downloading snowflake_connector_python-3.12.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (65 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/65.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.3/65.3 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting tomlkit (from snowflake-connector-python)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Downloading snowflake_connector_python-3.12.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m29.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import snowflake.connector

def return_snowflake_conn():
    """Connect to Snowflake and return a cursor for running SQL.

    The login details come from the module-level ``user_id``, ``password``
    and ``account`` variables; the session is opened against the
    ``compute_wh`` warehouse and the ``dev`` database.

    Returns:
        A cursor created from the new connection. The connection object
        itself is not handed back to the caller.
    """
    connect_kwargs = dict(
        user=user_id,
        password=password,
        account=account,  # account identifier, e.g. 'xyz12345.us-east-1'
        warehouse='compute_wh',
        database='dev',
    )
    session = snowflake.connector.connect(**connect_kwargs)
    return session.cursor()

## 1st version: define ETL functions

In [None]:
import requests

def extract(url, timeout=30):
    """Download ``url`` and return the response body as text.

    Args:
        url: Address of the file to fetch.
        timeout: Seconds to wait for the server before giving up; passed
            straight to ``requests.get``.

    Returns:
        The decoded response body (``Response.text``).

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.RequestException: Connection problems or a timeout.
    """
    response = requests.get(url, timeout=timeout)
    # Fail here instead of handing an HTML/XML error page to the parser.
    response.raise_for_status()
    # NOTE(review): the sample output in this notebook shows garbled accented
    # characters (e.g. the capital of Brazil), which suggests the body is
    # decoded with the wrong charset -- confirm the file's real encoding and
    # set `response.encoding` before reading `.text` if so.
    return response.text

In [None]:
def transform(text):
    """Parse two-column CSV text into a list of ``[country, capital]`` pairs.

    This first version keeps every row, header included; ``transform_v2``
    is the variant that drops the header.

    Args:
        text: CSV content with exactly two fields per line.

    Returns:
        A list of two-element lists in file order. Empty or whitespace-only
        input yields ``[]``.

    Raises:
        ValueError: A non-blank line does not have exactly two fields.
    """
    import csv
    import io

    records = []
    # newline="" leaves line endings untouched so the csv module can handle
    # both \n and \r\n; the reader also copes with quoted fields that
    # contain commas, which a plain split(",") cannot.
    reader = csv.reader(io.StringIO(text.strip(), newline=""))
    for row in reader:
        if not row:
            continue  # blank line inside the file
        if len(row) != 2:
            raise ValueError(
                f"line {reader.line_num}: expected 2 fields, got {len(row)}: {row!r}"
            )
        records.append(row)
    return records

In [None]:
def load(con, records):
    """Append ``records`` to ``dev.raw_data.country_capital`` (v1 loader).

    The table is created when missing but never emptied, so every call adds
    the rows again -- this version is intentionally not idempotent.

    Args:
        con: Cursor-like object exposing ``execute(sql, params)``.
        records: Iterable of ``[country, capital]`` pairs.
    """
    target_table = "dev.raw_data.country_capital"
    con.execute(f"""
    CREATE TABLE IF NOT EXISTS {target_table} (
      country varchar primary key,
      capital varchar
    )""")
    # Bind the values instead of splicing them into the SQL text: the driver
    # then takes care of quotes, backslashes and any other special character.
    # %s is the placeholder for the connector's default (pyformat) paramstyle.
    insert_sql = f"INSERT INTO {target_table} (country, capital) VALUES (%s, %s)"
    for r in records:
        country, capital = r[0], r[1]
        print(country, "-", capital)
        con.execute(insert_sql, (country, capital))

## Now run the functions one at a time, starting with Extract

In [None]:
# Location of the source CSV; `link` is reused by the later pipeline cells.
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"

# Download the file once and keep the raw text for the transform step.
data = extract(link)

In [None]:
# Preview only the beginning of the payload; echoing the whole string floods
# the cell output.
data[:500]

"country,capital\nAbkhazia,Sukhumi\nAfghanistan,Kabul\nAkrotiri and Dhekelia,Episkopi Cantonment\nAlbania,Tirana\nAlgeria,Algiers\nAmerican Samoa,Pago Pago\nAndorra,Andorra la Vella\nAngola,Luanda\nAnguilla,The Valley\nAntigua and Barbuda,St. John's\nArgentina,Buenos Aires\nArmenia,Yerevan\nAruba,Oranjestad\nAscension Island,Georgetown\nAustralia,Canberra\nAustria,Vienna\nAzerbaijan,Baku\nBahamas,Nassau\nBahrain,Manama\nBangladesh,Dhaka\nBarbados,Bridgetown\nBelarus,Minsk\nBelgium,Brussels\nBelize,Belmopan\nBenin,Porto-Novo\nBermuda,Hamilton\nBhutan,Thimphu\nBolivia,Sucre\nBolivia,La Paz\nBosnia and Herzegovina,Sarajevo\nBotswana,Gaborone\nBrazil,Brasâ\x88\x9aâ\x89\xa0lia\nBritish Virgin Islands,Road Town\nBrunei,Bandar Seri Begawan\nBulgaria,Sofia\nBurkina Faso,Ouagadougou\nBurundi,Bujumbura\nCambodia,Phnom Penh\nCameroon,Yaoundâ\x88\x9aÂ©\nCanada,Ottawa\nCape Verde,Praia\nCayman Islands,George Town\nCentral African Republic,Bangui\nChad,N'Djamena\nChile,Santiago\nChina,Beijing\nChris

In [None]:
# Parse the raw CSV text into [country, capital] pairs (v1 keeps the header row).
lines = transform(data)

In [None]:
# Number of parsed rows; with transform() this count includes the header row.
len(lines)

249

In [None]:
# Peek at the first ten records; the first entry is the CSV header.
lines[0:10]

[['country', 'capital'],
 ['Abkhazia', 'Sukhumi'],
 ['Afghanistan', 'Kabul'],
 ['Akrotiri and Dhekelia', 'Episkopi Cantonment'],
 ['Albania', 'Tirana'],
 ['Algeria', 'Algiers'],
 ['American Samoa', 'Pago Pago'],
 ['Andorra', 'Andorra la Vella'],
 ['Angola', 'Luanda'],
 ['Anguilla', 'The Valley']]

In [None]:
# Open a cursor and run the v1 loader. `lines` still contains the header row,
# so it is inserted as if it were data. `con` stays bound at module level and
# is the cursor check_table_stats() uses.
con = return_snowflake_conn()
load(con, lines)

country - capital
Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - Sucre
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - 

In [None]:
def check_table_stats(table, cursor=None):
    """Print the first rows and the total row count of ``table``.

    Args:
        table: Table name to query. It is interpolated into the SQL text
            as-is, so pass only trusted identifiers.
        cursor: Cursor to run the query on. When omitted, the module-level
            ``con`` is used, which is what this function always did before
            the parameter existed.
    """
    # Only fall back to the notebook-global cursor when none was supplied, so
    # callers can make the dependency explicit.
    active_cursor = con if cursor is None else cursor
    active_cursor.execute(f"SELECT * FROM {table}")
    df = active_cursor.fetch_pandas_all()
    print(df.head())
    print(len(df))

In [None]:
# Show a sample of the loaded table and its total row count.
check_table_stats("dev.raw_data.country_capital")

                 COUNTRY              CAPITAL
0               Abkhazia              Sukhumi
1            Afghanistan                Kabul
2  Akrotiri and Dhekelia  Episkopi Cantonment
3                Albania               Tirana
4                Algeria              Algiers
497


## Run again to check whether idempotency is ensured

In [None]:
# Repeat the whole v1 pipeline end to end. load() only ever appends rows, so
# the row count printed by check_table_stats() grows with each run.
data = extract(link)
lines = transform(data)
con = return_snowflake_conn()
load(con, lines)
check_table_stats("dev.raw_data.country_capital")

## Rerun to see whether idempotency is maintained

## Let's improve the code to make the pipeline idempotent

In [None]:
# First remove the header
def transform_v2(text):
    """Parse two-column CSV text into ``[country, capital]`` pairs, without the header.

    Args:
        text: CSV content whose first line is a header and whose remaining
            lines have exactly two fields.

    Returns:
        A list of two-element lists for every data row, in file order.
        Empty input or a header-only file yields ``[]``.

    Raises:
        ValueError: A non-blank data line does not have exactly two fields.
    """
    import csv
    import io

    # newline="" leaves line endings untouched so the csv module can handle
    # both \n and \r\n; the reader also copes with quoted fields that
    # contain commas, which a plain split(",") cannot.
    reader = csv.reader(io.StringIO(text.strip(), newline=""))
    next(reader, None)  # skip the header row (no-op on empty input)

    records = []
    for row in reader:
        if not row:
            continue  # blank line inside the file
        if len(row) != 2:
            raise ValueError(
                f"line {reader.line_num}: expected 2 fields, got {len(row)}: {row!r}"
            )
        records.append(row)
    return records

In [None]:
# Full refresh
# Empty the table and reload it inside one transaction, so a failed load
# leaves the previous contents in place.
def load_v2(con, records):
    """Replace the contents of ``dev.raw_data.country_capital`` with ``records``.

    The table is created up front if it does not exist. The refresh itself is
    ``DELETE`` + ``INSERT`` between ``BEGIN`` and ``COMMIT``. DDL such as
    ``CREATE OR REPLACE TABLE`` is deliberately kept out of the transaction:
    Snowflake commits the open transaction when it runs DDL, so a later
    ``ROLLBACK`` could not bring the old rows back.

    Args:
        con: Cursor-like object exposing ``execute(sql, params)``.
        records: Iterable of ``[country, capital]`` pairs.

    Raises:
        Exception: Whatever the cursor raised during the transaction, after
            a ``ROLLBACK`` has been issued and the error printed.
    """
    target_table = "dev.raw_data.country_capital"
    con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
    # Values are bound, not spliced into the SQL text; %s is the placeholder
    # for the connector's default (pyformat) paramstyle.
    insert_sql = f"INSERT INTO {target_table} (country, capital) VALUES (%s, %s)"
    try:
        con.execute("BEGIN;")
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country, capital = r[0], r[1]
            print(country, "-", capital)
            con.execute(insert_sql, (country, capital))
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise

In [None]:
def country_capital_data_pipeline_v2(link):
    """Run extract -> transform_v2 -> load_v2 for ``link`` and print table stats.

    The cursor and the connection behind it are released even when a step
    fails; the failure itself still propagates to the caller.

    Args:
        link: URL of the country/capital CSV file.
    """
    con = return_snowflake_conn()
    try:
        data = extract(link)
        lines = transform_v2(data)
        load_v2(con, lines)
        # NOTE(review): check_table_stats() queries through the module-level
        # `con`, not the cursor opened above.
        check_table_stats("dev.raw_data.country_capital")
    finally:
        # return_snowflake_conn() hands back only a cursor; closing it does
        # not end the session, so close the owning connection as well.
        try:
            con.close()
        finally:
            con.connection.close()

In [None]:
# First run of the v2 pipeline (extract, transform_v2, load_v2, then table stats).
country_capital_data_pipeline_v2(link)

In [None]:
# Second run with the same input: the printed row count should be unchanged
# if the load is idempotent.
country_capital_data_pipeline_v2(link)

## Now let's implement incremental update using MERGE SQL

In [None]:
# Implement incremental update using MERGE SQL in Snowflake
def load_v3(con, records):
    """Upsert ``records`` into ``dev.raw_data.country_capital`` via a staging table.

    Steps:
      1. Create the target table if it is missing. Existing rows are kept;
         rebuilding the target on every run would leave the MERGE with
         nothing to match.
      2. Rebuild the staging table and fill it with ``records``.
      3. MERGE staging into target on ``country``: matching rows get the new
         capital, unknown countries are inserted.

    The MERGE source keeps one row per country (the alphabetically first
    capital). Several source rows matching the same target row make the
    UPDATE branch nondeterministic, which Snowflake rejects by default.

    Args:
        con: Cursor-like object exposing ``execute(sql, params)``.
        records: Iterable of ``[country, capital]`` pairs.

    Raises:
        Exception: Whatever the cursor raised, after the error is printed.
    """
    staging_table = "dev.raw_data.temp_country_capital"
    target_table = "dev.raw_data.country_capital"
    try:
        con.execute(f"""
          CREATE TABLE IF NOT EXISTS {target_table} (
            country varchar primary key, capital varchar
          );""")
        con.execute(f"""
          CREATE OR REPLACE TABLE {staging_table} (
            country varchar primary key, capital varchar
          );""")
        # Values are bound, not spliced into the SQL text; %s is the
        # placeholder for the connector's default (pyformat) paramstyle.
        insert_sql = f"INSERT INTO {staging_table} (country, capital) VALUES (%s, %s)"
        for r in records:
            country, capital = r[0], r[1]
            print(country, "-", capital)
            con.execute(insert_sql, (country, capital))
        # Perform the UPSERT. The join key is not re-assigned in the UPDATE
        # branch because it is equal on both sides by definition.
        upsert_sql = f"""
            MERGE INTO {target_table} AS target
            USING (
                SELECT country, capital
                FROM {staging_table}
                QUALIFY ROW_NUMBER() OVER (PARTITION BY country ORDER BY capital) = 1
            ) AS stage
            ON target.country = stage.country
            WHEN MATCHED THEN
                UPDATE SET target.capital = stage.capital
            WHEN NOT MATCHED THEN
                INSERT (country, capital)
                VALUES (stage.country, stage.capital);
        """
        con.execute(upsert_sql)
    except Exception as e:
        print(e)
        raise

In [None]:
def country_capital_data_pipeline_inc(link):
    """Run extract -> transform_v2 -> load_v3 (MERGE upsert) and print table stats.

    The cursor and the connection behind it are released even when a step
    fails; the failure itself still propagates to the caller.

    Args:
        link: URL of the country/capital CSV file.
    """
    con = return_snowflake_conn()
    try:
        data = extract(link)
        lines = transform_v2(data)
        load_v3(con, lines)
        # NOTE(review): check_table_stats() queries through the module-level
        # `con`, not the cursor opened above.
        check_table_stats("dev.raw_data.country_capital")
    finally:
        # return_snowflake_conn() hands back only a cursor; closing it does
        # not end the session, so close the owning connection as well.
        try:
            con.close()
        finally:
            con.connection.close()

In [None]:
# Run the MERGE-based pipeline against the same CSV URL that `link` holds.
country_capital_data_pipeline_inc("https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv")