In [89]:
import requests
import pandas as pd
import os
from dotenv import load_dotenv
from datetime import datetime
from alpha_vantage.timeseries import TimeSeries as TS

In [7]:
load_dotenv()
API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
BASE_URL = "https://www.alphavantage.co/query"
print(API_KEY)

1MYLPNITIDL3DGOT


In [8]:
TICKERS = ["AAPL", "MSFT", "NVDA", "TSLA", "AMZN"]
ts= TS(key=API_KEY, output_format='pandas')

In [9]:
def fetch_and_save_stock_data(symbol):
    try:
        data, meta_data = ts.get_daily(symbol=symbol, outputsize='full')
        data['ticker'] = symbol
        data.index = pd.to_datetime(data.index)
        data.sort_index(ascending=True, inplace=True)

        timestamp = datetime.now().strftime("%Y-%m-%d")
        
        ingest_filepath = f"/home/Stock_pipeline/data/raw/{timestamp}/"
        os.makedirs(ingest_filepath, exist_ok=True)
        
        filename = f"{ingest_filepath}{symbol}_daily_{timestamp}.csv"

        data.to_csv(filename)
        print(f" {symbol} data saved to {filename}.")
    except Exception as e:
        print(f" Error fetching {symbol}: {e}")

def main():
    for ticker in TICKERS:
        fetch_and_save_stock_data(ticker)

if __name__ == "__main__":
    main()

 AAPL data saved to /home/Stock_pipeline/data/raw/2025-06-04/AAPL_daily_2025-06-04.csv.
 MSFT data saved to /home/Stock_pipeline/data/raw/2025-06-04/MSFT_daily_2025-06-04.csv.
 NVDA data saved to /home/Stock_pipeline/data/raw/2025-06-04/NVDA_daily_2025-06-04.csv.
 TSLA data saved to /home/Stock_pipeline/data/raw/2025-06-04/TSLA_daily_2025-06-04.csv.
 AMZN data saved to /home/Stock_pipeline/data/raw/2025-06-04/AMZN_daily_2025-06-04.csv.


In [10]:
timestamp = datetime.now().strftime("%Y-%m-%d")
ingest_filepath = f"/home/Stock_pipeline/data/raw/{timestamp}/"

processed_filepath = f"/home/Stock_pipeline/data/processed/{timestamp}/"
os.makedirs(processed_filepath, exist_ok=True)

In [83]:
def process_stock_file(filepath):
    df = pd.read_csv(filepath, parse_dates=["date"])
    
    

    # Keep relevant columns only
    df.rename(columns={"4. close": "close", "5. volume": "volume"}, inplace=True)
    df = df[["date", "close", "volume", "ticker"]].copy()
    # df = df["date"] = pd.to_datetime(df_all["date"]).dt.date

    df["date"] = pd.to_datetime(df["date"])
    # Daily % change
    df["percentage_change"] = df["close"].pct_change() * 100

    # Moving Averages
    df["movingAverage_7"] = df["close"].rolling(window=7).mean()
    df["movingAverage_30"] = df["close"].rolling(window=30).mean()

    # Volatility (rolling std deviation)
    df["volatility"] = df["close"].rolling(window=30).std()
    
    df.dropna(inplace=True)  # Drop rows with NaN values after calculations
    df.drop_duplicates()  # Drop duplicate rows if any

    return df

def main():
    combined_df = []

    for filename in os.listdir(ingest_filepath):
        if filename.endswith(".csv"):
            filepath = os.path.join(ingest_filepath, filename)
            df_processed = process_stock_file(filepath)

            # Save back to processed CSV
            out_file = os.path.join(processed_filepath, filename.replace("_daily", "_processed"))
            df_processed.to_csv(out_file, index=False)
            print(f"Processed and saved: {out_file}")

            combined_df.append(df_processed)

    # Concatenate all processed files into one DataFrame
    final_df = pd.concat(combined_df, ignore_index=True)
    return final_df

def processed_df():
    return df_all
if __name__ == "__main__":
    main()
    df_all = main()
    

Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/AAPL_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/MSFT_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/AMZN_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/TSLA_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/NVDA_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/AAPL_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/MSFT_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/AMZN_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/TSLA_processed_2025-06-04.csv
Processed and saved: /home/Stock_pipeline/data/processed/2025-06-04/NVDA_processed_2025-06-04.csv


In [84]:
import os
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, String, Float, Integer, Date
from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION, BIGINT
from sqlalchemy.exc import SQLAlchemyError
from process import main
from dotenv import load_dotenv

# Connect to PostgreSQL via SQLAlchemy
load_dotenv()
user = os.getenv("PGUSER")
password = os.getenv("PGPASSWORD")
host = os.getenv("PGHOST", "localhost")
port = os.getenv("PGPORT", "5432")
database = os.getenv("PGDATABASE")

pg_conn = f"postgresql://{user}:{password}@{host}:{port}/{database}"
engine = create_engine(pg_conn, echo=True)


