### Importing the Required Libraries to setup the connection from Local to Snowflake

In [12]:
%pip install snowflake-connector-python[pandas]

%pip install dotenv

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 26.0.1
[notice] To update, run: C:\Users\rampr\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 26.0.1
[notice] To update, run: C:\Users\rampr\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [13]:
import snowflake.connector
import os
from dotenv import load_dotenv

# Load the variables from .env
load_dotenv()

# Establish the connection
try:
    conn = snowflake.connector.connect(
        user=os.getenv('SNOWFLAKE_USER'),
        password=os.getenv('SNOWFLAKE_PASSWORD'),
        account=os.getenv('SNOWFLAKE_ACCOUNT'),
        warehouse=os.getenv('SNOWFLAKE_WAREHOUSE'),
        database=os.getenv('SNOWFLAKE_DATABASE'),
        role=os.getenv('SNOWFLAKE_ROLE')
    )
    print("Connection Successful")

    # Testing the connection
    cursor = conn.cursor()
    cursor.execute("SELECT CURRENT_VERSION()")
    print(f"Snowflake Version: {cursor.fetchone()[0]}")

except Exception as e:
    print(f"Connection Failed {e}")

"""finally:
    if 'conn' in locals():
        conn.close()"""

Connection Successful
Snowflake Version: 10.5.0


"finally:\n    if 'conn' in locals():\n        conn.close()"

### Loading the local files to Snowflake

In [14]:
local_csv_dir = "../../kaggle_csv_files/"

### Creating a stage to store the kaggle_csv_files
try:
    cursor.execute("USE WAREHOUSE OLIST_WH")
    cursor.execute("USE DATABASE OLIST")
    cursor.execute("USE SCHEMA RAW")
    cursor.execute("CREATE OR REPLACE STAGE olist_raw_stage;")
    print("Stage 'olist_raw_stage' created")

    ## Looping through the folder 
    for filename in os.listdir(local_csv_dir):
        if filename.endswith(".csv"):
            file_path = os.path.join(local_csv_dir, filename)
            print(f"Uploading file {filename}...")

            ## PUT sends the file to stage
            cursor.execute(f"PUT 'file://{file_path}' @olist_raw_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;")
    print("\nAll files successfully uploaded to the snowflake stage.")

    ## Verifying the uploaded files
    cursor.execute("LIST @olist_raw_stage;")
    print("Files currently in the stage:")
    for row in cursor.fetchall():
        print(f" - {row[0]}")

except Exception as e:
    print(f"Upload failed: {e}")

Stage 'olist_raw_stage' created
Uploading file olist_sellers_dataset.csv...
Uploading file olist_geolocation_dataset.csv...
Uploading file olist_products_dataset.csv...
Uploading file olist_order_items_dataset.csv...
Uploading file olist_orders_dataset.csv...
Uploading file olist_customers_dataset.csv...
Uploading file olist_order_reviews_dataset.csv...
Uploading file olist_order_payments_dataset.csv...
Uploading file product_category_name_translation.csv...

All files successfully uploaded to the snowflake stage.
Files currently in the stage:
 - olist_raw_stage/olist_customers_dataset.csv
 - olist_raw_stage/olist_geolocation_dataset.csv
 - olist_raw_stage/olist_order_items_dataset.csv
 - olist_raw_stage/olist_order_payments_dataset.csv
 - olist_raw_stage/olist_order_reviews_dataset.csv
 - olist_raw_stage/olist_orders_dataset.csv
 - olist_raw_stage/olist_products_dataset.csv
 - olist_raw_stage/olist_sellers_dataset.csv
 - olist_raw_stage/product_category_name_translation.csv


### Create tables for the csv files in the RAW schema

In [23]:
cursor.execute("""
                       CREATE OR REPLACE FILE FORMAT olist_csv_format
                       TYPE = 'CSV'
                       FIELD_DELIMITER = ','
                       PARSE_HEADER = TRUE 
                       FIELD_OPTIONALLY_ENCLOSED_BY = '"'
                       """)

<snowflake.connector.cursor.SnowflakeCursor at 0x1b39c9b20d0>

In [24]:
import os

## Get the list of the files in the stage
cursor.execute("LIST @olist_raw_stage;")
files_in_stage = cursor.fetchall()

for row in files_in_stage:
    ## get the internal path
    full_path = row[0]
    filename = full_path.split('/')[-1] #get the filename

    ## Derive the table name from the file
    table_name = filename.replace('olist_', '').replace('_dataset','').split('.')[0].upper()

    print(f"Processing {filename} -> Table: RAW.{table_name}...")

    try:
        
        
        #Use template and Infer-Schema to build the table
        cursor.execute(f"""
                        CREATE OR REPLACE TABLE RAW.{table_name}
                        USING TEMPLATE (
                        SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
                        FROM TABLE (
                            INFER_SCHEMA(
                            LOCATION => '@olist_raw_stage/{filename}',
                            FILE_FORMAT => 'olist_csv_format'
                            )
                            )
                            )
                         """)
        #4. Copy the data from Stage to the new Table
        cursor.execute(f"""
                       COPY INTO RAW.{table_name} FROM @olist_raw_stage/{filename} 
                       FILE_FORMAT = (FORMAT_NAME = 'olist_csv_format')
                       MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
                       PURGE = FALSE 
                    """)
        print(f"Confirmation: RAW.{table_name} loaded successfully.")

    except Exception as e:
        print(f"Failed to load {table_name}: {e}")

print("\n--- All ingestion tasks complete ---")





Processing olist_customers_dataset.csv -> Table: RAW.CUSTOMERS...
Confirmation: RAW.CUSTOMERS loaded successfully.
Processing olist_geolocation_dataset.csv -> Table: RAW.GEOLOCATION...
Confirmation: RAW.GEOLOCATION loaded successfully.
Processing olist_order_items_dataset.csv -> Table: RAW.ORDER_ITEMS...
Confirmation: RAW.ORDER_ITEMS loaded successfully.
Processing olist_order_payments_dataset.csv -> Table: RAW.ORDER_PAYMENTS...
Confirmation: RAW.ORDER_PAYMENTS loaded successfully.
Processing olist_order_reviews_dataset.csv -> Table: RAW.ORDER_REVIEWS...
Confirmation: RAW.ORDER_REVIEWS loaded successfully.
Processing olist_orders_dataset.csv -> Table: RAW.ORDERS...
Confirmation: RAW.ORDERS loaded successfully.
Processing olist_products_dataset.csv -> Table: RAW.PRODUCTS...
Confirmation: RAW.PRODUCTS loaded successfully.
Processing olist_sellers_dataset.csv -> Table: RAW.SELLERS...
Confirmation: RAW.SELLERS loaded successfully.
Processing product_category_name_translation.csv -> Table: 