### Import the necessary packages

In [1]:
import os
import time
import dask.dataframe as dd
import dask.array as da
import numpy as np
from dask_ml.preprocessing import MinMaxScaler
from dask.distributed import performance_report
from dask.distributed import Client, LocalCluster
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from IPython.display import display, HTML

## Load and Preprocess Data

In [2]:
local_configs = {
    '1cpu': {'cores': 1, 'memory': '32GB', 'workers': 1},
    '4cpu': {'cores': 2, 'memory': '20GB', 'workers': 2},
    '8cpu': {'cores': 4, 'memory': '20GB', 'workers': 2},
    '12cpu': {'cores': 4, 'memory': '20GB', 'workers': 3},
    '16cpu': {'cores': 4, 'memory': '20GB', 'workers': 4},
    '20cpu': {'cores': 4, 'memory': '20GB', 'workers': 5}
} # config to run multiple clusters
results = {}

In [3]:
def load_and_preprocess_data():
    # loading files
    HOME_DIR = os.path.expanduser("~")
    PROJECT_DIR = os.path.join(HOME_DIR, "project/data/")
    files = os.path.join(PROJECT_DIR, "*.csv")

    # using dask for loading data
    df = dd.read_csv(files, include_path_column='filepath', blocksize='64MB')
    # adding the ticker name in their respective rows 
    df['Ticker'] = df['filepath'].map_partitions(
        lambda x: x.str.extract(r'([A-Z]+)\.csv$')[0]
    )

    # stocks prepared for model to train and predict
    stocks = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'GOOG', 'FB', 'TSLA', 'BRK-B', 'JPM', 'V']
    df = df[df['Ticker'].isin(stocks)]
    df = df.repartition(npartitions=200)
    
    features = ['Open', 'High', 'Low', 'Close', 'Volume']
    target = 'Adj Close'

    df = df.persist()
    df = df[features + [target]]

    print(df.shape)

    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(df.to_dask_array().compute())

    print(scaled_data)

    sequence_length = 10
    X, y = [], []
    for i in range(len(scaled_data) - sequence_length):
        X.append(scaled_data[i:i+sequence_length, :-1])
        y.append(scaled_data[i+sequence_length, -1])
    
    X, y = np.array(X), np.array(y)
    
    # splitting 80% of data
    train_size = int(0.8 * len(X))
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    
    return X_train, y_train, X_test, y_test

### Model Definition

In [4]:
import torch
from torch import nn

class BaseModel(nn.Module):
    """
    A base model class that implements different neural network architectures.

    This class supports four types of models:
    1. LSTM (Long Short-Term Memory)
    2. RNN (Recurrent Neural Network)
    3. GRU (Gated Recurrent Unit)
    4. CNN (Convolutional Neural Network)

    The class dynamically creates the appropriate model based on the 'model_type' parameter.
    For LSTM, RNN, and GRU, it uses a single layer followed by a fully connected layer.
    For CNN, it uses a 1D convolution layer, followed by adaptive average pooling and a fully connected layer.

    Args:
        input_dim (int): The input dimension of the data.
        model_type (str): The type of model to create ("LSTM", "RNN", "GRU", or "CNN").

    Attributes:
        model_type (str): The type of model being used.
        hidden_dim (int): The number of features in the hidden state (for LSTM, RNN, GRU).
        model (nn.Module): The main model architecture (LSTM, RNN, or GRU).
        conv (nn.Conv1d): The convolutional layer for CNN.
        pool (nn.AdaptiveAvgPool1d): The pooling layer for CNN.
        fc (nn.Linear): The fully connected layer for final output.

    Raises:
        ValueError: If an invalid model_type is provided.
    """
    def __init__(self, input_dim, model_type):
        super(BaseModel, self).__init__()
        self.model_type = model_type
        self.hidden_dim = 50
        
        if model_type == "LSTM":
            self.model = nn.LSTM(input_dim, self.hidden_dim, batch_first=True)
            self.fc = nn.Linear(self.hidden_dim, 1)
        elif model_type == "RNN":
            self.model = nn.RNN(input_dim, self.hidden_dim, batch_first=True)
            self.fc = nn.Linear(self.hidden_dim, 1)
        elif model_type == "GRU":
            self.model = nn.GRU(input_dim, self.hidden_dim, batch_first=True)
            self.fc = nn.Linear(self.hidden_dim, 1)
        elif model_type == "CNN":
            self.conv = nn.Conv1d(in_channels=input_dim, out_channels=32, kernel_size=3, stride=1, padding=1)
            self.pool = nn.AdaptiveAvgPool1d(output_size=5)  # Adjust output size as needed
            self.fc = nn.Linear(32 * 5, 1)  # 32 channels * 5 (pooling output size)
        else:
            raise ValueError("Invalid model type")

    def forward(self, x):
        if self.model_type in ["LSTM", "RNN", "GRU"]:
            out, _ = self.model(x)
            out = self.fc(out[:, -1, :])  # Last timestep
        elif self.model_type == "CNN":
            x = x.transpose(1, 2)  # Swap dimensions for Conv1d
            x = self.conv(x)
            x = self.pool(x)
            x = x.view(x.size(0), -1)  # Flatten for fully connected layer
            out = self.fc(x)
        return out

