In [None]:
import snowflake.connector
import configparser

# Path of the INI-style file holding the [SNOWFLAKE] section read by snowflake_connection().
# NOTE(review): this file stores the Snowflake password in plain text — keep it out of
# version control, or switch to environment variables / a secrets manager.
CONFIG_PATH = 'configuration.properties'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of files
# it actually parsed; an empty list means the config file is missing, so fail fast here
# instead of surfacing later as an opaque KeyError: 'SNOWFLAKE'.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Could not read Snowflake configuration file: {CONFIG_PATH}")

def snowflake_connection():
    """Open a Snowflake connection from the ``[SNOWFLAKE]`` section of ``config``.

    Reads the connection settings (user, password, account, role, warehouse,
    database, schema) and the load targets (table, stage, file_format) from the
    module-level ``config`` parser, then connects with
    ``snowflake.connector.connect``.

    Returns:
        tuple: ``(conn, table, stage, file_format)`` on success. On any error the
        exception is printed and ``(None, None, None, None)`` is returned, so
        callers that unpack four values still can and may test ``conn is None``.
    """
    try:
        settings = config['SNOWFLAKE']
        # Read every key before connecting so a missing key fails before any
        # connection is opened (nothing to leak).
        user = settings['user']
        password = settings['password']
        account = settings['account']
        role = settings['role']
        warehouse = settings['warehouse']
        database = settings['database']
        schema = settings['schema']
        table = settings['table']
        stage = settings['stage']
        file_format = settings['file_format']

        conn = snowflake.connector.connect(
            user=user,
            password=password,
            account=account,
            warehouse=warehouse,
            database=database,
            schema=schema,
            role=role
        )

        return conn, table, stage, file_format
    except Exception as e:
        print("Exception in snowflake_connection function: ", e)
        # Match the 4-tuple shape of the success path; a shorter tuple makes
        # `conn, table, stage, file_format = snowflake_connection()` raise ValueError.
        return None, None, None, None

In [6]:
# Notebook-level connection: stage_csv reads the global `conn` and `stage` set here,
# while load_to_snowflake opens and closes its own connection.
connection_details = snowflake_connection()
# snowflake_connection() prints its own error and returns a placeholder tuple instead
# of raising, so stop here with a clear message rather than failing in a later cell.
if len(connection_details) != 4 or not connection_details[0]:
    raise RuntimeError("Could not connect to Snowflake; see the exception printed above.")
conn, table, stage, file_format = connection_details

# Local CSV file to stage and merge. The name appears to encode the extract date —
# TODO confirm and update it for each new extract.
csv_name = 'notice_rule_2025_02_25.csv'

In [14]:
def stage_csv(csv_name, connection=None, stage_name=None):
    """Upload a local CSV file to a Snowflake stage with ``PUT``.

    The file is compressed on upload (AUTO_COMPRESS=TRUE) and replaces any staged
    file of the same name (OVERWRITE=TRUE).

    Args:
        csv_name: Path of the local CSV file to upload.
        connection: Open Snowflake connection to use. Defaults to the
            notebook-level ``conn``.
        stage_name: Stage to upload into, without the leading ``@``. Defaults to
            the notebook-level ``stage``.

    Returns:
        bool: True if the PUT ran without raising, False otherwise. Errors are
        printed rather than raised, like the rest of the notebook.
    """
    cursor = None
    try:
        # Fall back to the notebook-level globals so existing calls keep working;
        # done inside the try so a missing global is reported, not raised.
        if connection is None:
            connection = conn
        if stage_name is None:
            stage_name = stage

        cursor = connection.cursor()
        # Quote the file URI so paths containing spaces or special characters work.
        put_query = f"PUT 'file://{csv_name}' @{stage_name} AUTO_COMPRESS=TRUE OVERWRITE=TRUE"
        print("Staging data...")
        cursor.execute(put_query)
        print(f"{csv_name} staged successfully")
        return True
    except Exception as e:
        print("Exception in stage_csv function: ", e)
        return False
    finally:
        # Close the cursor even when PUT fails so it is not leaked.
        if cursor is not None:
            cursor.close()

