In [16]:
import os
import sys
import numpy as np
import pandas as pd
import logging
import warnings
import asyncio
import aiohttp
from datetime import datetime, timedelta
import time
import configparser
from sklearn.preprocessing import StandardScaler
import yfinance as yf
from dotenv import load_dotenv
import threading
import smtplib
from email.mime.text import MIMEText
import subprocess
from sqlalchemy import create_engine

# Import Machine Learning Libraries
import tensorflow as tf
import xgboost as xgb
from ray import tune
from ray.tune.integration.xgboost import TuneReportCallback

# Import Reinforcement Learning Libraries
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import gym
from gym import spaces

# Import Data Visualization and Dashboard Libraries
import matplotlib.pyplot as plt
import streamlit as st

# Import API Libraries
import alpaca_trade_api as tradeapi
import nest_asyncio
nest_asyncio.apply()
from transformers import pipeline
from twilio.rest import Client  # For SMS alerts

# Suppress transformers library logs
logging.getLogger("transformers").setLevel(logging.ERROR)

# Suppress Warnings
warnings.filterwarnings('ignore')

# Set up Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("trading_bot.log"),
        logging.StreamHandler()
    ]
)

# Load Configuration
config = configparser.ConfigParser()
config.read('config.ini')

# Load Environment Variables
load_dotenv('MarketCheck.env')

# Retrieve Environment Variables
ALPACA_API_KEY = os.getenv('ALPACA_API_KEY')
ALPACA_SECRET_KEY = os.getenv('ALPACA_SECRET_KEY')
ALPACA_BASE_URL = config.get('alpaca', 'base_url', fallback="https://paper-api.alpaca.markets")
FINNHUB_API_KEY = os.getenv('FINNHUB_API_KEY')
ALPHA_VANTAGE_API_KEY = os.getenv('ALPHA_VANTAGE_API_KEY')
TWITTER_API_BEARER_TOKEN = os.getenv('TWITTER_API_BEARER_TOKEN')
EMAIL_ADDRESS = os.getenv('EMAIL_ADDRESS')
EMAIL_PASSWORD = os.getenv('EMAIL_PASSWORD')
SMS_API_KEY = os.getenv('SMS_API_KEY')  # Twilio Account SID
SMS_AUTH_TOKEN = os.getenv('SMS_AUTH_TOKEN')  # Twilio Auth Token
SMS_FROM_NUMBER = os.getenv('SMS_FROM_NUMBER')  # Twilio Phone Number
SMS_TO_NUMBER = os.getenv('SMS_TO_NUMBER')  # Your Phone Number
SATELLITE_DATA_API_KEY = os.getenv('SATELLITE_DATA_API_KEY')
WEB_TRAFFIC_API_KEY = os.getenv('WEB_TRAFFIC_API_KEY')
CREDIT_CARD_DATA_API_KEY = os.getenv('CREDIT_CARD_DATA_API_KEY')
DATABASE_URL = os.getenv('DATABASE_URL')

def validate_environment_variables(keys=None):
    if keys is None:
        keys = [
            "ALPACA_API_KEY",
            "ALPACA_SECRET_KEY",
            "FINNHUB_API_KEY",
            "ALPHA_VANTAGE_API_KEY",
            "TWITTER_API_BEARER_TOKEN",
            "EMAIL_ADDRESS",
            "EMAIL_PASSWORD",
            "SMS_API_KEY",
            "SMS_AUTH_TOKEN",
            "SMS_FROM_NUMBER",
            "SMS_TO_NUMBER",
            "SATELLITE_DATA_API_KEY",
            "WEB_TRAFFIC_API_KEY",
            "CREDIT_CARD_DATA_API_KEY",
            "DATABASE_URL"
        ]
    missing = [key for key in keys if not os.getenv(key)]
    if missing:
        logging.error(f"Missing environment variables: {missing}")
        sys.exit(1)
    logging.info("All environment variables are loaded successfully.")

validate_environment_variables()

# Set up Alpaca API
api = tradeapi.REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, ALPACA_BASE_URL)