In [5]:
def train_model_chunk(model, X_chunk, y_chunk, epochs=5):
    """
    Train a model on a chunk of data using mini-batch gradient descent.

    This function is designed for incremental learning, where the model is trained
    on smaller chunks of data to handle large datasets or streaming data scenarios.

    Args:
        model (nn.Module): The neural network model to be trained.
        X_chunk (numpy.ndarray): Input features for the current data chunk.
        y_chunk (numpy.ndarray): Target values for the current data chunk.
        epochs (int, optional): Number of training epochs for this chunk. Defaults to 5.

    Returns:
        dict: The updated state dictionary of the model after training on this chunk.
    """
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    chunk_dataset = TensorDataset(torch.tensor(X_chunk, dtype=torch.float32),
                                  torch.tensor(y_chunk, dtype=torch.float32))
    chunk_loader = DataLoader(chunk_dataset, batch_size=32, shuffle=True)
    
    for epoch in range(epochs):
        model.train()
        for X_batch, y_batch in chunk_loader:
            optimizer.zero_grad()
            y_pred = model(X_batch).squeeze()
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
    
    return model.state_dict()

In [6]:
def update_model(model, state_dict):
    # Load the new state dictionary into the model
    model.load_state_dict(state_dict)
    return model

def train_model_parallel(client, model_type, X_train, y_train, n_chunks=6):
    # Initialize the model
    model = BaseModel(input_dim=X_train.shape[2], model_type=model_type)
    chunk_size = len(X_train) // n_chunks
    
    for i in range(0, len(X_train), chunk_size):
        # Split data into chunks
        X_chunk = X_train[i:i+chunk_size]
        y_chunk = y_train[i:i+chunk_size]
        
        # Submit training job to Dask client
        future = client.submit(train_model_chunk, model, X_chunk, y_chunk)
        # Wait for the result and get the updated state dict
        state_dict = future.result()
        # Update the model with new weights
        model = update_model(model, state_dict)
    
    return model

In [7]:
def evaluate_model(model, test_loader):
    # function to evaluate the model
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = model(X_batch).squeeze()
            predictions.append(y_pred.numpy())
            actuals.append(y_batch.numpy())
    return (np.mean((np.concatenate(predictions) - np.concatenate(actuals))**2), predictions, actuals)

### Function to Experiment on different Cluster on multiple CPUs