metadata = MetaData(schema="public")

# Define your table schema explicitly
stock_prices = Table(
    'stock_prices', metadata,
    Column('date', Date, primary_key=True),
    Column('close', DOUBLE_PRECISION),
    Column('volume', BIGINT),
    Column('ticker', String, primary_key=True),
    Column('percentage_change', DOUBLE_PRECISION),
    Column('movingAverage_7', DOUBLE_PRECISION),
    Column('movingAverage_30', DOUBLE_PRECISION),
    Column('volatility', DOUBLE_PRECISION),
)

# Create the table in DB (if not exists)
stock_prices.drop(engine, checkfirst=True)  # Drops the table if it exists
metadata.create_all(engine)  # Recreates it with new schema


# Example function to load df into DB
def load_data(df, table, conn):
    try:
        # Insert data row by row (you can optimize with bulk_insert_mappings or df.to_sql)
        conn.execute(table.delete())  # Optional: clear existing data
        conn.execute(table.insert(), df.to_dict(orient='records'))
        print("Data loaded successfully!")
    except SQLAlchemyError as e:
        print(f"Error loading data: {e}")

# Load and process your CSV file (adjust as needed)
def main3():
    df_all = processed_df()
    df_all["date"] = pd.to_datetime(df_all["date"]).dt.date


    # Make sure df columns match your table columns exactly!
    # # Example: reorder and rename columns if needed
    # df = df[['date', 'close','volume','ticker', 'percentage_change', 'movingAverage_7', 'movingAverage_30', 'volatility']]
    
    with engine.begin() as conn:  # transaction scope
        load_data(df_all, stock_prices, conn)

if __name__ == "__main__":
    main3()


2025-06-04 08:16:09,838 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-06-04 08:16:09,848 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-06-04 08:16:09,855 INFO sqlalchemy.engine.Engine select current_schema()
2025-06-04 08:16:09,863 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-06-04 08:16:09,871 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-06-04 08:16:09,880 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-06-04 08:16:09,887 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-06-04 08:16:09,900 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and relname=%(name)s
2025-06-04 08:16:09,904 INFO sqlalchemy.engine.Engine [generated in 0.00422s] {'schema': 'public', 'name': 'stock_prices'}
2025-06-04 08:16:09,914 INFO sqlalchemy.engine.Engine 
DROP TABLE public.stock_prices
2025-06-04 08:16:09,918 INFO sqlalchemy.engine.Engine [no key 0.00355s] {}
2025-06-04 08:16:09,

2025-06-04 08:16:10,043 INFO sqlalchemy.engine.Engine COMMIT
2025-06-04 08:16:10,160 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-06-04 08:16:10,165 INFO sqlalchemy.engine.Engine DELETE FROM public.stock_prices
2025-06-04 08:16:10,175 INFO sqlalchemy.engine.Engine [generated in 0.01068s] {}
2025-06-04 08:16:10,971 INFO sqlalchemy.engine.Engine INSERT INTO public.stock_prices (date, close, volume, ticker, percentage_change, "movingAverage_7", "movingAverage_30", volatility) VALUES (%(date)s, %(close)s, %(volume)s, %(ticker)s, %(percentage_change)s, %(movingAverage_7)s, %(movingAverage_30)s, %(volatility)s)
2025-06-04 08:16:10,973 INFO sqlalchemy.engine.Engine [generated in 0.34309s] ({'date': datetime.date(1999, 12, 13), 'close': 99.0, 'volume': 4731800.0, 'ticker': 'AAPL', 'percentage_change': -3.8834951456310662, 'movingAverage_7': 109.44571428571427, 'movingAverage_30': 95.65066666666667, 'volatility': 10.403151025740602}, {'date': datetime.date(1999, 12, 14), 'close': 94.87, 

In [86]:
duplicates = df_all[df_all.duplicated(subset=['date', 'close'], keep=False)].count()
print("Duplicates found:\n", duplicates)


Duplicates found:
 date                 8
close                8
volume               8
ticker               8
percentage_change    8
movingAverage_7      8
movingAverage_30     8
volatility           8
dtype: int64


In [1]:
from process import processed_df

df = processed_df()
print(df.head())


        date   close     volume ticker  percentage_change  movingAverage_7  \
0 1999-12-13   99.00  4731800.0   AAPL          -3.883495       109.445714   
1 1999-12-14   94.87  3891700.0   AAPL          -4.171717       106.570000   
2 1999-12-15   97.00  5562300.0   AAPL           2.245178       103.855714   
3 1999-12-16   98.31  4141300.0   AAPL           1.350515       101.070000   
4 1999-12-17  100.00  4419700.0   AAPL           1.719052        99.632857   

   movingAverage_30  volatility  
0         95.650667   10.403151  
1         96.225667    9.833312  
2         96.784000    9.359031  
3         97.344333    8.904592  
4         97.890333    8.528275  
