In [None]:
import os
import sys
from datetime import datetime

import pandas as pd
import psycopg2
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from psycopg2 import sql

In [None]:

# Define your DAG ID and execution schedule (customize as needed)
dag_id = 'alpha_vantage_to_postgresql'
schedule_interval = None  # Execute manually or specify a cron string

# Initialize the DAG
default_args = {
    'owner': 'yourname',
    'start_date': datetime(2023, 10, 26),
    'depends_on_past': False,
    'retries': 1,
}
# catchup=False: if a cron schedule is configured later, do not create a run
# for every interval since start_date (each run spends Alpha Vantage quota).
dag = DAG(
    dag_id,
    schedule_interval=schedule_interval,
    default_args=default_args,
    catchup=False,
)

# Alpha Vantage API key. Set the ALPHA_VANTAGE_API_KEY environment variable
# rather than committing the key with the notebook; the placeholder below is
# only a fallback so the cell still runs without it.
api_key = os.environ.get('ALPHA_VANTAGE_API_KEY', '.....')



In [None]:
# Define a function to download data from Alpha Vantage
def download_alpha_vantage_data(symbol_list=None, timeout=30):
    """Fetch daily price bars for each symbol from Alpha Vantage.

    Calls the ``TIME_SERIES_DAILY`` endpoint once per symbol and stacks the
    per-symbol frames into a single DataFrame sorted by date (ascending).
    Symbols that fail or return no data are reported and skipped.

    Args:
        symbol_list: Ticker symbols to download. Defaults to
            ``["AAPL", "MSFT", "GOOGL"]`` when ``None``.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A DataFrame holding the raw value columns from the JSON payload
        (e.g. ``"1. open"`` ... ``"4. close"``, kept as strings), a
        ``symbol`` column, and a ``date`` column of datetimes. Returns
        ``None`` when no symbol produced any data.
    """
    if symbol_list is None:
        symbol_list = ["AAPL", "MSFT", "GOOGL"]  # Example: a list of stock symbols
    dataframes = []

    for symbol in symbol_list:
        # Let requests URL-encode the query string. `interval` is an
        # intraday-only parameter, so it is not sent to TIME_SERIES_DAILY.
        params = {
            "function": "TIME_SERIES_DAILY",
            "symbol": symbol,
            "apikey": api_key,
        }
        response = requests.get(
            "https://www.alphavantage.co/query", params=params, timeout=timeout
        )

        if response.status_code != 200:
            print(f"API Request for {symbol} Failed")
            continue

        data = response.json()
        time_series_data = data.get('Time Series (Daily)')
        if not time_series_data:
            # Alpha Vantage can answer HTTP 200 with an explanatory field
            # (invalid symbol, rate limit, premium endpoint) instead of data;
            # surface it so the cause is visible in the task log.
            reason = (
                data.get('Error Message') or data.get('Note') or data.get('Information')
            )
            suffix = f": {reason}" if reason else ""
            print(f"No time series data for {symbol}{suffix}")
            continue

        df = pd.DataFrame.from_dict(time_series_data, orient='index')
        df['symbol'] = symbol  # Add a column for the symbol
        df['date'] = df.index  # Create a 'date' column from the date keys
        dataframes.append(df)

    if not dataframes:
        return None

    combined_df = pd.concat(dataframes)
    combined_df['date'] = pd.to_datetime(combined_df['date'])  # Convert 'date' to datetime format
    sorted_df = combined_df.sort_values(by='date', ascending=True)  # Sort by date
    return sorted_df


In [None]:
# Define a function to insert data into PostgreSQL
def _stock_rows(symbol_df):
    """Build the INSERT parameter tuples for one symbol's rows.

    Accepts both Alpha Vantage payload layouts: TIME_SERIES_DAILY
    ("5. volume", no adjusted close) and TIME_SERIES_DAILY_ADJUSTED
    ("5. adjusted close", "6. volume"). Adjusted close is NULL when absent.
    """
    has_adjusted = '5. adjusted close' in symbol_df.columns
    volume_column = '6. volume' if has_adjusted else '5. volume'
    return [
        (
            row['date'].date(),
            row['1. open'],
            row['2. high'],
            row['3. low'],
            row['4. close'],
            row['5. adjusted close'] if has_adjusted else None,
            row[volume_column],
        )
        for _, row in symbol_df.iterrows()
    ]


def insert_to_postgresql(**kwargs):
    """Upsert the downloaded daily bars into one PostgreSQL table per symbol.

    Pulls the DataFrame returned by ``download_task`` from XCom. For each
    symbol, it creates ``stock_data_<symbol>`` (lower-cased) when missing and
    upserts every row keyed on ``date``.

    Keyword Args:
        ti: The task instance from the Airflow context. Any object exposing
            ``xcom_pull(task_ids=...)`` works.

    Returns:
        None. Returns early, without connecting, when there is no data.
    """
    ti = kwargs['ti']
    sorted_df = ti.xcom_pull(task_ids='download_task')
    if sorted_df is None or sorted_df.empty:
        # The download task returns None when no symbol produced data.
        print("No data to insert")
        return

    create_table_query = sql.SQL("""
        CREATE TABLE IF NOT EXISTS {table} (
            date DATE PRIMARY KEY,
            open NUMERIC,
            high NUMERIC,
            low NUMERIC,
            close NUMERIC,
            adjusted_close NUMERIC,
            volume BIGINT
        )
    """)
    # Re-running the DAG downloads overlapping dates, so a plain INSERT would
    # violate the `date` primary key; upsert to refresh existing rows instead.
    upsert_query = sql.SQL("""
        INSERT INTO {table} (date, open, high, low, close, adjusted_close, volume)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (date) DO UPDATE SET
            open = EXCLUDED.open,
            high = EXCLUDED.high,
            low = EXCLUDED.low,
            close = EXCLUDED.close,
            adjusted_close = EXCLUDED.adjusted_close,
            volume = EXCLUDED.volume
    """)

    # Environment variables override the placeholder connection settings.
    conn = psycopg2.connect(
        host=os.environ.get('POSTGRES_HOST', 'your_postgresql_host'),
        database=os.environ.get('POSTGRES_DB', 'your_database_name'),
        user=os.environ.get('POSTGRES_USER', 'your_username'),
        password=os.environ.get('POSTGRES_PASSWORD', 'your_password'),
    )
    try:
        with conn.cursor() as cursor:
            # Take the symbols from the data itself: the symbol list lives
            # inside the download function and is not visible here.
            for symbol in sorted_df['symbol'].unique():
                # Quote the name safely. Lower-case it because PostgreSQL folds
                # unquoted identifiers, so this matches tables created by
                # unquoted DDL (e.g. stock_data_aapl).
                table = sql.Identifier(f"stock_data_{symbol}".lower())
                cursor.execute(create_table_query.format(table=table))

                symbol_df = sorted_df[sorted_df['symbol'] == symbol]
                cursor.executemany(
                    upsert_query.format(table=table), _stock_rows(symbol_df)
                )
                # One transaction per symbol rather than one per row.
                conn.commit()
    finally:
        # Uncommitted work is discarded on close if an error occurred.
        conn.close()

In [None]:
# Create separate tasks for downloading and inserting data.
# In Airflow 2 the task context (including `ti`) is passed automatically to a
# callable that accepts **kwargs, so `provide_context` is not needed (passing
# it only raises a DeprecationWarning).
download_task = PythonOperator(
    task_id='download_task',
    python_callable=download_alpha_vantage_data,
    dag=dag,
)

insert_task = PythonOperator(
    task_id='insert_task',
    python_callable=insert_to_postgresql,
    dag=dag,
)

In [None]:
# Execute the pipeline manually, in-process, without a scheduler.
# `insert_task.execute(context={})` cannot work here. The empty context has no
# `ti`, so `kwargs['ti']` raises KeyError. In addition, `download_task.execute`
# does not push its return value to XCom. Instead, call the task callables
# directly and hand the insert step a minimal stand-in for the task instance.
class _LocalTaskInstance:
    """Minimal TaskInstance stand-in that serves a single XCom value."""

    def __init__(self, value):
        self._value = value

    def xcom_pull(self, task_ids=None, **_):
        return self._value


downloaded_df = download_alpha_vantage_data()
insert_to_postgresql(ti=_LocalTaskInstance(downloaded_df))

In [None]:
### Execute tasks with airflow


# Set task dependencies
download_task >> insert_task

# Expose the DAG's CLI when this file is run as a script. Inside a Jupyter
# kernel `__name__` is also "__main__", and dag.cli() would parse the kernel's
# own argv (e.g. `-f kernel.json`) and exit with SystemExit, so skip it there.
if __name__ == "__main__" and "ipykernel" not in sys.modules:
    dag.cli()