In [12]:
def _merge_query(table, stage, csv_name, file_format):
    """Return a MERGE that upserts the staged file ``@stage/csv_name`` into ``table`` keyed on ID.

    Columns are read positionally ($1..$8) from the staged file, so the CSV column
    order must match the aliases below. Existing IDs are updated; new IDs are
    inserted with DI_LOAD_DT = CURRENT_TIMESTAMP (DI_LOAD_DT is not changed on update).
    NOTE(review): if the file contains duplicate IDs, Snowflake may reject the MERGE
    as nondeterministic (ERROR_ON_NONDETERMINISTIC_MERGE) — confirm the source is unique on ID.
    """
    return f'''MERGE INTO {table} t
        USING (SELECT $1 ID,
                      $2 AGENCY,
                      $3 ACTION_TYPE,
                      $4 SUMMARY,
                      $5 IMPORTANT_DATES,
                      $6 PUBLIC_INSPECTION_PDF_URL,
                      $7 LAST_UPDATED_DATE,
                      $8 PUBLICATION_DATE
               FROM @{stage}/{csv_name} (FILE_FORMAT => {file_format})) s
        ON t.ID = s.ID
        WHEN MATCHED THEN
            UPDATE SET t.AGENCY = s.AGENCY,
                       t.ACTION_TYPE = s.ACTION_TYPE,
                       t.SUMMARY = s.SUMMARY,
                       t.IMPORTANT_DATES = s.IMPORTANT_DATES,
                       t.PUBLIC_INSPECTION_PDF_URL = s.PUBLIC_INSPECTION_PDF_URL,
                       t.LAST_UPDATED_DATE = s.LAST_UPDATED_DATE,
                       t.PUBLICATION_DATE = s.PUBLICATION_DATE
        WHEN NOT MATCHED THEN
            INSERT (ID, AGENCY, ACTION_TYPE, SUMMARY, IMPORTANT_DATES, PUBLIC_INSPECTION_PDF_URL, LAST_UPDATED_DATE, PUBLICATION_DATE, DI_LOAD_DT)
            VALUES (s.ID, s.AGENCY, s.ACTION_TYPE, s.SUMMARY, s.IMPORTANT_DATES, s.PUBLIC_INSPECTION_PDF_URL, s.LAST_UPDATED_DATE, s.PUBLICATION_DATE, CURRENT_TIMESTAMP);
        '''


def load_to_snowflake(csv_name):
    """MERGE a CSV already uploaded to the configured stage into the target table.

    Opens its own connection via ``snowflake_connection()`` (which also supplies
    the table, stage and file-format names) and always closes the cursor and
    connection before returning. Rows are upserted on ID. Errors are printed
    rather than raised, like the rest of the notebook.

    Args:
        csv_name: Name of the file on the stage (as passed to ``stage_csv``).
    """
    conn = None
    cur = None
    try:
        connection_details = snowflake_connection()
        # snowflake_connection() prints its own error and returns a placeholder
        # tuple instead of raising; bail out cleanly instead of a ValueError.
        if len(connection_details) != 4 or not connection_details[0]:
            print("Exception in load_to_snowflake function: could not connect to Snowflake")
            return
        conn, table, stage, file_format = connection_details
        cur = conn.cursor()

        print("merging data...")
        cur.execute(_merge_query(table, stage, csv_name, file_format))
        print("Data merged successfully")

    except Exception as e:
        print("Exception in load_to_snowflake function: ", e)
    finally:
        # Release the cursor and connection even when the MERGE fails.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()


In [15]:
# Run the load for this file: PUT the CSV onto the stage, then MERGE it into the
# target table. Each step prints its own progress and any error it hits.
for pipeline_step in (stage_csv, load_to_snowflake):
    pipeline_step(csv_name)

Staging data...
notice_rule_2025_02_25.csv staged successfully
merging data...
Data merged successfully
