Load Customers, Transactions and Dates CSV data from S3 into Snowflake: select the files by filename prefix, normalise the column (header) names with pandas, and read from S3 through an assumed IAM role.

In [None]:
# Run these in the shell that launches Jupyter (a bare `export` inside a
# Python notebook cell has no effect; use `%env NAME=value` there instead).

# Snowflake connection settings read by the loader cell.
export SNOWFLAKE_USER="your_user"
export SNOWFLAKE_PASSWORD="your_password"
export SNOWFLAKE_ACCOUNT="your_account"
export SNOWFLAKE_WAREHOUSE="your_warehouse"
export SNOWFLAKE_DATABASE="your_database"
export SNOWFLAKE_SCHEMA="your_schema"

# Base AWS credentials picked up by boto3 to call sts:AssumeRole.
export AWS_ACCESS_KEY_ID="your_aws_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_aws_secret_access_key"
export AWS_SESSION_TOKEN="your_aws_session_token"

# Bucket holding the source CSV files.
export AWS_BUCKET="your_aws_bucket"

# IAM role assumed for S3 reads; the loader reads this as AWS_ROLE_ARN.
export AWS_ROLE_ARN="your_aws_role_arn"



In [None]:
import os
import pandas as pd
import boto3
import snowflake.connector
from io import StringIO

