In [5]:
from langchain import FewShotPromptTemplate, PromptTemplate
from langchain.chains import LLMChain
import matplotlib.pyplot as plt
from dataclasses import dataclass
from transformers import AutoModelForCausalLM, AutoTokenizer
from langchain import HuggingFaceHub
import seaborn as sns
from datetime import datetime
import pandas as pd
import numpy as np
import os
import torch
from tqdm import tqdm

os.environ["HUGGINGFACEHUB_API_TOKEN"] = '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'

In [2]:
num_points = 40
frequency1 = 1
amplitude1 = 5
frequency2 = 2
amplitude2 = 3
frequency3 = 0.5
amplitude3 = 7
time_index = pd.date_range(start="2022-01-01", periods=num_points, freq="D")
sine_series_complicated = (
    amplitude1 * np.sin(2 * np.pi * frequency1 * np.arange(num_points) / num_points) +
    amplitude2 * np.sin(2 * np.pi * frequency2 * np.arange(num_points) / num_points) +
    amplitude3 * np.sin(2 * np.pi * frequency3 * np.arange(num_points) / num_points)
)
sine_series_complicated = pd.Series(sine_series_complicated, index=time_index, name="ComplicatedSine")
sine_series_complicated.shape

(40,)

In [3]:
train = sine_series_complicated.iloc[:300]
test = sine_series_complicated.iloc[300:]

In [4]:
@dataclass
class Scaler:
    transform: callable = lambda x: x
    inv_transform: callable = lambda x: x