In [8]:
def run_experiment(config_name, config):
    """
    Run a full experiment with multiple models using Dask for parallel processing.

    Args:
        config_name (str): Name of the configuration being used.
        config (dict): Configuration parameters for the experiment.

    Returns:
        dict: Results of the experiment including timing and model performance.
    """
    start_time = time.time()

    cluster = LocalCluster(
        n_workers=config['cores'], 
        threads_per_worker=config['workers'], 
        memory_limit=config['memory'],
        host='10.99.251.210',
        processes=True
    )
    client = Client(cluster)

    # Display Dask dashboard link
    display(HTML(f"<a href='{client.dashboard_link}'>Dask Dashboard</a>"))
    print(f"Dashboard link: {client.dashboard_link}")
    print(f"Cluster status: {client.status}")
    
    # Load and preprocess data
    data_load_start = time.time()
    with performance_report(filename="dask-report_"+f"{config_name}.html"):
        X_train, y_train, X_test, y_test = load_and_preprocess_data()
        data_load_time = time.time() - data_load_start
        
        # Create PyTorch datasets and dataloaders
        train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                                    torch.tensor(y_train, dtype=torch.float32))
        test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32),
                                    torch.tensor(y_test, dtype=torch.float32))
        
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=32)
        
        # Train and evaluate different model types
        model_results = {}
        for model_type in ["LSTM", "RNN", "GRU", "CNN"]:
            # Train the model
            train_start = time.time()
            model = train_model_parallel(client, model_type, X_train, y_train)
            train_time = time.time() - train_start
            
            # Evaluate the model
            mse, predictions, actuals = evaluate_model(model, test_loader)
            model_results[model_type] = {"train_time": train_time, "mse": mse}
        
        # Calculate total experiment time
        total_time = time.time() - start_time

        return {
            "total_time": total_time,
            "data_load_time": data_load_time,
            "model_results": model_results
        }

In [9]:
for config_name, config in local_configs.items():
    print(f"Running experiment with {config_name}")
    results[config_name] = run_experiment(config_name, config)

Running experiment with 1cpu


Dashboard link: http://10.99.251.210:8787/status
Cluster status: running


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