class TradingEnvironment(gym.Env):
    """
    Custom OpenAI Gym environment for stock trading simulation.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, df):
        super(TradingEnvironment, self).__init__()

        self.df = df.reset_index(drop=True)
        # Exclude non-numeric columns from observation space
        self.feature_columns = df.select_dtypes(include=[np.number]).columns.tolist()
        self.action_space = spaces.Discrete(3)  # Actions: Hold, Buy, Sell
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(len(self.feature_columns),), dtype=np.float32
        )
        self.current_step = 0
        self.balance = 100000  # Starting balance
        self.shares_held = 0
        self.net_worth = self.balance
        self.max_net_worth = self.balance
        self.initial_net_worth = self.balance
        self.trades = []

    def reset(self):
        self.current_step = 0
        self.balance = 100000
        self.shares_held = 0
        self.net_worth = self.balance
        self.max_net_worth = self.balance
        self.initial_net_worth = self.balance
        self.trades = []
        return self._next_observation()

    def _next_observation(self):
        obs = self.df.loc[self.current_step, self.feature_columns].values.astype(np.float32)
        return obs

    def step(self, action):
        current_price = self.df.loc[self.current_step, 'Close']
        reward = 0

        # Execute action
        if action == 0:  # Hold
            pass
        elif action == 1:  # Buy
            if self.balance >= current_price:
                self.shares_held += 1
                self.balance -= current_price
                self.trades.append({'step': self.current_step, 'shares': 1, 'type': 'buy'})
        elif action == 2:  # Sell
            if self.shares_held > 0:
                self.shares_held -= 1
                self.balance += current_price
                self.trades.append({'step': self.current_step, 'shares': 1, 'type': 'sell'})

        # Update net worth
        prev_net_worth = self.net_worth
        self.net_worth = self.balance + self.shares_held * current_price
        self.max_net_worth = max(self.max_net_worth, self.net_worth)

        # Calculate reward as the change in net worth
        reward = self.net_worth - prev_net_worth

        self.current_step += 1
        done = self.current_step >= len(self.df) - 1

        obs = self._next_observation()
        info = {}

        return obs, reward, done, info

    def render(self, mode='human', close=False):
        profit = self.net_worth - self.initial_net_worth
        print(f'Step: {self.current_step}')
        print(f'Balance: {self.balance}')
        print(f'Shares held: {self.shares_held}')
        print(f'Net worth: {self.net_worth}')
        print(f'Profit: {profit}')

class TradingBot:
    def __init__(self):
        # Load settings from configuration
        self.ticker = config.get('trading', 'ticker', fallback='AAPL')
        self.start_date = config.get('trading', 'start_date', fallback='2020-01-01')
        self.end_date = datetime.today().strftime("%Y-%m-%d")
        self.interval = config.getint('trading', 'interval', fallback=3600)
        self.initial_cash = config.getfloat('portfolio', 'initial_cash', fallback=100000.0)
        self.portfolio = {
            "cash": self.initial_cash,
            "holdings": 0,
            "equity": self.initial_cash
        }
        self.model = None
        self.rl_model = None
        self.scaler = None

        # Initialize sentiment analysis pipeline with FinBERT
        self.sentiment_model = pipeline(
            "sentiment-analysis",
            model="ProsusAI/finbert",
            tokenizer="ProsusAI/finbert"
        )

        # Initialize UI components
        self.dashboard_thread = threading.Thread(target=self.run_dashboard)
        self.dashboard_thread.daemon = True
        self.dashboard_thread.start()

        # Initialize database connection
        self.engine = create_engine(DATABASE_URL)

    def get_market_data(self):
        logging.info(f'Fetching market data for {self.ticker}')
        try:
            # Option to use Alpha Vantage API
            data = self.fetch_alpha_vantage_data()
            if data.empty:
                # Fallback to yfinance if Alpha Vantage fails
                data = yf.download(
                    self.ticker, start=self.start_date, end=self.end_date, interval='1d', progress=False
                )
            data.reset_index(inplace=True)
            data.rename(columns={'Date': 'timestamp'}, inplace=True)
            # Fetch alternative data sources and merge
            alternative_data = self.get_alternative_data()
            if not alternative_data.empty:
                data = pd.merge(data, alternative_data, on='timestamp', how='left')
            # Store data to the database
            data.to_sql('market_data', con=self.engine, if_exists='replace', index=False)
            return data
        except Exception as e:
            logging.error(f"Error fetching market data: {e}")
            return pd.DataFrame()

    def fetch_alpha_vantage_data(self):
        logging.info("Fetching data from Alpha Vantage API...")
        try:
            url = (
                f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED'
                f'&symbol={self.ticker}&outputsize=full&apikey={ALPHA_VANTAGE_API_KEY}&datatype=csv'
            )
            data = pd.read_csv(url)
            data.rename(columns={'timestamp': 'Date'}, inplace=True)
            data['Date'] = pd.to_datetime(data['timestamp'])
            data.sort_values('Date', inplace=True)
            return data
        except Exception as e:
            logging.warning(f"Alpha Vantage API failed: {e}")
            return pd.DataFrame()

    def get_alternative_data(self):
        logging.info("Fetching alternative data sources...")
        try:
            # Fetch satellite data
            satellite_data = self.fetch_satellite_data()
            # Fetch web traffic data
            web_traffic_data = self.fetch_web_traffic_data()
            # Fetch credit card transaction data
            credit_card_data = self.fetch_credit_card_data()
            # Merge all alternative data
            alternative_data = pd.merge(satellite_data, web_traffic_data, on='timestamp', how='outer')
            alternative_data = pd.merge(alternative_data, credit_card_data, on='timestamp', how='outer')
            return alternative_data
        except Exception as e:
            logging.error(f"Error fetching alternative data: {e}")
            return pd.DataFrame()

    def fetch_satellite_data(self):
        logging.info("Fetching satellite data...")
        try:
            # Placeholder for actual satellite data fetching using NASA Earth API
            date_range = pd.date_range(start=self.start_date, end=self.end_date)
            satellite_data = pd.DataFrame({
                'timestamp': date_range,
                'satellite_activity': np.random.rand(len(date_range))
            })
            return satellite_data
        except Exception as e:
            logging.error(f"Error fetching satellite data: {e}")
            return pd.DataFrame()

    def fetch_web_traffic_data(self):
        logging.info("Fetching web traffic data...")
        try:
            # Placeholder for actual web traffic data fetching using OpenPageRank API
            date_range = pd.date_range(start=self.start_date, end=self.end_date)
            web_traffic_data = pd.DataFrame({
                'timestamp': date_range,
                'web_traffic': np.random.randint(1000, 10000, size=len(date_range))
            })
            return web_traffic_data
        except Exception as e:
            logging.error(f"Error fetching web traffic data: {e}")
            return pd.DataFrame()

    def fetch_credit_card_data(self):
        logging.info("Fetching credit card transaction data...")
        try:
            # Placeholder for actual credit card data fetching
            date_range = pd.date_range(start=self.start_date, end=self.end_date)
            credit_card_data = pd.DataFrame({
                'timestamp': date_range,
                'credit_card_spend': np.random.randint(10000, 100000, size=len(date_range))
            })
            return credit_card_data
        except Exception as e:
            logging.error(f"Error fetching credit card data: {e}")
            return pd.DataFrame()

    async def get_news_headlines(self):
        logging.info(f"Fetching company-specific news for {self.ticker}")
        try:
            # Asynchronously fetch news from multiple sources
            news_data = await asyncio.gather(
                self.fetch_finnhub_news(),
                self.fetch_twitter_sentiment()
            )
            # Combine data
            news_df = pd.concat(news_data, ignore_index=True)
            return news_df
        except Exception as e:
            logging.error(f"Error fetching news headlines: {e}")
            return pd.DataFrame()

    async def fetch_finnhub_news(self):
        logging.info(f"Fetching news from Finnhub for {self.ticker}")
        today = datetime.today().strftime('%Y-%m-%d')
        one_month_ago = (datetime.today() - timedelta(days=30)).strftime('%Y-%m-%d')
        url = (
            f"https://finnhub.io/api/v1/company-news?symbol={self.ticker}"
            f"&from={one_month_ago}&to={today}&token={FINNHUB_API_KEY}"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    articles = await response.json()
                    headlines = [
                        {
                            'date': pd.to_datetime(article.get('datetime'), unit='s').date(),
                            'headline': article.get('headline'),
                            'description': article.get('summary')
                        }
                        for article in articles
                    ]
                    return pd.DataFrame(headlines)
                else:
                    logging.warning(f"Failed to fetch Finnhub news: {response.status}")
                    return pd.DataFrame(columns=['date', 'headline', 'description'])

    async def fetch_twitter_sentiment(self):
        logging.info(f"Fetching Twitter sentiment for {self.ticker}")
        headers = {
            "Authorization": f"Bearer {TWITTER_API_BEARER_TOKEN}"
        }
        query = f"${self.ticker} -is:retweet lang:en"
        url = f"https://api.twitter.com/2/tweets/search/recent?query={query}&tweet.fields=created_at"
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    tweets = await response.json()
                    tweet_data = [
                        {
                            'date': pd.to_datetime(tweet['created_at']).date(),
                            'headline': tweet['text']
                        }
                        for tweet in tweets.get('data', [])
                    ]
                    return pd.DataFrame(tweet_data)
                else:
                    logging.warning(f"Failed to fetch Twitter data: {response.status}")
                    return pd.DataFrame(columns=['date', 'headline'])

    def analyze_sentiment(self, headlines_df):
        logging.info('Analyzing sentiment...')
        if headlines_df.empty:
            logging.warning("No headlines available for sentiment analysis.")
            return pd.DataFrame(columns=['date', 'sentiment'])
        try:
            # Use the FinBERT model for financial sentiment analysis
            sentiments = self.sentiment_model(
                headlines_df['headline'].tolist(), truncation=True, max_length=512
            )
            headlines_df['sentiment_score'] = [
                1 if s['label'].lower() == 'positive' else (-1 if s['label'].lower() == 'negative' else 0)
                for s in sentiments
            ]
            headlines_df['date'] = pd.to_datetime(headlines_df['date'])
            return headlines_df.groupby('date')['sentiment_score'].mean().reset_index().rename(
                columns={'sentiment_score': 'sentiment'}
            )
        except Exception as e:
            logging.error(f"Error during sentiment analysis: {e}")
            return pd.DataFrame(columns=['date', 'sentiment'])

    def preprocess_data(self, market_data, sentiment_data):
        logging.info("Preprocessing data...")
        try:
            # Feature Engineering
            data = self.feature_engineering(market_data)
            # Merge with sentiment data
            data = self.merge_sentiment_data(data, sentiment_data)
            # Data Scaling
            data = self.scale_features(data)
            return data
        except Exception as e:
            logging.error(f"Error in preprocessing data: {e}")
            return pd.DataFrame()

    def feature_engineering(self, data):
        # Advanced technical indicators
        data['EMA_12'] = data['Close'].ewm(span=12, adjust=False).mean()
        data['EMA_26'] = data['Close'].ewm(span=26, adjust=False).mean()
        data['MACD'] = data['EMA_12'] - data['EMA_26']
        data['RSI'] = self.calculate_rsi(data['Close'])
        data['BB_upper'], data['BB_middle'], data['BB_lower'] = self.calculate_bollinger_bands(data['Close'])
        # Additional indicators can be added here
        return data

    def calculate_rsi(self, series, period=14):
        delta = series.diff(1)
        gain = delta.clip(lower=0)
        loss = -delta.clip(upper=0)
        avg_gain = gain.ewm(alpha=1/period, adjust=False).mean()
        avg_loss = loss.ewm(alpha=1/period, adjust=False).mean()
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
        return rsi.fillna(0)

    def calculate_bollinger_bands(self, series, period=20, num_std=2):
        rolling_mean = series.rolling(window=period).mean()
        rolling_std = series.rolling(window=period).std()
        upper_band = rolling_mean + (rolling_std * num_std)
        lower_band = rolling_mean - (rolling_std * num_std)
        return upper_band.fillna(0), rolling_mean.fillna(0), lower_band.fillna(0)

    def merge_sentiment_data(self, data, sentiment_data):
        if not sentiment_data.empty:
            sentiment_data['date'] = pd.to_datetime(sentiment_data['date'])
            data['date'] = pd.to_datetime(data['timestamp']).dt.date
            data = pd.merge(
                data, sentiment_data, left_on='date', right_on='date', how='left'
            )
            data['sentiment'].fillna(0, inplace=True)
        else:
            data['sentiment'] = 0
        return data

    def scale_features(self, data):
        features = data.select_dtypes(include=[np.number]).columns.tolist()
        features = [f for f in features if f not in ['Close', 'Adj Close']]
        self.scaler = StandardScaler()
        data[features] = self.scaler.fit_transform(data[features])
        return data

    def train_model(self, data):
        logging.info("Training predictive model...")
        try:
            # Prepare data for training
            X = data.drop(columns=['timestamp', 'date', 'Close', 'Adj Close'], errors='ignore')
            y = data['Close']
            # Use Ray Tune for hyperparameter tuning
            self.model = self.train_with_ray_tune(X, y)
        except Exception as e:
            logging.error(f"Error in training model: {e}")

    def train_with_ray_tune(self, X, y):
        def train_model(config):
            train_set = xgb.DMatrix(X, label=y)
            result = {}
            xgb.train(
                config,
                train_set,
                evals=[(train_set, "train")],
                verbose_eval=False,
                evals_result=result
            )
            tune.report(loss=result["train"]["rmse"][-1])

        search_space = {
            "objective": "reg:squarederror",
            "eval_metric": "rmse",
            "max_depth": tune.randint(3, 10),
            "learning_rate": tune.loguniform(1e-4, 1e-1),
            "min_child_weight": tune.randint(1, 10),
            "subsample": tune.uniform(0.5, 1.0),
            "colsample_bytree": tune.uniform(0.5, 1.0),
        }

        analysis = tune.run(
            train_model,
            config=search_space,
            num_samples=50,
            resources_per_trial={"cpu": 2},
            metric="loss",
            mode="min",
            verbose=1
        )

        best_config = analysis.get_best_config(metric="loss", mode="min")
        logging.info(f"Best hyperparameters found: {best_config}")
        # Train final model with best hyperparameters
        final_model = xgb.XGBRegressor(**best_config)
        final_model.fit(X, y)
        return final_model

    def train_reinforcement_learning_agent(self, data):
        logging.info("Training reinforcement learning agent...")
        try:
            env = TradingEnvironment(data)
            env = DummyVecEnv([lambda: env])

            model = PPO('MlpPolicy', env, verbose=0)
            model.learn(total_timesteps=10000)
            self.rl_model = model
        except Exception as e:
            logging.error(f"Error in training RL agent: {e}")

    def decide_trading_action(self, latest_row, data):
        logging.info("Evaluating trading action...")
        try:
            # Machine Learning Model Prediction
            X_latest = latest_row.drop(labels=['timestamp', 'date', 'Close', 'Adj Close'], errors='ignore').values.reshape(1, -1)
            X_scaled = self.scaler.transform(X_latest)
            predicted_price = self.model.predict(X_scaled)[0]
            current_price = latest_row['Close']
            logging.info(f"Predicted price: {predicted_price}, Current price: {current_price}")

            # Reinforcement Learning Agent Decision
            obs = X_scaled.flatten()
            action_rl, _ = self.rl_model.predict(obs, deterministic=True)
            logging.info(f"RL Agent Action: {action_rl}")

            # Combine Decisions
            expected_return = (predicted_price - current_price) / current_price

            # Risk-adjusted position sizing
            close_prices = data['Close']
            volatility = close_prices[-20:].std()  # Last 20 days
            if np.isnan(volatility) or volatility == 0:
                volatility = close_prices.std()
            position_size = self.calculate_position_size(expected_return, volatility)

            # Determine final action
            if action_rl == 1 and expected_return > 0.005:
                return "BUY", position_size
            elif action_rl == 2 and expected_return < -0.005:
                return "SELL", position_size
            else:
                return "HOLD", 0
        except Exception as e:
            logging.error(f"Error in deciding trading action: {e}")
            return "HOLD", 0

    def calculate_position_size(self, expected_return, volatility):
        risk_per_trade = 0.01  # 1% of equity
        account_equity = self.portfolio['equity']
        if volatility == 0 or np.isnan(volatility):
            volatility = 1  # Prevent division by zero
        position_size = (account_equity * risk_per_trade) / volatility
        return min(position_size, account_equity * 0.2)  # Max 20% of equity

    def execute_trade(self, action, amount, latest_row):
        try:
            if action == "BUY":
                buying_power = float(api.get_account().buying_power)
                quantity = int(amount // latest_row['Close'])
                if quantity > 0 and quantity * latest_row['Close'] <= buying_power:
                    api.submit_order(
                        symbol=self.ticker, qty=quantity, side="buy",
                        type="market", time_in_force="gtc"
                    )
                    self.portfolio['cash'] -= quantity * latest_row['Close']
                    self.portfolio['holdings'] += quantity
                    logging.info(f"Bought {quantity} shares of {self.ticker}")
                    self.send_alert(f"Bought {quantity} shares of {self.ticker}")
                else:
                    logging.info("Insufficient funds to buy.")
            elif action == "SELL":
                try:
                    position = api.get_position(self.ticker)
                    quantity = int(position.qty)
                    if quantity > 0:
                        api.submit_order(
                            symbol=self.ticker, qty=quantity, side="sell",
                            type="market", time_in_force="gtc"
                        )
                        self.portfolio['cash'] += quantity * latest_row['Close']
                        self.portfolio['holdings'] -= quantity
                        logging.info(f"Sold {quantity} shares of {self.ticker}")
                        self.send_alert(f"Sold {quantity} shares of {self.ticker}")
                    else:
                        logging.info("No holdings to sell.")
                except Exception:
                    logging.info("No position to sell.")
            else:
                logging.info("No action taken.")
        except Exception as e:
            logging.error(f"Error executing trade: {e}")

    def update_portfolio(self):
        try:
            account = api.get_account()
            self.portfolio['cash'] = float(account.cash)
            self.portfolio['equity'] = float(account.equity)
            try:
                position = api.get_position(self.ticker)
                self.portfolio['holdings'] = int(position.qty)
            except Exception:
                self.portfolio['holdings'] = 0
        except Exception as e:
            logging.error(f"Error updating portfolio: {e}")

    def send_alert(self, message):
        # Send email alert
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'Trading Bot Alert'
            msg['From'] = EMAIL_ADDRESS
            msg['To'] = EMAIL_ADDRESS  # Send to self; can be adjusted

            with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
                server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
                server.sendmail(EMAIL_ADDRESS, EMAIL_ADDRESS, msg.as_string())
            logging.info("Alert email sent.")
        except Exception as e:
            logging.error(f"Error sending alert email: {e}")

        # Send SMS alert using Twilio
        try:
            client = Client(SMS_API_KEY, SMS_AUTH_TOKEN)
            message = client.messages.create(
                body=message,
                from_=SMS_FROM_NUMBER,
                to=SMS_TO_NUMBER
            )
            logging.info("SMS alert sent.")
        except Exception as e:
            logging.error(f"Error sending SMS alert: {e}")

    def run_dashboard(self):
        # Streamlit code to create dashboard
        st.title('Trading Bot Dashboard')
        placeholder = st.empty()
        while True:
            with placeholder.container():
                st.write('Portfolio State')
                st.write(self.portfolio)
                # Additional components can be added here
                time.sleep(5)

    async def run(self):
        while True:
            try:
                # Fetch data
                market_data = self.get_market_data()
                if market_data.empty:
                    logging.warning("Market data is empty. Skipping iteration.")
                    await asyncio.sleep(self.interval)
                    continue
                headlines = await self.get_news_headlines()
                sentiment = self.analyze_sentiment(headlines)
                processed_data = self.preprocess_data(market_data, sentiment)
                if processed_data.empty:
                    logging.warning("Processed data is empty. Skipping iteration.")
                    await asyncio.sleep(self.interval)
                    continue
                self.train_model(processed_data)
                self.train_reinforcement_learning_agent(processed_data)
                latest_row = processed_data.iloc[-1]

                # Update portfolio
                self.update_portfolio()

                # Make trading decision
                action, amount = self.decide_trading_action(latest_row, processed_data)
                self.execute_trade(action, amount, latest_row)
                logging.info(f"Updated portfolio state: {self.portfolio}")

                # Sleep until next interval
                logging.info(f"Sleeping for {self.interval} seconds...")
                await asyncio.sleep(self.interval)
            except Exception as e:
                logging.error(f"An error occurred in the main loop: {e}")
                await asyncio.sleep(self.interval)

if __name__ == "__main__":
    bot = TradingBot()
    asyncio.run(bot.run())

ModuleNotFoundError: No module named 'ray'