# -----------------------------
# Configuration: every value comes from an environment variable
# (None when the variable is unset).
# -----------------------------
SNOWFLAKE_ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = os.environ.get("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_WAREHOUSE = os.environ.get("SNOWFLAKE_WAREHOUSE")
SNOWFLAKE_DATABASE = os.environ.get("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.environ.get("SNOWFLAKE_SCHEMA")

AWS_BUCKET = os.environ.get("AWS_BUCKET")  # bucket listed/read by the loader
ROLE_ARN = os.environ.get("AWS_ROLE_ARN")  # IAM role ARN assumed for S3 access

def snowflake_conn():
    """Open and return a new Snowflake connection built from the module settings.

    The caller owns the returned connection and is responsible for closing it.
    """
    connect_kwargs = dict(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
    )
    return snowflake.connector.connect(**connect_kwargs)

# -----------------------------
# S3 client (via an assumed IAM role when AWS_ROLE_ARN is set)
# -----------------------------
def _make_s3_client(role_arn, session_name="SnowflakeS3Session"):
    """Return a boto3 S3 client.

    When *role_arn* is set, the role is assumed through STS and the client uses
    the temporary credentials it returns. When it is empty/None, the client is
    created with boto3's default credential chain instead of passing
    ``RoleArn=None`` to ``assume_role`` (which fails parameter validation).

    NOTE(review): assumed-role credentials expire (STS default: 1 hour); a
    very long-running load would need to refresh them.
    """
    if not role_arn:
        return boto3.client("s3")
    sts = boto3.client("sts")
    creds = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
    )["Credentials"]
    return boto3.client(
        "s3",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )


s3_client = _make_s3_client(ROLE_ARN)

# -----------------------------
# Canonical column lists (target table schema, in order) and header-variant
# maps (alternative CSV header -> canonical column name) per table.
# -----------------------------
CUSTOMERS_COLS_STANDARD = [
    "FIRST_NAME",
    "LAST_NAME",
    "COMPANY_NAME",
    "ADDRESS",
    "CITY",
    "COUNTY",
    "STATE",
    "POSTAL",
    "PHONE1",
    "PHONE2",
    "EMAIL",
    "WEB",
]
CUSTOMERS_VARIANTS = {
    "PROVINCE": "STATE",
    "ZIP": "POSTAL",
    "POST": "POSTAL",
}

TRANSACTIONS_COLS_STANDARD = [
    "ORDERNUMBER",
    "QUANTITYORDERED",
    "ORDERLINENUMBER",
    "TOTAL_AMOUNT",
    "ORDERDATE",
    "QTR_ID",
    "MONTH_ID",
    "YEAR_ID",
    "PRODUCTCODE",
    "CUSTOMERNAME",
    "PHONE",
    "ADDRESSLINE1",
    "ADDRESSLINE2",
    "CITY",
    "STATE",
    "POSTALCODE",
    "COUNTRY",
    "TERRITORY",
    "CONTACTLASTNAME",
    "CONTACTFIRSTNAME",
    "DEALSIZE",
]
TRANSACTIONS_VARIANTS = {
    "POSTAL": "POSTALCODE",
}

DATES_COLS_STANDARD = [
    "CALENDAR_DATE",
    "WEEKDAY_NUMBER",
    "WEEKDAY_NAME",
    "FISCAL_WEEK_OF_MONTH",
    "FISCAL_WEEK_OF_YEAR",
    "FISCAL_MONTH_NUMBER",
    "FISCAL_MONTH_NAME",
    "FISCAL_FIRST_DAY_OF_WEEK",
    "FISCAL_LAST_DAY_OF_WEEK",
    "FISCAL_FIRST_DAY_OF_MONTH",
    "FISCAL_LAST_DAY_OF_MONTH",
    "FISCAL_DAY_OF_MONTH",
    "FISCAL_QUARTER",
    "FISCAL_YEAR",
    "FISCAL_YEAR_WEEK",
    "FISCAL_YEAR_MONTH",
    "FISCAL_YEAR_QUARTER",
    "CALENDAR_WEEK_OF_MONTH",
    "CALENDAR_WEEK_OF_YEAR",
    "CALENDAR_FIRST_DAY_OF_MONTH",
    "CALENDAR_LAST_DAY_OF_MONTH",
    "CALENDAR_FIRST_DAY_OF_YEAR",
    "CALENDAR_LAST_DAY_OF_YEAR",
    "WEEKEND",
]
DATES_VARIANTS = dict()  # no alternative headers known for the dates file

# -----------------------------
# Helper functions (clean, dedupe, merge)
# -----------------------------
def clean_string(val):
    """Normalise one cell value.

    Missing values (anything ``pd.isna`` reports as missing) and the literal
    text "NULL" (case-insensitive, surrounding whitespace ignored) become
    ``None``; every other value is converted to ``str`` and stripped.
    """
    if pd.isna(val):
        return None
    text = str(val).strip()
    return None if text.upper() == "NULL" else text


def clean_dataframe(df, standard_cols, variants):
    """Return a new frame holding exactly *standard_cols*, in that order.

    Header names are normalised (UTF-8 BOM and double quotes removed,
    surrounding whitespace stripped, upper-cased) and matched against
    *standard_cols*. *variants* maps alternative header names to canonical
    ones; each mapping is applied, in order, only when the canonical column
    is not already present. Cells are cleaned with ``clean_string``, and
    standard columns missing from *df* are filled with ``None``.

    The input frame is not modified.

    Args:
        df: Raw frame as read from a CSV file.
        standard_cols: Canonical column names for the target table.
        variants: Mapping of alternative header name -> canonical name.

    Returns:
        A new DataFrame with *df*'s index and columns *standard_cols*.
    """
    # Map normalised header -> column position. Quotes are removed *before*
    # stripping so that '" zip "' normalises to 'ZIP'. A BOM on the first
    # header ('\ufeff') is not whitespace, so it is removed explicitly. If two
    # headers collide after normalisation ("City" / "CITY "), the first wins,
    # which keeps every lookup below a single Series.
    positions = {}
    for position, label in enumerate(df.columns):
        name = str(label).replace("\ufeff", "").replace('"', "").strip().upper()
        positions.setdefault(name, position)

    for src, target in variants.items():
        if src in positions and target not in positions:
            positions[target] = positions.pop(src)

    cleaned = pd.DataFrame(index=df.index)
    for col in standard_cols:
        if col in positions:
            cleaned[col] = df.iloc[:, positions[col]].apply(clean_string)
        else:
            cleaned[col] = None
    return cleaned

def upsert_df_to_snowflake(cur, df, table_name, key_cols=None, batch_size=1000):
    """Insert the rows of *df* into *table_name* that are not already there.

    Rows are loaded with bound parameters into a temporary staging table
    ``<table_name>_STG`` (same columns as the target). They are then merged into
    the target with ``WHEN NOT MATCHED THEN INSERT``, so existing rows are
    never updated.

    Args:
        cur: Open Snowflake cursor.
        df: Frame whose column names all exist in *table_name*.
        table_name: Target table identifier, as created by the caller.
        key_cols: Columns compared to decide whether a row already exists.
            Defaults to the first column of *df* (the historical behaviour).
        batch_size: Rows per ``executemany`` call. This keeps each generated
            multi-row INSERT statement to a bounded size.
    """
    if df.empty:
        return  # nothing to stage; also avoids an empty MERGE ... ON clause

    def quote(name):
        # Double-quoted identifier, matching how the table DDL names columns.
        return '"' + str(name).replace('"', '""') + '"'

    columns = [str(c) for c in df.columns]
    col_list = ", ".join(quote(c) for c in columns)
    if key_cols is None:
        key_cols = columns[:1]  # first column as the dedupe key
    temp_table = table_name + "_STG"

    cur.execute(
        f"CREATE OR REPLACE TEMPORARY TABLE {temp_table} AS SELECT * FROM {table_name} WHERE 1=0"
    )

    # Bind values instead of splicing them into SQL text. Snowflake treats a
    # backslash inside '...' as an escape, so hand-quoted literals break on
    # values such as 'C:\'. Missing values (None/NaN) are sent as NULL rather
    # than the string 'nan'.
    placeholders = ", ".join(["%s"] * len(columns))
    insert_sql = f"INSERT INTO {temp_table} ({col_list}) VALUES ({placeholders})"
    rows = [
        tuple(None if pd.isna(v) else str(v) for v in record)
        for record in df.itertuples(index=False, name=None)
    ]
    step = max(1, int(batch_size))
    for start in range(0, len(rows), step):
        cur.executemany(insert_sql, rows[start:start + step])

    on_clause = " AND ".join(f"t.{quote(k)} = s.{quote(k)}" for k in key_cols)
    source_values = ", ".join(f"s.{quote(c)}" for c in columns)
    cur.execute(f"""
        MERGE INTO {table_name} t
        USING {temp_table} s
        ON {on_clause}
        WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({source_values})
    """)

def load_table(prefix, standard_cols, variants, table_name):
    """Load every CSV under ``s3://AWS_BUCKET/<prefix>`` into *table_name*.

    The target table is created (all columns ``STRING``) if it does not exist.
    Each ``.csv`` object is read as strings and normalised with
    ``clean_dataframe``. The frames are concatenated and de-duplicated, and the
    result is merged into the table by ``upsert_df_to_snowflake``. The final
    row count is printed. If the prefix matches no objects, a warning is
    printed and nothing is loaded.

    Args:
        prefix: S3 key prefix selecting the files for this table.
        standard_cols: Canonical column names (and order) for the table.
        variants: Mapping of alternative header name -> canonical name.
        table_name: Snowflake table to create/populate.
    """
    # Build the DDL outside the f-string below: a backslash inside an
    # f-string replacement field is a SyntaxError before Python 3.12.
    column_ddl = ", ".join(f'"{c}" STRING' for c in standard_cols)

    conn = snowflake_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_ddl})")

            # list_objects_v2 returns at most 1000 keys per call; paginate so
            # prefixes with more files are loaded completely.
            paginator = s3_client.get_paginator("list_objects_v2")
            keys = [
                obj["Key"]
                for page in paginator.paginate(Bucket=AWS_BUCKET, Prefix=prefix)
                for obj in page.get("Contents", [])
            ]
            if not keys:
                print(f"⚠️ No files found for prefix {prefix}")
                return

            all_data = []
            for key in keys:
                if not key.lower().endswith(".csv"):
                    continue
                body = s3_client.get_object(Bucket=AWS_BUCKET, Key=key)["Body"]
                df = pd.read_csv(body, dtype=str, on_bad_lines="skip")
                all_data.append(clean_dataframe(df, standard_cols, variants))

            if all_data:
                df_all = pd.concat(all_data, ignore_index=True).drop_duplicates()
                upsert_df_to_snowflake(cur, df_all, table_name)

            cur.execute(f"SELECT COUNT(*) FROM {table_name}")
            print(f"📦 Final row count in {table_name}: {cur.fetchone()[0]}")
        finally:
            cur.close()
    finally:
        # Separate try so the connection is closed even if cursor() fails.
        conn.close()

