In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM
from textblob import TextBlob
import tweepy
import logging
from statsmodels.tsa.arima.model import ARIMA
from pmdarima import auto_arima  # For auto ARIMA
from sklearn.model_selection import train_test_split

# Root logger setup: INFO threshold, "<timestamp> - <level> - <message>" records
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Function to set up Twitter API authentication
def initialize_twitter_api():
    """Build a tweepy API client from OAuth 1.0a user credentials.

    Each credential is read from an environment variable
    (``TWITTER_CONSUMER_KEY``, ``TWITTER_CONSUMER_SECRET``,
    ``TWITTER_ACCESS_TOKEN``, ``TWITTER_ACCESS_TOKEN_SECRET``). When a
    variable is unset, the in-source default below is used instead, so
    replacing the defaults with real keys keeps working.

    Returns:
        The ``tweepy.API`` object, or ``None`` when any credential is empty
        or still an unedited ``YOUR_...`` placeholder, or when building the
        client raised (the error is logged).
    """
    import os  # function-scope import keeps this block self-contained

    try:
        consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "YOUR_CONSUMER_KEY")  # Replace with your keys
        consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "YOUR_CONSUMER_SECRET")
        access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "YOUR_ACCESS_TOKEN")
        access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "YOUR_ACCESS_TOKEN_SECRET")

        credentials = (consumer_key, consumer_secret, access_token, access_token_secret)
        # A client built from template placeholders can only fail later, on
        # the first request; report the missing configuration up front.
        if any(not value or value.startswith("YOUR_") for value in credentials):
            logging.warning("Twitter API credentials are not configured; no API client created.")
            return None

        auth = tweepy.OAuth1UserHandler(*credentials)
        api = tweepy.API(auth)
        return api
    except Exception as e:
        logging.error(f"Error initializing Twitter API: {e}")
        return None

# Function to get average sentiment from tweets
def analyze_tweet_sentiment(api, ticker, num_tweets=100):
    """Return the mean TextBlob polarity of recent tweets mentioning a ticker.

    Searches English, non-retweet tweets matching ``$<ticker>`` or
    ``<ticker>`` posted since yesterday's date and averages their polarity.

    Args:
        api: Client exposing ``search_tweets`` (as returned by
            ``initialize_twitter_api``), or ``None``.
        ticker: Ticker symbol to search for.
        num_tweets: Maximum number of tweets to pull from the cursor.

    Returns:
        The average polarity, or ``0`` when ``api`` is ``None``, no tweet
        could be scored, or fetching failed (the error is logged).
    """
    if api is None:
        return 0
    try:
        since_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
        # The date floor is expressed as a `since:` search operator inside
        # the query: `since` is not a keyword argument that tweepy v4's
        # API.search_tweets accepts, so passing it separately has no effect
        # on the results.
        query = f"${ticker} OR {ticker} -filter:retweets since:{since_date}"  # Construct search query
        tweets = tweepy.Cursor(
            api.search_tweets,
            q=query,
            lang="en",
            tweet_mode="extended",
        ).items(num_tweets)

        sentiments = []
        for tweet in tweets:
            # Score tweets one by one so a single bad tweet does not discard
            # the rest of the batch.
            try:
                text = tweet.full_text
                sentiment = TextBlob(text).sentiment.polarity  # Get sentiment polarity
                sentiments.append(sentiment)
            except Exception as e:
                logging.error(f"Error analyzing tweet sentiment: {e}")
                continue

        avg_sentiment = np.mean(sentiments) if sentiments else 0
        return avg_sentiment
    except Exception as e:
        logging.error(f"Error fetching or analyzing tweets: {e}")
        return 0

# Function to fetch historical stock data
def fetch_historical_stock_data(ticker):
    """Download roughly the last five years of price history via yfinance.

    Args:
        ticker: Ticker symbol passed to ``yf.download``.

    Returns:
        The object returned by ``yf.download``, or ``None`` when the result
        is empty or the download raised (the error is logged).
    """
    try:
        end = datetime.now()
        try:
            start = datetime(end.year - 5, end.month, end.day)  # Get 5 years of data
        except ValueError:
            # Only Feb 29 can fail here: the year five back is never a leap
            # year. Use Feb 28 instead of aborting the whole fetch.
            start = datetime(end.year - 5, end.month, 28)
        data = yf.download(ticker, start=start, end=end)
        if data.empty:
            raise ValueError(f"No data found for ticker: {ticker}")
        return data
    except Exception as e:
        logging.error(f"Error fetching stock data: {e}")
        return None

# Function to create and train ARIMA model
def train_arima_model(train_data):
    """Fit a non-seasonal ARIMA model, letting ``auto_arima`` pick the order.

    The stepwise search starts at p=1, q=1 and is capped at p<=5, q<=5. The
    search trace and the fitted model's summary are printed.

    Args:
        train_data: Training series handed directly to ``auto_arima``.

    Returns:
        The fitted model returned by ``auto_arima``, or ``None`` when
        fitting raised (the error is logged).
    """
    try:
        # No seasonal period `m` is passed: the search is non-seasonal, and a
        # period > 1 alongside seasonal=False is contradictory.
        # NOTE(review): pmdarima appears to warn and discard `m` in that
        # case, so dropping it should not change the fit -- confirm.
        stepwise_fit = auto_arima(
            train_data,
            start_p=1, start_q=1,
            max_p=5, max_q=5,  # Adjust ranges if needed
            seasonal=False,
            trace=True,
            error_action='ignore',
            suppress_warnings=True,
            stepwise=True,
        )
        print(stepwise_fit.summary())  # Print model summary
        return stepwise_fit
    except Exception as e:
        logging.error(f"Error training ARIMA model: {e}")
        return None

