## Snowflake Data Load Notebook
This notebook creates the required Snowflake objects, stages a CSV from GitHub, and loads it into a table – all using **Snowpark for Python**.
**Prerequisites**
1. The `snowflake-snowpark-python` and `requests` packages are installed (see the next code cell).
2. Environment variables with your connection info are set in the kernel/session:
   * `SNOWFLAKE_ACCOUNT` –  Go to Account Details in snowflake . It should  look like bewlo 
   * `SNOWFLAKE_USER` 
   * `SNOWFLAKE_PASSWORD`

[connections.my_example_connection]
- account = "XXXX-XXXXX"
- user = "HUSEYN"
- role = "ACCOUNTADMIN"


In [289]:
import pathlib
import os, pathlib, requests
from snowflake.snowpark import Session


In [290]:

print("Working dir:", os.getcwd())
print("Files here:", os.listdir())


Working dir: c:\Users\ping\Documents\Bootcamps\Data-Analytics-Engineer-Bootcamp\dataflow\notebooks
Files here: ['netflix_titles.csv', 'snowpark_bootstrap.ipynb', 'snowpark_bootstrap_backup.ipynb']


In [291]:

from dotenv import load_dotenv
load_dotenv()

connection_parameters = {
    "account":   os.environ["SNOWFLAKE_ACCOUNT"],
    "user":      os.environ["SNOWFLAKE_USER"],
    "password":  os.environ["SNOWFLAKE_PASSWORD"],
    "role":      "ACCOUNTADMIN",  
    "warehouse": "COMPUTE_WH",        
}

session = Session.builder.configs(connection_parameters).create()
session.sql("SELECT CURRENT_VERSION() AS VERSION").show()

-------------
|"VERSION"  |
-------------
|9.12.1     |
-------------



In [292]:

session.sql("USE ROLE ACCOUNTADMIN").collect()
session.sql("DROP ROLE IF EXISTS DBT_ROLE").collect()
session.sql("CREATE ROLE IF NOT EXISTS DBT_ROLE").collect()






[Row(status='Role DBT_ROLE successfully created.')]

In [293]:

session.sql('''CREATE USER IF NOT EXISTS DBT_USER
  PASSWORD            = 'StrongPassword12345' 
  DEFAULT_ROLE        = DBT_ROLE
  DEFAULT_WAREHOUSE   = DBT_WH
  MUST_CHANGE_PASSWORD = FALSE;''').collect()

[Row(status='DBT_USER already exists, statement succeeded.')]

In [294]:
session.sql(''' GRANT ROLE DBT_ROLE TO ROLE DBT_USER;''')

<snowflake.snowpark.dataframe.DataFrame at 0x1de65958110>

In [295]:
session.sql(''' GRANT ROLE DBT_ROLE TO ROLE SYSADMIN;''')

session.sql('''USE ROLE DBT_ROLE;''')

<snowflake.snowpark.dataframe.DataFrame at 0x1de65916810>

In [296]:
session.sql('''
CREATE WAREHOUSE IF NOT EXISTS DBT_WH
  WITH WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;
''').collect()



[Row(status='DBT_WH already exists, statement succeeded.')]

In [297]:
session.use_warehouse("DBT_WH")

In [298]:

def run_many(sql: str):
    for stmt in [s.strip() for s in sql.split(";") if s.strip()]:
        session.sql(stmt).collect()

In [299]:

run_many('''
CREATE OR REPLACE WAREHOUSE DBT_WH WAREHOUSE_SIZE = "XSMALL";

CREATE  OR REPLACE DATABASE  DBT_DB;
CREATE OR REPLACE SCHEMA    DBT_DB.DBT_SCHEMA;
      

CREATE OR REPLACE DATABASE  DBT_TARGET_DB;
CREATE OR REPLACE SCHEMA   DBT_TARGET_DB.DBT_TARGET_SCHEMA;

''')
print("Bootstrap complete.")

Bootstrap complete.


In [300]:

users = session.sql("SHOW USERS").collect()
print(users)


[Row(name='DBT_USER', created_on=datetime.datetime(2025, 5, 16, 12, 37, 50, 266000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), login_name='DBT_USER', display_name='DBT_USER', first_name=None, last_name=None, email=None, mins_to_unlock=None, days_to_expiry=None, comment=None, disabled='false', must_change_password='false', snowflake_lock='false', default_warehouse='DBT_WH', default_namespace=None, default_role='DBT_ROLE', default_secondary_roles='["ALL"]', ext_authn_duo='false', ext_authn_uid=None, mins_to_bypass_mfa=None, owner='ACCOUNTADMIN', last_success_login=datetime.datetime(2025, 5, 18, 4, 5, 4, 935000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), expires_at_time=None, locked_until_time=None, has_password='true', has_rsa_public_key='false', type=None, has_mfa='false'), Row(name='HUSEYN', created_on=datetime.datetime(2025, 5, 14, 12, 14, 54, 905000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), login_name='HUSEYN',

In [301]:
session.sql("CREATE OR REPLACE STAGE DBT_DB.DBT_SCHEMA.NETFLIX_RAW_STAGE").collect()


[Row(status='Stage area NETFLIX_RAW_STAGE successfully created.')]

In [302]:

csv_url   = "https://raw.githubusercontent.com/HuseynA28/DataFlow-Snowflake-Airflow-dbt-Docker-CICD-/refs/heads/main/data/netflix_titles.csv"
local_csv = pathlib.Path("netflix_titles.csv")
local_csv.write_bytes(requests.get(csv_url, timeout=30).content)


3399671

In [303]:

session.file.put(
    str(local_csv),                               
    "@DBT_DB.DBT_SCHEMA.NETFLIX_RAW_STAGE",       
    overwrite=True,
)

print("File uploaded to stage.")

File uploaded to stage.


In [304]:
session.sql('''USE ROLE DBT_ROLE''')

<snowflake.snowpark.dataframe.DataFrame at 0x1de6590b890>

In [305]:
session.sql(
  '''CREATE OR REPLACE FILE FORMAT DBT_DB.DBT_SCHEMA.CSV_NETFLIX_RAW
  TYPE                       = 'CSV'
  COMPRESSION                = 'AUTO'               
  FIELD_DELIMITER            = ','                  
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'              
  PARSE_HEADER = TRUE; ''').collect()


[Row(status='File format CSV_NETFLIX_RAW successfully created.')]

In [306]:
session.sql('''USE database  DBT_DB ''').collect()
session.sql('''USE schema  DBT_SCHEMA ''').collect()


[Row(status='Statement executed successfully.')]

In [307]:
print(session.sql("SELECT CURRENT_WAREHOUSE()").collect())

[Row(CURRENT_WAREHOUSE()='DBT_WH')]


In [308]:
session.sql(''' 
       
CREATE OR REPLACE TABLE  NETFLIX_DATA
  USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@DBT_DB.DBT_SCHEMA.NETFLIX_RAW_STAGE',
          FILE_FORMAT=>'CSV_NETFLIX_RAW'
        )
      ))''').collect()




[Row(status='Table NETFLIX_DATA successfully created.')]

In [309]:
session.sql(''' 


CREATE OR REPLACE FILE FORMAT DBT_DB.DBT_SCHEMA.csv_ff
  TYPE  = CSV
  FIELD_DELIMITER = ','
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  SKIP_HEADER = 1; 


   ''' ).collect()

[Row(status='File format CSV_FF successfully created.')]

In [310]:
session.sql(
    ''' 
  COPY INTO NETFLIX_DATA FROM @DBT_DB.DBT_SCHEMA.NETFLIX_RAW_STAGE
  FILE_FORMAT = (
    FORMAT_NAME= 'csv_ff'
  )         
  ''').collect()

[Row(file='netflix_raw_stage/netflix_titles.csv.gz', status='LOADED', rows_parsed=8807, rows_loaded=8807, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]