# -----------------------------
# Main loaders
# -----------------------------
if __name__ == "__main__":
    # Each job: (S3 key prefix, canonical columns, header variants, target table).
    # Jobs run sequentially in this order.
    load_jobs = [
        ("de_shop_customers", CUSTOMERS_COLS_STANDARD, CUSTOMERS_VARIANTS, "CUSTOMERS"),
        ("de_shop_transactions", TRANSACTIONS_COLS_STANDARD, TRANSACTIONS_VARIANTS, "TRANSACTIONS"),
        ("de_dates", DATES_COLS_STANDARD, DATES_VARIANTS, "DATES"),
    ]
    for job in load_jobs:
        load_table(*job)


In [None]:
import pandas as pd
import snowflake.connector

import os

# -----------------------------
# Snowflake connection
# -----------------------------
# load_table() closes the connection it opens, so no connection is left over
# for this cell. Open one explicitly from the same environment variables the
# loader cell reads.
tables = ["CUSTOMERS", "TRANSACTIONS", "DATES"]

conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema=os.getenv("SNOWFLAKE_SCHEMA"),
)
try:
    cur = conn.cursor()
    try:
        for table in tables:
            # Fetch top 2 rows through the cursor: pd.read_sql only officially
            # supports SQLAlchemy connectables and sqlite3 connections.
            cur.execute(f"SELECT * FROM {table} LIMIT 2")
            df_top = pd.DataFrame(
                cur.fetchall(), columns=[col[0] for col in cur.description]
            )
            print(f"\n📌 Table: {table}")
            print(df_top)

            # Fetch row count and column count
            cur.execute(f"SELECT COUNT(*) AS ROW_COUNT FROM {table}")
            row_count = cur.fetchone()[0]
            col_count = len(df_top.columns)
            print(f"Rows: {row_count}, Columns: {col_count}")
    finally:
        cur.close()
