In [7]:
# ─────────────────────────────────────────────
# 📦 STEP 1: Install required libraries
# ─────────────────────────────────────────────
!pip install yfinance pymongo requests schedule

# ─────────────────────────────────────────────
# 📁 STEP 2: Mount Google Drive to access CSV
# ─────────────────────────────────────────────
from google.colab import drive
drive.mount('/content/drive')

# ─────────────────────────────────────────────
# 📚 STEP 3: Import libraries
# ─────────────────────────────────────────────
import pandas as pd
import yfinance as yf
import requests
from pymongo import MongoClient
from datetime import datetime
import schedule
import time

# ─────────────────────────────────────────────
# 🔑 STEP 4: Your API keys
# ─────────────────────────────────────────────
NEWS_API_KEY = '23b1888348ed43ef8886a98edb9b29bf'
FINNHUB_API_KEY = 'cvofa09r01qppf5coiagcvofa09r01qppf5coib0'

# ─────────────────────────────────────────────
# 📄 STEP 5: Load your CSV file from Google Drive
# ─────────────────────────────────────────────
csv_path = '/content/drive/MyDrive/Colab Notebooks/AbdulSami(DS-055)/historical_data_large.csv'
df_csv = pd.read_csv(csv_path)

# ─────────────────────────────────────────────
# 🔍 STEP 6: Data Enrichment Functions
# ─────────────────────────────────────────────

def get_yfinance_data(symbol):
    try:
        stock = yf.Ticker(symbol)
        hist = stock.history(period="1d")
        latest = hist.tail(1)
        if not latest.empty:
            return {
                'latest_price': latest['Close'].values[0],
                'volume': int(latest['Volume'].values[0])
            }
    except:
        return {}
    return {}

def get_stock_news(symbol):
    try:
        url = f"https://newsapi.org/v2/everything?q={symbol}&apiKey={NEWS_API_KEY}"
        res = requests.get(url)
        articles = res.json().get("articles", [])
        return articles[:3] if articles else []
    except:
        return []

def get_sentiment(symbol):
    try:
        url = f"https://finnhub.io/api/v1/news-sentiment?symbol={symbol}&token={FINNHUB_API_KEY}"
        res = requests.get(url)
        if res.status_code == 200:
            data = res.json()
            return {
                'sentiment_score': data.get('sentiment_score', 0),
                'positive': data.get('positive'),
                'negative': data.get('negative'),
                'neutral': data.get('neutral'),
            }
    except:
        return {}
    return {}

def get_usd_to_inr():
    try:
        url = "https://api.exchangerate.host/latest?base=USD&symbols=INR"
        res = requests.get(url)
        if res.status_code == 200:
            return res.json()['rates']['INR']
    except:
        return None

# ─────────────────────────────────────────────
# 🛠️ STEP 7: ETL Job Function (limit to 100 rows)
# ─────────────────────────────────────────────
def etl_job():
    print("🛠️ ETL Job started...")
    df_sample = df_csv.head(100)  # Limit to 100 rows for testing

    enriched_data = []
    usd_to_inr = get_usd_to_inr()

    for index, row in df_sample.iterrows():
        symbol = row['Symbol']
        live = get_yfinance_data(symbol)
        news = get_stock_news(symbol)
        sentiment = get_sentiment(symbol)

        impact_score = 0
        if live.get('latest_price') and row.get('Open'):
            try:
                impact_score = abs(live['latest_price'] - row['Open']) / row['Open']
            except ZeroDivisionError:
                impact_score = 0

        enriched_data.append({
            'symbol': symbol,
            'date': row.get('Date', str(datetime.utcnow())),
            'open': row.get('Open'),
            'close': row.get('Close'),
            'volume_csv': row.get('Volume'),
            'latest_price': live.get('latest_price'),
            'live_volume': live.get('volume'),
            'usd_to_inr': usd_to_inr,
            'sentiment': sentiment,
            'news': news,
            'impact_score': impact_score,
            'timestamp': datetime.utcnow().isoformat()
        })

    df_final = pd.DataFrame(enriched_data)

    # Load to MongoDB
    MONGO_URI = "mongodb+srv://chanarpg4201474:!Ek4MEA#8vpHd45@cluster0.nrqxs.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
    client = MongoClient(MONGO_URI)
    db = client['stock_market_etl']
    collection = db['enriched_stocks']

    collection.insert_many(df_final.to_dict("records"))
    print("✅ Data loaded to MongoDB successfully!")

# ─────────────────────────────────────────────
# 🧪 STEP 8: Run ETL Immediately (for testing)
# ─────────────────────────────────────────────
etl_job()

# ─────────────────────────────────────────────
# ⏱️ STEP 9: Optional Scheduler (3-min loop for demo)
# ─────────────────────────────────────────────
schedule.every().day.at("02:35").do(etl_job)

start = time.time()
while time.time() - start < 180:
    schedule.run_pending()
    time.sleep(10)

print("⏹️ Schedule testing finished.")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
🛠️ ETL Job started...
✅ Data loaded to MongoDB successfully!
⏹️ Schedule testing finished.
