In [2]:
import os
import json
import yfinance as yf
import requests
import time
from datetime import datetime
from google.cloud import pubsub_v1, bigquery
from tensorflow.keras.models import load_model

In [3]:
# Set up Google Cloud credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"C:\\Stock_pred\\stockanalysis-444512-3fd7af0c6e0a.json"

In [4]:
# Load ML model
model = load_model('C:\\Stock_pred\\saved_model\\lstm_model.keras')

# Initialize BigQuery client
bigquery_client = bigquery.Client(project="stockanalysis-444512")


In [7]:
def prepare_features(stock_data):
    """Prepare features for ML model prediction."""
    feature_columns = ["Open", "High", "Low", "Close"]
    if len(stock_data) < 5:
        raise ValueError("Not enough data to make a prediction. Requires at least 5 time steps.")
    features = stock_data[feature_columns].tail(5).values
    return features.reshape((1, 5, 4))

def predict_trend(features):
    """Predict stock trend using ML model."""
    prediction = model.predict(features)
    return "up" if prediction[0] == 1 else "down"


In [21]:
# Pub/Sub setup
project_id = "stockanalysis-444512"
topic_id = "real-time-stock-data"
subscription_id = "real-time-stock-data-sub"
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Publish stock data to Pub/Sub
def publish_stock_data(tickers):
    for ticker in tickers:
        stock_data = yf.download(ticker, interval='5m', period='5d')
        if stock_data.empty:
            print(f"No data fetched for {ticker}.")
            continue
        latest_row = stock_data.iloc[-1]
        stock_message = {
            "symbol": ticker,
            "price": round(float(latest_row["Close"].iloc[0]), 2),
            "timestamp": latest_row.name.strftime('%Y-%m-%dT%H:%M:%SZ')
        }
        message = json.dumps(stock_message).encode("utf-8")
        future = publisher.publish(topic_path, message)
        print(f"Published message for {ticker} with ID: {future.result()}")

In [22]:
# Subscriber callback
def callback(message):
    try:
        raw_data = message.data.decode("utf-8")
        data = json.loads(raw_data)
        required_keys = {"symbol", "price", "timestamp"}
        if not required_keys.issubset(data.keys()):
            print(f"Missing keys in message: {data}")
            message.nack()
            return

        # Fetch recent stock data for feature preparation
        stock_data = yf.download(data["symbol"], interval='5m', period='5d')
        features = prepare_features(stock_data)
        predicted_trend = predict_trend(features)

        # Insert prediction into BigQuery
        table_ref = "stockanalysis-444512.stock_analysis.stream_mltobq"
        row_to_insert = {
            "symbol": data["symbol"],
            "price": data["price"],
            "timestamp": data["timestamp"],
            "predicted_trend": predicted_trend
        }
        errors = bigquery_client.insert_rows_json(table_ref, [row_to_insert])
        if errors:
            print(f"BigQuery errors: {errors}")
        else:
            print(f"Prediction for {data['symbol']} inserted into BigQuery.")

        message.ack()
    except Exception as e:
        print(f"Error processing message: {e}")
        message.nack()

# Start Pub/Sub subscriber
subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")

Listening for messages on projects/stockanalysis-444512/subscriptions/real-time-stock-data-sub...


[*********************100%***********************]  1 of 1 completed


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 701ms/step


[*********************100%***********************]  1 of 1 completed


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 71ms/step
Prediction for AAPL inserted into BigQuery.
Prediction for INTC inserted into BigQuery.


[*********************100%***********************]  1 of 1 completed

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step





Prediction for TSLA inserted into BigQuery.


[*********************100%***********************]  1 of 1 completed

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step





Prediction for AMZN inserted into BigQuery.


[*********************100%***********************]  1 of 1 completed

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step





Prediction for BABA inserted into BigQuery.


In [23]:
# Merge data into BigQuery
merge_query = """
MERGE `stockanalysis-444512.stock_analysis.stream_mltobq` T
USING (
    SELECT symbol, price, timestamp, predicted_trend FROM `stockanalysis-444512.stock_analysis.stream_pubsubtobq`
    WHERE NOT EXISTS (
        SELECT 1 FROM `stockanalysis-444512.stock_analysis.stream_mltobq` M
        WHERE T.symbol = M.symbol AND T.timestamp = M.timestamp
    )
) S
ON T.symbol = S.symbol AND T.timestamp = S.timestamp
WHEN NOT MATCHED THEN
INSERT (symbol, price, timestamp, predicted_trend)
VALUES (S.symbol, S.price, S.timestamp, S.predicted_trend)
"""

In [None]:
def run_merge_query():
    query_job = bigquery_client.query(merge_query)
    query_job.result()
    print("Data merged successfully.")

# Periodically fetch data, publish, and merge
if __name__ == "__main__":
    tickers = ["TSLA", "AAPL", "MSFT", "GOOGL", "AMZN", "ORCL", "INTC", "NVDA", "META", "BABA"]
    publish_stock_data(tickers)
    while True:
        time.sleep(60)

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Published message for TSLA with ID: 13473778826096647
Published message for AAPL with ID: 13474321334745635


[*********************100%***********************]  1 of 1 completed


Published message for MSFT with ID: 13474044197589644


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Published message for GOOGL with ID: 13474084363586647
Published message for AMZN with ID: 13620395647108117


[*********************100%***********************]  1 of 1 completed


Published message for ORCL with ID: 13474248550838686


[*********************100%***********************]  1 of 1 completed


Published message for INTC with ID: 13474196264470193


[*********************100%***********************]  1 of 1 completed

Published message for NVDA with ID: 13473695341808934



[*********************100%***********************]  1 of 1 completed


Published message for META with ID: 13474248035957527


[*********************100%***********************]  1 of 1 completed


Published message for BABA with ID: 13473760417664484