(<dask_expr.expr.Scalar: expr=(FromGraph(2f6d3f0)[['Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']]).size() // 6, dtype=int64>, 6)
[[2.74835103e-04 1.69209777e-04 1.70626907e-04 1.69495501e-04
  6.31981003e-02 1.30660501e-04]
 [2.61691656e-04 1.54911275e-04 1.56208655e-04 1.55172855e-04
  2.36989104e-02 1.19619550e-04]
 [2.42571727e-04 1.35844437e-04 1.36982147e-04 1.36073836e-04
  1.42459064e-02 1.04896498e-04]
 ...
 [6.24304042e-01 6.25881092e-01 6.25977798e-01 6.24158506e-01
  5.76907468e-04 6.24167551e-01]
 [6.24646663e-01 6.29361775e-01 6.27743426e-01 6.27238654e-01
  5.05777644e-04 6.27247624e-01]
 [6.55931493e-01 6.59726906e-01 6.51946176e-01 6.57419737e-01
  1.39044724e-03 6.57427981e-01]]
Running experiment with 4cpu


Perhaps you already have a cluster running?
Hosting the HTTP server on port 39580 instead


Dashboard link: http://10.99.251.210:39580/status
Cluster status: running


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


(<dask_expr.expr.Scalar: expr=(FromGraph(6476fd4)[['Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']]).size() // 6, dtype=int64>, 6)


### Calculate Metrics

In [None]:
results

{'6cpu': {'total_time': 279.09371542930603,
  'data_load_time': 137.56212401390076,
  'model_results': {'LSTM': {'train_time': 46.43147158622742,
    'mse': 0.0070845443},
   'RNN': {'train_time': 26.03994607925415, 'mse': 0.010520764},
   'GRU': {'train_time': 41.06871485710144, 'mse': 0.009238123},
   'CNN': {'train_time': 22.388364553451538, 'mse': 0.01405808}}},
 '36cpu': {'total_time': 256.4476466178894,
  'data_load_time': 85.8149254322052,
  'model_results': {'LSTM': {'train_time': 72.83044409751892,
    'mse': 0.007902598},
   'RNN': {'train_time': 26.83320379257202, 'mse': 0.011435454},
   'GRU': {'train_time': 41.5639283657074, 'mse': 0.009112307},
   'CNN': {'train_time': 22.357224702835083, 'mse': 0.012968393}}}}

In [None]:
base_config = '1cpu'
# Calculate speedup and efficiency
base_time = results[base_config]['total_time']
# base_time= 456.69
for config_name, result in results.items():
    speedup = base_time / result['total_time']
    efficiency = speedup / local_configs[config_name]['cores']
    results[config_name]['speedup'] = speedup
    results[config_name]['efficiency'] = efficiency

# Print results
for config_name, result in results.items():
    print(f"\nResults for {config_name}:")
    print(f"Data load time: {result['data_load_time']:.2f} seconds")
    print(f"Total time: {result['total_time']:.2f} seconds")
    print(f"Speedup: {result['speedup']:.2f}")
    print(f"Efficiency: {result['efficiency']:.2f}")
    print("Model results:")
    for model_type, model_result in result['model_results'].items():
        print(f"  {model_type}: Train time = {model_result['train_time']:.2f} seconds, MSE = {model_result['mse']:.6f}")


Results for 6cpu:
Data load time: 137.56 seconds
Total time: 279.09 seconds
Speedup: 1.64
Efficiency: 0.55
Model results:
  LSTM: Train time = 46.43 seconds, MSE = 0.007085
  RNN: Train time = 26.04 seconds, MSE = 0.010521
  GRU: Train time = 41.07 seconds, MSE = 0.009238
  CNN: Train time = 22.39 seconds, MSE = 0.014058

Results for 36cpu:
Data load time: 85.81 seconds
Total time: 256.45 seconds
Speedup: 1.78
Efficiency: 0.30
Model results:
  LSTM: Train time = 72.83 seconds, MSE = 0.007903
  RNN: Train time = 26.83 seconds, MSE = 0.011435
  GRU: Train time = 41.56 seconds, MSE = 0.009112
  CNN: Train time = 22.36 seconds, MSE = 0.012968


### Plotting Results

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

def plot_results(results, output_folder='plots'):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    df = pd.DataFrame({
        'Configuration': list(results.keys()),
        'Total Time': [results[k]['total_time'] for k in results],
        'Cores': [int(k.split("cpu")[0]) for k in results],
        'Speedup': [results[k]['speedup'] for k in results],
        'Efficiency': [results[k]['efficiency'] for k in results]
    })

    ### 1. Total Time vs Cores ###
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='Cores', y='Total Time', marker='o', color='blue')
    plt.title('Total Time vs Number of Cores')
    plt.xlabel('Number of Cores')
    plt.ylabel('Total Time (seconds)')
    plt.legend(title='Number of Cores', labels=df['Cores'], loc='best')
    plt.grid(True)
    plt.savefig(os.path.join(output_folder, 'Total_time_vs_cores.png'))
    plt.close()

    ### 2. Speedup vs Cores ###
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='Cores', y='Speedup', marker='o', color='green')
    plt.title('Speedup vs Number of Cores')
    plt.xlabel('Number of Cores')
    plt.ylabel('Speedup')
    plt.legend(title='Number of Cores', labels=df['Cores'], loc='best')
    plt.grid(True)
    plt.savefig(os.path.join(output_folder, 'speedup_vs_cores.png'))
    plt.close()

    ### 3. Efficiency vs Cores ###
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='Cores', y='Efficiency', marker='o', color='orange')
    plt.title('Efficiency vs Number of Cores')
    plt.xlabel('Number of Cores')
    plt.ylabel('Efficiency')
    plt.legend(title='Number of Cores', labels=df['Cores'], loc='best')
    plt.grid(True)
    plt.savefig(os.path.join(output_folder, 'efficiency_vs_cores.png'))
    plt.close()

    ### 4. Speedup vs Efficiency ###
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='Speedup', y='Efficiency', marker='o', color='red')
    plt.title('Speedup vs Efficiency')
    plt.xlabel('Speedup')
    plt.ylabel('Efficiency')
    plt.legend(title='Number of Cores', labels=df['Cores'], loc='best')
    plt.grid(True)
    plt.savefig(os.path.join(output_folder, 'speedup_vs_efficiency.png'))
    plt.close()

    ### 5. Combined Plot of Total Time, Speedup, and Efficiency ###
    fig, ax1 = plt.subplots(figsize=(12, 6))

    sns.lineplot(data=df, x='Cores', y='Total Time', marker='o', color='blue', label='Total Time', ax=ax1)
    ax1.set_xlabel('Number of Cores')
    ax1.set_ylabel('Total Time (seconds)', color='blue')
    ax1.tick_params(axis='y', labelcolor='blue')
    ax1.grid(True)

    ax2 = ax1.twinx()
    sns.lineplot(data=df, x='Cores', y='Speedup', marker='o', color='green', label='Speedup', ax=ax2)
    sns.lineplot(data=df, x='Cores', y='Efficiency', marker='o', color='orange', label='Efficiency', ax=ax2)
    ax2.set_ylabel('Speedup / Efficiency', color='green')
    ax2.tick_params(axis='y', labelcolor='green')

    plt.title('Total Time, Speedup, and Efficiency vs Cores')
    fig.legend(loc="upper center", bbox_to_anchor=(0.5, 0.95), ncol=3)
    plt.savefig(os.path.join(output_folder, 'combined_Total_time_speedup_efficiency.png'))
    plt.close()