In [ ]:
class LLMTime:
    
    def __init__(self, train, test, alpha=0.95, beta=0.3, batch_length=400, basic=False, temperature=0.5, do_sample=True, model_name="mistralai/Mistral-7B-v0.1", repetition_penalty=1.0):
        self.model_name = model_name
        self.train = train
        self.test = test
        self.alpha = alpha
        self.beta = beta
        self.batch_length = batch_length
        self.basic = basic
        self.temperature = temperature
        self.do_sample = do_sample
        self.repetition_penalty = repetition_penalty
        self.scalers = None
        self.input_str = None
        self.test_str = None
        self.tokenizer = None
        self.model = None
        self.good_tokens = None
        self.bad_tokens = None
        self.output = None
        self.transformed_output_arr = None
        self.preprocess_data()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    @staticmethod
    def get_scaler(history, alpha=0.95, beta=0.3, basic=False):
        history = history[~np.isnan(history)]
        if basic:
            q = np.maximum(np.quantile(np.abs(history), alpha),.01)
            def transform(x):
                return x / q
            def inv_transform(x):
                return x * q
        else:
            min_ = np.min(history) - beta*(np.max(history)-np.min(history))
            q = np.quantile(history-min_, alpha)
            if q == 0:
                q = 1
            def transform(x):
                return (x - min_) / q
            def inv_transform(x):
                return x * q + min_
        return Scaler(transform=transform, inv_transform=inv_transform)
    
    def convert_array_to_string(self, arr):
        rounded_values = [round(val * 1000) for val in arr]
        str_values = [str(val) for val in rounded_values]
        result_string = ",".join(str_values)
        return result_string

    def preprocess_data(self):
        train = self.train
        test = self.test
        if not isinstance(train, list):
            train = [train]
            test = [test]
        n_val = len(train)
        for i in range(len(train)):
            if not isinstance(train[i], pd.Series):
                train[i] = pd.Series(train[i], index=pd.RangeIndex(len(train[i])))
                test[i] = pd.Series(test[i], index=pd.RangeIndex(len(train[i]), len(test[i])+len(train[i])))
        alpha = self.alpha
        beta = self.beta
        basic = self.basic
        self.scalers = [self.get_scaler(train[i].values, alpha=alpha, beta=beta, basic=basic) for i in range(len(train))]
        input_arrs = [train[i].values for i in range(len(train))]
        transformed_input_arrs = np.array([scaler.transform(input_array) for input_array, scaler in zip(input_arrs, self.scalers)])
        input_str = self.convert_array_to_string(transformed_input_arrs[0])
        test_arrs = [test[i].values for i in range(len(test))]
        transformed_test_arrs = np.array([scaler.transform(input_array) for input_array, scaler in zip(test_arrs, self.scalers)])
        test_str = self.convert_array_to_string(transformed_test_arrs[0])
        self.input_str = input_str
        self.test_str = test_str
        return input_str, test_str, self.scalers
    
    def zero_shot(self):
        torch.cuda.empty_cache()
        from transformers import AutoModelForCausalLM, AutoTokenizer
        model = AutoModelForCausalLM.from_pretrained(
            self.model_name, device_map="auto", load_in_4bit=True
        )
        tokenizer = AutoTokenizer.from_pretrained(self.model_name, padding_side="left")
        good_tokens_str = list("0123456789,")
        good_tokens = [tokenizer.convert_tokens_to_ids(token) for token in good_tokens_str]
        bad_tokens = [i for i in range(len(tokenizer)) if i not in good_tokens]
        input_str = self.input_str
        for i in tqdm(range(self.batch_length, len(input_str) - self.batch_length, self.batch_length//10)):
              batch_str = input_str[i-self.batch_length: i]
              model_inputs = tokenizer(batch_str, return_tensors='pt').to('cuda')
              generated_ids = model.generate(**model_inputs, do_sample=self.do_sample, max_new_tokens=self.batch_length//10, temperature=self.temperature, repetition_penalty=self.temperature, bad_words_ids=[[t] for t in bad_tokens], force_words_ids=[[t] for t in good_tokens])
              output = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
              print(f"Batch: {batch_str}")
              print(f"Actual: {input_str[i: i+self.batch_length//10]}")
              print(f"Total: {input_str}")
              print(f"Pred: {output[-self.batch_length//10:]}")
        inp = input_str[-self.batch_length:]
        model_inputs = tokenizer(inp, return_tensors='pt').to(self.device)
        generated_ids = model.generate(**model_inputs, do_sample=self.do_sample, max_new_tokens=self.batch_length//10, temperature=self.temperature, repetition_penalty=self.temperature, bad_words_ids=[[t] for t in bad_tokens], force_words_ids=[[t] for t in good_tokens])
        output = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        return output
    
    @staticmethod    
    def invert_string_to_array(string_values):
        string_values = string_values.replace(" ", "")
        if string_values[-1] == ',':
            string_values = string_values[:len(string_values) - 1]
        str_values_list = string_values.split(',')
        float_values = [float(val) if val.strip() != '' else 0.0 for val in str_values_list]
        original_values = [val / 1000 for val in float_values]
        return original_values

    def get_original_array(self, output_str):
        output_arr = self.invert_string_to_array(string_values=output_str)
        output_arr = np.array(output_arr)
        transformed_output_arr = self.scalers[0].inv_transform(output_arr)
        self.transformed_output_arr = transformed_output_arr
        return transformed_output_arr
    
    def plot(self, input, output, m):
        model_parameters = {
            "alpha": self.alpha,
            "beta": self.beta,
            "batch_length": self.batch_length,
            "basic": self.basic,
            "temperature": self.temperature,
            "do_sample": self.do_sample,
            "repetition_penalty": self.repetition_penalty
        }
        sns.set(style="whitegrid")
        fig, ax = plt.subplots(figsize=(10, 6))
        plt.title(f"Few Shot {self.model_name}")
        
        # Add a vertical line at x = m
        plt.axvline(x=m, color='k', linestyle='--', label='Train-Test Split')
        plt.plot(output, color='red', label='Predicted')
        plt.plot(input, color='blue', label='Actual')
        plt.legend()
        
        plt.grid(True)
        plt.xlabel('Time')
        sns.set_palette("husl")
        
        textstr = '\n'.join([f"{key}: {value}" for key, value in model_parameters.items()])
        props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        ax.text(1.05, 0.5, textstr, transform=ax.transAxes, fontsize=10, verticalalignment='center', bbox=props)
        
        current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = f"plots/mistral-few-shot-{current_time}.png"
        plt.savefig(filename, dpi=300, bbox_inches='tight')
        
        plt.show()
    
    def run(self):
        self.zero_shot()
        output = self.output
        return output
        
    def plot_and_return(self, output):
        transformed_output_arr = self.get_original_array(self.input_str + ',' + output)
        input_arr = self.get_original_array(self.input_str + ',' + self.test_str)
        inp = self.get_original_array(self.input_str)
        m = len(inp)
        input_arr = input_arr[:len(transformed_output_arr)]
        self.plot(input_arr, transformed_output_arr, m)