# Function to evaluate ARIMA model
def evaluate_arima_predictions(model, test_data):
    """Forecast ``len(test_data)`` periods and score them against test_data.

    Returns:
        ``(predictions, rmse, mae)``; ``(None, None, None)`` if forecasting
        or scoring raised (the error is logged).
    """
    try:
        forecast = model.predict(n_periods=len(test_data))
        squared_error = mean_squared_error(test_data, forecast)
        root_mse = math.sqrt(squared_error)
        abs_error = mean_absolute_error(test_data, forecast)
        return forecast, root_mse, abs_error
    except Exception as e:
        logging.error(f"Error evaluating ARIMA model: {e}")
        return None, None, None

# Function to create and train LSTM model
def train_lstm_model(X_train, y_train):
    """Build and fit a three-layer stacked LSTM regressor.

    Each LSTM layer has 64 units and is followed by 30% dropout; a single
    Dense unit produces the output. The network is compiled with Adam and
    mean-squared-error loss, then fitted silently for 50 epochs in batches
    of 32.

    Returns:
        The fitted model, or ``None`` if building or fitting raised (the
        error is logged).
    """
    hidden_units = 64
    drop_rate = 0.3
    try:
        network = Sequential()
        # The first layer declares the input window from axes 1 and 2 of X_train.
        window_shape = (X_train.shape[1], X_train.shape[2])
        network.add(LSTM(units=hidden_units, return_sequences=True, input_shape=window_shape))
        network.add(Dropout(drop_rate))
        network.add(LSTM(units=hidden_units, return_sequences=True))
        network.add(Dropout(drop_rate))
        # The last recurrent layer emits only its final state for the Dense head.
        network.add(LSTM(units=hidden_units))
        network.add(Dropout(drop_rate))
        network.add(Dense(units=1))
        network.compile(optimizer='adam', loss='mean_squared_error')
        network.fit(X_train, y_train, epochs=50, batch_size=32, verbose=0)
        return network
    except Exception as e:
        logging.error(f"Error training LSTM model: {e}")
        return None

# Function to evaluate LSTM model
def evaluate_lstm_predictions(model, X_test, y_test):
    """Predict on X_test and score the predictions against y_test.

    Returns:
        ``(predictions, rmse, mae)``; ``(None, None, None)`` if prediction
        or scoring raised (the error is logged).
    """
    try:
        predicted = model.predict(X_test)
        squared_error = mean_squared_error(y_test, predicted)
        root_mse = math.sqrt(squared_error)
        abs_error = mean_absolute_error(y_test, predicted)
        return predicted, root_mse, abs_error
    except Exception as e:
        logging.error(f"Error evaluating LSTM model: {e}")
        return None, None, None

if __name__ == "__main__":
    quote = input("Enter stock ticker symbol: ")
    df = fetch_historical_stock_data(quote)
    if df is None:
        exit()
    df = df.dropna()

    twitter_api = initialize_twitter_api()
    sentiment = analyze_tweet_sentiment(twitter_api, quote)
    print(f"Average Tweet Sentiment: {sentiment}")

    df['Sentiment'] = sentiment
    df = df[['Close', 'Sentiment']]

    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(df)
    close_scaled = scaled_data[:, 0]

    train_size = int(len(scaled_data) * 0.8)
    train, test = close_scaled[:train_size], close_scaled[train_size:]

    arima_model = train_arima_model(train)
    if arima_model:
        arima_predictions, arima_rmse, arima_mae = evaluate_arima_predictions(arima_model, test)
        if arima_predictions is not None:
            print(f"ARIMA RMSE: {arima_rmse}, MAE: {arima_mae}")
            plt.figure(figsize=(10, 6))
            plt.plot(df['Close'].values[-len(test):], label='Actual Close Price')
            plt.plot(scaler.inverse_transform(np.array(arima_predictions).reshape(-1, 1)), label='ARIMA Predictions')
            plt.title('ARIMA Model')
            plt.legend()
            plt.show()

    # LSTM Model
    X, y = [], []
    look_back = 7
    for i in range(look_back, len(scaled_data)):
        X.append(scaled_data[i - look_back:i, :])
        y.append(scaled_data[i, 0])
    X, y = np.array(X), np.array(y)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # Shuffle set to false for time series

    lstm_model = train_lstm_model(X_train, y_train)
    if lstm_model:
        lstm_predictions, lstm_rmse, lstm_mae = evaluate_lstm_predictions(lstm_model, X_test, y_test)
        if lstm_predictions is not None:
            print(f"LSTM RMSE: {lstm_rmse}, MAE: {lstm_mae}")
            plt.figure(figsize=(10, 6))