finally:
    conn.close()


  ) -> int | None:
  ) -> int | None:



📌 Table: CUSTOMERS
  FIRST_NAME   LAST_NAME             COMPANY_NAME       ADDRESS  \
0    Aleshia  Tomkiewicz  Alan D Rosenburg Cpa Pc  14 Taylor St   
1       Evan   Zigomalas       Cap Gemini America   5 Binney St   

                CITY           COUNTY STATE    POSTAL        PHONE1  \
0  St. Stephens Ward             Kent  None   CT2 7PP  01835-703597   
1         Abbey Ward  Buckinghamshire  None  HP11 2AX  01937-864715   

         PHONE2                     EMAIL  \
0  01944-369967   atomkiewicz@hotmail.com   
1  01714-737668  evan.zigomalas@gmail.com   

                                    WEB  
0  http://www.alandrosenburgcpapc.co.uk  
1     http://www.capgeminiamerica.co.uk  
Rows: 2000, Columns: 12

📌 Table: TRANSACTIONS
  ORDERNUMBER QUANTITYORDERED ORDERLINENUMBER TOTAL_AMOUNT       ORDERDATE  \
0       10107              30               2        -1993  2/24/2003 0:00   
1       10121              34               5          -92   5/7/2003 0:00   

  QTR_ID MONTH_ID YE

  ) -> int | None:
  ) -> int | None:
  ) -> int | None:


Rows: 2823, Columns: 23

📌 Table: DATES
  CALENDAR_DATE WEEKDAY_NUMBER WEEKDAY_NAME FISCAL_WEEK_OF_MONTH  \
0    2010-01-31              1          Sun                    1   
1    2010-02-01              2          Mon                    1   

  FISCAL_WEEK_OF_YEAR FISCAL_MONTH_NUMBER FISCAL_MONTH_NAME  \
0                   1                   1               Feb   
1                   1                   1               Feb   

  FISCAL_FIRST_DAY_OF_WEEK FISCAL_LAST_DAY_OF_WEEK FISCAL_FIRST_DAY_OF_MONTH  \
0               2010-01-31              2010-02-06                2010-01-31   
1               2010-01-31              2010-02-06                2010-01-31   

   ... FISCAL_YEAR_WEEK FISCAL_YEAR_MONTH FISCAL_YEAR_QUARTER  \
0  ...           201001            201001               20101   
1  ...           201001            201001               20101   

  CALENDAR_WEEK_OF_MONTH CALENDAR_WEEK_OF_YEAR CALENDAR_FIRST_DAY_OF_MONTH  \
0                      5                     4    

  ) -> int | None:
