In [10]:
import os
import time
import requests
import pandas as pd
from dotenv import load_dotenv
from log import log_message
import logging

### EXTRACT

In [7]:
# Load API key from .env file
load_dotenv()

API_KEY = os.getenv("API_KEY")

# API URL
API_URL = "https://www.alphavantage.co/query"

# List of symbols to process
SYMBOLS = {
    "AAPL": "Apple Inc.",
    "MSFT": "Microsoft Corporation",
    "GOOG": "Alphabet Inc.",
    "AMZN": "Amazon.com Inc.",
    "NFLX": "Netflix Inc."
}

def fetch_stock_data(symbol):
    """
    Fetch daily stock data for a specific symbol from Alpha Vantage.

    Args:
        symbol (str): The stock symbol.

    Returns:
        pd.DataFrame: DataFrame with stock data, or None on failure.
    """
    log_message("info", f"Fetching data for symbol: {symbol}")

    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "apikey": API_KEY,
        "datatype": "json",
        "outputsize": "compact"
    }

    try:
        response = requests.get(API_URL, params=params)
        response.raise_for_status()
        data = response.json()

        # Check for valid data format
        if "Time Series (Daily)" not in data:
            log_message("error", f"Invalid data format for {symbol}: {data}")
            return None

        # Parse into DataFrame
        time_series = data["Time Series (Daily)"]
        df = pd.DataFrame.from_dict(time_series, orient="index").reset_index()
        
        df.rename(columns={
            "index": "Date",
            "1. open": "OpenPrice",
            "2. high": "High",
            "3. low": "Low",
            "4. close": "ClosePrice",
            "5. volume": "Volume"
        }, inplace=True)

        df["Symbol"] = symbol
        return df

    except Exception as e:
        log_message("error", f"Error fetching data for {symbol}: {e}")
        return None

def parse_stock_data(symbol, stock_data):
    """
    Parse the JSON data into a structured DataFrame.

    Args:
        symbol (str): The stock symbol.
        stock_data (dict): Raw JSON data.

    Returns:
        pd.DataFrame: Parsed DataFrame.
    """
    try:
        company_name = SYMBOLS[symbol]
        records = []
        for date, metrics in stock_data.items():
            records.append({
                "Symbol": symbol,
                "CompanyName": company_name,
                "Date": date,
                "OpenPrice": float(metrics.get("1. open", 0)),
                "High": float(metrics.get("2. high", 0)),
                "Low": float(metrics.get("3. low", 0)),
                "ClosePrice": float(metrics.get("4. close", 0)),
                "Volume": int(metrics.get("6. volume", 0))
            })

        df = pd.DataFrame(records)
        return df

    except Exception as e:
        log_message("error", f"Error parsing data for {symbol}: {e}")
        return pd.DataFrame()

def main():
    """
    Main function to fetch stock data for multiple symbols and save results.
    """
    log_message("info", "Starting data extraction process")
    
    all_data = []  # To store data for all symbols
    total_rows = 0  # Counter for total rows processed

    for symbol in SYMBOLS.keys():
        df = fetch_stock_data(symbol)

        if df is not None:
            row_count = len(df)
            log_message("info", f"Extracted {row_count} rows for {symbol}")
            total_rows += row_count
            all_data.append(df)

        log_message("info", "Waiting 12 seconds to respect API rate limits")
        time.sleep(12)

    if all_data:
        combined_df = pd.concat(all_data, ignore_index=True)
        combined_df.drop_duplicates(subset=["Symbol", "Date"], inplace=True)
        log_message("info", f"Total rows extracted: {total_rows}")

        combined_df.to_csv("stock_data.csv", index=False)
        log_message("info", "Saved data to stock_data.csv")
    else:
        log_message("error", "No data was fetched for any symbols")

### Transform

In [None]:
import os
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from log import log_message

# Load environment variables
load_dotenv()

# Directory containing the dataset
DATASET_DIR = "dataset"
INPUT_FILE_NAME = "extracted_stock_data.csv"  # Replace with the name of your raw dataset file
OUTPUT_FILE_NAME = "transformed_data.csv"

def load_existing_data(file_name):
    """
    Load existing stock data from the dataset directory.

    Args:
        file_name (str): The name of the file to load.

    Returns:
        pd.DataFrame: DataFrame with loaded data, or None if the file doesn't exist.
    """
    file_path = os.path.join(DATASET_DIR, file_name)

    if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
        log_message("error", f"File {file_path} is missing or empty.")
        return None

    try:
        df = pd.read_csv(file_path)
        log_message("info", f"Loaded data from {file_path}")
        return df
    except Exception as e:
        log_message("error", f"Error loading data from {file_path}: {e}")
        return None

def transform_data(df):
    """
    Transform the stock data to a standardized format.

    Args:
        df (pd.DataFrame): DataFrame with raw stock data.

    Returns:
        pd.DataFrame: Transformed DataFrame.
    """
    try:
        # Ensure required columns are present
        expected_columns = ["Date", "OpenPrice", "High", "Low", "ClosePrice", "Volume", "Symbol"]
        if not all(col in df.columns for col in expected_columns):
            log_message("error", f"Dataset missing required columns. Found: {df.columns}")
            return pd.DataFrame()

        # Standardize column data types
        df["Date"] = pd.to_datetime(df["Date"]).dt.strftime("%Y-%m-%d")  # Standardize date format
        df["OpenPrice"] = pd.to_numeric(df["OpenPrice"], errors="coerce")
        df["High"] = pd.to_numeric(df["High"], errors="coerce")
        df["Low"] = pd.to_numeric(df["Low"], errors="coerce")
        df["ClosePrice"] = pd.to_numeric(df["ClosePrice"], errors="coerce")
        df["Volume"] = pd.to_numeric(df["Volume"], errors="coerce", downcast="integer")
        df["Symbol"] = df["Symbol"].str.upper()  # Ensure symbols are uppercase

        # Sort by date for consistency
        df.sort_values(by=["Date"], ascending=True, inplace=True)

        # Calculate DailyReturn
        df["DailyReturn"] = df.groupby("Symbol")["ClosePrice"].pct_change()

        log_message("info", "Data transformed to standard format, including DailyReturn")
        return df
    except Exception as e:
        log_message("error", f"Error transforming data: {e}")
        return pd.DataFrame()

def save_to_directory(df, file_name):
    """
    Save a DataFrame to a file in the dataset directory.

    Args:
        df (pd.DataFrame): The DataFrame to save.
        file_name (str): The name of the file to save.
    """
    try:
        # Ensure the directory exists
        os.makedirs(DATASET_DIR, exist_ok=True)

        # Save file to the dataset directory
        file_path = os.path.join(DATASET_DIR, file_name)
        df.to_csv(file_path, index=False)

        log_message("info", f"File saved to {file_path}")
        print(f"File saved to {file_path}")
    except Exception as e:
        log_message("error", f"Error saving file: {e}")

def main():
    """
    Main function to load, transform, and save stock data.
    """
    log_message("info", "Starting data transformation process")
    
    # Load existing data
    raw_data = load_existing_data(INPUT_FILE_NAME)
    
    if raw_data is not None:
        # Transform data to standard format
        transformed_data = transform_data(raw_data)

        if not transformed_data.empty:
            # Save the transformed data
            save_to_directory(transformed_data, OUTPUT_FILE_NAME)
        else:
            log_message("error", "Transformed data is empty. No output file created.")
    else:
        log_message("error", "No raw data available for transformation.")

if __name__ == "__main__":
    main()


In [9]:
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
import os

# Function to load CSV data into a PostgreSQL database
def load_csv_to_postgres(csv_file_path, table_name, engine, schema):
    """
    Load data from a CSV file into a PostgreSQL table.

    Args:
        csv_file_path (str): Path to the CSV file.
        table_name (str): Name of the PostgreSQL table.
        engine (sqlalchemy.engine.base.Engine): SQLAlchemy engine object.
        schema (str): Schema in the database.
    """
    try:
        # Read CSV into a Pandas DataFrame
        df = pd.read_csv(csv_file_path)
        # Load DataFrame into PostgreSQL table
        df.to_sql(table_name, con=engine, schema=schema, if_exists='replace', index=False)
        print(f"{len(df)} rows loaded to {schema}.{table_name} successfully.")
    except Exception as e:
        print(f"Error loading data to PostgreSQL: {e}")

def main():
    try:
        # Load environment variables
        load_dotenv()

        # Database credentials from environment variables
        user = os.getenv("P_user")
        password = os.getenv("P_password")
        host = os.getenv("P_host")
        port = os.getenv("P_port")
        dbname = os.getenv("P_database")

        # Ensure environment variables are loaded
        if not all([user, password, host, port, dbname]):
            raise ValueError("One or more environment variables are missing.")

        # Database URI and SQLAlchemy engine
        DB_URI = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
        engine = create_engine(DB_URI)

        # File paths
        csv_file_paths = [r'dataset/cleaned_data/transformed_data.csv']
        
        # Schema and table details
        STG_SCHEMA = "stg"
        table_name = "fact_daily_stock"

        # Load each CSV into PostgreSQL
        for file_path in csv_file_paths:
            load_csv_to_postgres(file_path, table_name, engine, STG_SCHEMA)

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    main()


Error loading data to PostgreSQL: [Errno 2] No such file or directory: 'dataset/cleaned_data/transformed_data.csv'
