In [1]:
from google.colab import userdata

# Read the Snowflake login details from Colab's secret store so that no
# credential is ever written into the notebook itself. The lookups run in
# the same order as before: user id, password, account identifier.
user_id, password, account = (
    userdata.get('snowflake_userid'),
    userdata.get('snowflake_password'),
    userdata.get('snowflake_account'),
)

In [2]:
!pip install snowflake-connector-python

Collecting snowflake-connector-python
  Downloading snowflake_connector_python-3.14.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (67 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/67.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━[0m [32m41.0/67.8 kB[0m [31m1.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.8/67.8 kB[0m [31m943.6 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting tomlkit (from snowflake-connector-python)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Downloading snowflake_connector_python-3.14.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m17.3 M

In [2]:
import snowflake.connector

def return_snowflake_conn(warehouse='compute_wh', database='DEV', schema='raw'):
    """Open a Snowflake connection and return a cursor on it.

    The login details come from the notebook-level ``user_id``, ``password``
    and ``account`` variables, which must be defined before this is called.

    Args:
        warehouse: Warehouse the session should use. Defaults to ``'compute_wh'``.
        database: Database the session should use. Defaults to ``'DEV'``.
        schema: Schema the session should use. Defaults to ``'raw'``.

    Returns:
        The cursor produced by ``conn.cursor()`` on the new connection.

    Note:
        Only the cursor is returned, so the underlying connection stays open
        until the caller closes it (for example through the cursor's
        ``connection`` attribute).
    """
    # Establish a connection to Snowflake
    conn = snowflake.connector.connect(
        user=user_id,
        password=password,
        account=account,  # Example: 'xyz12345.us-east-1'
        warehouse=warehouse,
        database=database,
        schema=schema,
    )
    # Create a cursor object
    return conn.cursor()

In [4]:
pip install apache-airflow apache-airflow-providers-snowflake requests python-dotenv pandas

Collecting apache-airflow
  Downloading apache_airflow-2.10.5-py3-none-any.whl.metadata (45 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/45.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.4/45.4 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting apache-airflow-providers-snowflake
  Downloading apache_airflow_providers_snowflake-6.1.0-py3-none-any.whl.metadata (6.2 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Downloading alembic-1.15.1-py3-none-any.whl.metadata (7.2 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.6.0-py3-none-any.whl.metadata (16 kB)
Collecting asgiref>=2.3.0 (from apache-airflow)
  Downloading asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting colorlog>=6.8.2 (from apache-airflow)
  Downloading colorlog-6.9.0-py3-none-an

In [3]:
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from datetime import datetime, timedelta
import requests
import pandas as pd

# `schedule` replaces the deprecated `schedule_interval` argument (Airflow 2.4+).
@dag(schedule="@daily", start_date=datetime(2023, 1, 1), catchup=False)
def stock_price_etl():
    """Daily ETL: pull AAPL daily prices from Alpha Vantage and insert them
    into ``raw.stock_prices`` in Snowflake."""

    @task()
    def fetch_data():
        """Download the daily time series for the symbol.

        Returns:
            A list of dicts with the keys ``symbol``, ``date``, ``open``,
            ``high``, ``low``, ``close`` and ``volume``. The price and volume
            values are passed through exactly as the API returned them.

        Raises:
            requests.HTTPError: if the API answers with an HTTP error status.
            ValueError: if the response has no "Time Series (Daily)" section.
        """
        api_key = Variable.get("ALPHA_VANTAGE_API_KEY")
        symbol = "AAPL"
        url = "https://www.alphavantage.co/query"
        params = {
            "function": "TIME_SERIES_DAILY",
            "symbol": symbol,
            "apikey": api_key
        }
        # A timeout keeps a stalled connection from hanging the task forever.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()

        series = data.get("Time Series (Daily)")
        if not series:
            # The API can answer successfully without the series (e.g. bad key
            # or rate limit); surface whatever explanation the payload carries
            # instead of failing with a bare KeyError.
            detail = (
                data.get("Error Message")
                or data.get("Note")
                or data.get("Information")
                or "unexpected response shape"
            )
            raise ValueError(
                f"Alpha Vantage returned no daily time series for {symbol}: {detail}"
            )

        prices = []
        for date, values in series.items():
            prices.append({
                "symbol": symbol,
                "date": date,
                "open": values["1. open"],
                "high": values["2. high"],
                "low": values["3. low"],
                "close": values["4. close"],
                "volume": values["5. volume"]
            })
        return prices

    @task()
    def load_data(data):
        """Insert the fetched price records into ``raw.stock_prices``.

        Args:
            data: list of dicts as produced by ``fetch_data``.
        """
        if not data:
            # Nothing to insert; avoid opening a connection for an empty batch.
            return

        # Order must match the column list in the INSERT statement below,
        # which differs from the key order of the fetched dicts
        # (close comes before high/low here).
        columns = ("symbol", "date", "open", "close", "high", "low", "volume")
        rows = [tuple(record[column] for column in columns) for record in data]

        hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
        sql = "INSERT INTO raw.stock_prices (symbol, date, open, close, high, low, volume) VALUES (%s, %s, %s, %s, %s, %s, %s)"

        # One parameter tuple per row: executemany binds each tuple to the
        # seven placeholders, whereas passing the whole list of dicts as the
        # parameters of a single statement cannot bind.
        conn = hook.get_conn()
        try:
            cursor = conn.cursor()
            try:
                cursor.executemany(sql, rows)
            finally:
                cursor.close()
            # Harmless when autocommit is on; required when it is off.
            conn.commit()
        finally:
            conn.close()

    data = fetch_data()
    load_data(data)

etl_dag = stock_price_etl()