plot_results(results)

## Sentiment Analysis on Current News on the Ticker

In [None]:
from newsapi import NewsApiClient

# Initialize the News API client
newsapi = NewsApiClient(api_key='29c97c6c3de44b9088256456aa3e9023')

def fetch_news(ticker, date):
    articles = newsapi.get_everything(
        q=ticker,
        from_param=date,
        to=date,
        language='en',
        sort_by='relevancy'
    )
    return articles['articles']

In [None]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import nltk

# Download VADER lexicon
nltk.download('vader_lexicon')
sid = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    sentiment = sid.polarity_scores(text)
    return sentiment['compound']

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/chandrasekaran.n/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [None]:
news_articles = fetch_news("Appl", "2024-11-31")
if news_articles:
    # Combine all articles' text for the day
    all_text = " ".join([article['title'] + " " + article['description'] for article in news_articles if article['description']])
    sentiment_score = analyze_sentiment(all_text)
sentiment_score

0.9991

In [None]:
import torch
import numpy as np
from torch.utils.data import TensorDataset, DataLoader

# Create a single sample test data
X_test_sample = np.array([
    [67.500000, 72.750000, 64.199997, 69.750000, 1527.0],
    [66.000000, 66.000000, 63.750000, 65.699997, 353.0]
])
y_test_sample = np.array([67.500000, 66.000000])

# Reshape X_test_sample to match the expected input shape (batch_size, sequence_length, features)
X_test_sample = X_test_sample.reshape(2, 1, 5)

# Convert to PyTorch tensors
X_test_tensor = torch.tensor(X_test_sample, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_sample, dtype=torch.float32)

# Create a TensorDataset
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Create a DataLoader with batch_size=1
test_loader = DataLoader(test_dataset, batch_size=1)

# Initialize your model (assuming LSTM for this example)
model = BaseModel(input_dim=5, model_type="LSTM")

# Evaluate the model
model.eval()
predictions = []
actuals = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        y_pred = model(X_batch).squeeze()
        predictions.append(y_pred.numpy())
        actuals.append(y_batch.numpy())

print(f"Predictions: {predictions}")
print(f"Actuals: {actuals}")

Predictions: [array(0.12440727, dtype=float32), array(0.1240508, dtype=float32)]
Actuals: [array([67.5], dtype=float32), array([66.], dtype=float32)]


In [None]:
if sentiment_score > 0.5:
    print(f"The prediction might go up because of the recent news {predictions}")
else:
    print(f"The prediction might go down because of the recent news {predictions}")

The prediction might go up because of the recent news [array(0.12440727, dtype=float32), array(0.1240508, dtype=float32)]
