In [1]:
import replicate
from tqdm import tqdm
import random
import os
from sklearn.metrics import mean_squared_error, r2_score
from collections import Counter
import numpy as np
import re
from sklearn.metrics import f1_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np
import ast
## model

import pickle

In [2]:
os.environ["REPLICATE_API_TOKEN"] = "r8_04rPEJPKNfBUADRXnqucmHpQmFAW5NU0Anovt"
api = replicate.Client(api_token=os.environ["REPLICATE_API_TOKEN"])

In [15]:

PARAMS = dict({

	'train_on': [2015, 2021, 2017, 2018, 2019, 2020, 2016],
	'test_on': [2023, 2022, 2024],

	'epochs': 60,
	'LR': 0.95e-4,

	'Pred_period': '3m',
	'Zero-Shot': 0,
	'Fine_Tune': 1,
	"save_dir": './3m.txt'
	})
RESULTS = []
INSTRUCTION = "Financial reports data for a specefic company for the past four quarters is given in a tab-separated table bellow. \
in the table, K, M, and B means thousands, millions, and billions. predict if the stock price is going up or down at the end of the next quarter, in 3 months. \
Give a one word response with either [UP] or [DOWN]"


In [32]:

def extract_float(text):
		match = re.search(r"[-+]?\d*\.\d+", text)
		if match:
				return float(match.group(0))
		else:
				return None

def r4(value):
	if isinstance(value, float):
		# If the input is a single float, round it and format it to 4 decimal places
		return f"{round(value, 3):.3f}"
	else:
		value = list(value)
		return [f"{round(v, 3):.3f}" for v in value]

def evaluate(y_test,y_pred, roundd = False, arrayed = False):
	print(precision_recall_fscore_support(y_test, y_pred))    
	[pp, pn] = list(precision_recall_fscore_support(y_test, y_pred)[0])
	[rp, rn] = list(precision_recall_fscore_support(y_test, y_pred)[1])
	[fp, fn] = list(precision_recall_fscore_support(y_test, y_pred)[2])
	wf1 = f1_score(y_test, y_pred, average = 'weighted')
	acc = accuracy_score(y_test, y_pred)
	mcc = matthews_corrcoef(y_test, y_pred)
	if roundd:
		[pp, pn] = [r4(pp), r4(pn)]
		[rp, rn] = [r4(rp), r4(rn)]
		[fp, fn] = [r4(fp), r4(fn)]
		wf1 = r4(wf1)
		acc = r4(acc)
		mcc = r4(mcc)
	
	
	res =  {'P+-': [pp, pn],\
			'R+-': [rp, rn],\
			'f1s': [fp, fn],\
			'wf1': wf1,\
			'ACC': acc,\
			'MCC': mcc}
	
	if arrayed:
		return [pp, pn, rp, rn, fp, fn, wf1, acc, mcc]
	return res

def np_ratio(arr):
	ar = []
	for i in arr:
		if i == '[UP]':
			ar.append(1)
		else:
			ar.append(0)
	C = Counter(ar)
	return 'Neg: ' + str(C[0]/(C[1]+C[0])) + ' Pos: ' + str(C[1]/(C[1]+C[0])) 

def evaluator(test_data):
	generated_output = []
	real_output = []
	for i in tqdm(test_data):
		input = {
			"top_p": 1,
			"prompt": i['input'] + '\n Is the stock price going [UP] or [DOWN] 3 months from now? \n Answer:\n',
			"temperature": 0.25,
			"system_prompt": i['instruction'],
			"max_new_tokens": 20,
			"repetition_penalty":1.25
		}
		output = api.run("meta/meta-llama-3-8b-instruct",input=input)
		output = ''.join(output)
		if 'up' in output.lower() and 'down' in output.lower():
			out_bin = None
		elif 'up' in output.lower():
			out_bin = 1.0
		elif 'down' in output.lower():
			out_bin = 0.0
		else:
			out_bin = None

		benchmark = 1.0 if 'up' in i["output"].lower() else 0.0

		generated_output.append(out_bin)
		real_output.append(benchmark)

	valid_indices = [i for i, output in enumerate(generated_output) if output is not None]
	generated_output_filtered = np.array([generated_output[i] for i in valid_indices])
	real_output_filtered = np.array([real_output[i] for i in valid_indices])
	print('validated generations:', len(generated_output_filtered)/len(test_data))
	res = evaluate(real_output_filtered, generated_output_filtered)
	print('in evaluator', res)
	return res, (real_output_filtered, generated_output_filtered)

def get_dataset(target, train_years, test_years, bin_targets = True, dir = './prompts/'):
	train_datas = []
	test_datas = []

	for i in os.listdir(dir):
		if i.split('.')[-1]=='pkl':
			with open(dir+i, 'rb') as f:
				data = pickle.load(f)
				data = [dict(zip(data.keys(), values)) for values in zip(*data.values())]
				
				for i in data:
					target_datum = i['targets_bin'][target] if bin_targets else i['targets'][target]
					target_datum = '[UP]' if target_datum else '[DOWN]'
					datum = {'instruction': INSTRUCTION, 'input': i['prompts'], 'output': target_datum}
					if int(i['dates'].split('-')[0]) in train_years:
						train_datas.append(datum)
					elif int(i['dates'].split('-')[0]) in test_years:
						test_datas.append(datum)
	random.shuffle(train_datas)
	random.shuffle(test_datas)
	return train_datas, test_datas

def logger(res, model = 'LLaMA3-8B'):
	resu = {}
	resu['model'] = model
	for i in res.keys():
		resu[i] = res[i]
	RESULTS.append(resu)


In [33]:
train_dict, test_dict = \
	get_dataset(target = PARAMS['Pred_period'], train_years = PARAMS['train_on'], test_years=PARAMS['test_on'])

In [34]:
res, stats = evaluator(test_dict[:100])
    


100%|██████████| 100/100 [02:18<00:00,  1.39s/it]

validated generations: 1.0
(array([0.64583333, 0.61538462]), array([0.60784314, 0.65306122]), array([0.62626263, 0.63366337]), array([51, 49]))
in evaluator {'P+-': [0.6458333333333334, 0.6153846153846154], 'R+-': [0.6078431372549019, 0.6530612244897959], 'f1s': [0.6262626262626263, 0.6336633663366337], 'wf1': 0.6298889888988899, 'ACC': 0.63, 'MCC': 0.26106110814618794}





In [35]:
res

{'P+-': [0.6458333333333334, 0.6153846153846154],
 'R+-': [0.6078431372549019, 0.6530612244897959],
 'f1s': [0.6262626262626263, 0.6336633663366337],
 'wf1': 0.6298889888988899,
 'ACC': 0.63,
 'MCC': 0.26106110814618794}