In [None]:
import torch
import seaborn as sns
import pandas as pd
import transformers
import evaluate
from transformers import pipeline, AutoTokenizer
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import load_dataset, load_from_disk
from collections import Counter, defaultdict
from transformers import DataCollatorWithPadding
from pathlib import Path
import numpy as np
from sklearn.metrics import f1_score, classification_report, accuracy_score
from tqdm.auto import tqdm
import re
from nltk.tokenize import sent_tokenize
from sklearn.linear_model import LinearRegression
sns.set()

In [None]:
#dataset = load_from_disk('/datadrive_2/')
#test_data = dataset['test']

# Presumably the Hugging Face cache directory; it is not referenced again in the visible cells.
cache_dir = '/datadrive_2/hf_cache/'
# Load the dataset that was previously saved to disk with the `datasets` library.
dataset = load_from_disk("/datadrive_2/HMD_chunked_100_test/")
# Bare expression: shows the dataset repr as the cell output
# (the trailing comment is a disabled selection of the 'train' split).
dataset #= dataset['train']

## Classify by Political Leaning

In [None]:
# Case-insensitive matcher for party vocabulary: any word starting with
# "liberal" or "conservat", plus the whole words "tory" and "tories".
pol_pattern = re.compile(
    r'\bliberal|\bconservat|\btory\b|\btories\b',
    flags=re.IGNORECASE,
)

In [None]:
# Sanity check of pol_pattern: the prefix alternatives match inside longer words,
# so this call returns ['liberal', 'conservat'].
pol_pattern.findall('liberal governments do not fire their conservative political ministers minister')

In [None]:
# def sent_split(x):
#      return {'data': [
#                 {'sentence':s.lower(),
#                  'length': len(s.split()),
#                  'pol': p, 'loc':l, 'year':y, 'ocr':o,'nlp':n} 
#                      for y,p,l,o,n,t in zip(x['year'],x['pol'],x['loc'],x['ocr_quality_mean'],x['nlp'],x['text']) 
#                       for s in sent_tokenize(t) 
#                          if pol_pattern.findall(s)
#                  ]
#             }

# test_data = dataset.map(sent_split,batched=True, remove_columns=dataset.column_names)

In [None]:
# Draw the fixed-size random sample first and lower-case only the sampled rows.
# The seeded shuffle permutes row positions and does not look at row contents,
# so the selected rows and their order match lower-casing the whole dataset
# first, while far fewer rows have to be processed.
# The sample size is capped so a dataset with fewer than 15,000 rows does not
# make `select` raise an index error.
sample_size = min(15000, len(dataset))
test_data = (
    dataset
    .shuffle(seed=42)
    .select(range(sample_size))
    .map(lambda x: {'sentences': x['sentences'].lower()}, num_proc=6)
)

In [None]:
# Bare expression: shows the repr of `test_data` as the cell output.
test_data

In [None]:
#data = test_data.filter(lambda x: x['data.length'] > 25).shuffle(seed=42).select(range(15000))

In [None]:
# def pred_data(example,add_field='year'):
#     return {'st_year_sep': f'[{example[add_field]}]' + ' [SEP] ' + example['sentences'] ,
#      'year_sep': str(example[add_field]) + ' [SEP] ' + example['sentences'] ,
#      'year_date': str(example[add_field]) + ' [DATE] ' + example['sentences'],
        
#     }
def pred_data(example):
    """Build the regression target for one row.

    Args:
        example: mapping that provides a 'year' entry convertible with ``float``.

    Returns:
        A dict with a single 'label' key holding the year as a float.
    """
    year_as_float = float(example['year'])
    return dict(label=year_as_float)
    
# Apply pred_data to every row (6 worker processes), adding the float `label` column.
data = test_data.map(pred_data , num_proc=6)

In [None]:
# Number of rows in `data`, shown as the cell output.
len(data)

In [None]:
# lab2code = {'[con]':0,'[lib]':1,'[rad]':2,'[neutr]':3,'[none]':4}
# num_labels = len(lab2code)
# data = data.map(lambda x: {'label': lab2code[x['pol']]})

In [None]:
# Inspect the first row of `data`.
data[0]

In [None]:
# Hold out 20% of the rows as the final test set.
test_size = int(len(data) * .2)
train_test = data.train_test_split(test_size=test_size, seed=1984)
test_set = train_test['test']

# From the remaining rows, split off 15% for validation. In `train_val`,
# the 'train' split is used for fitting and the 'test' split for validation.
remaining_rows = train_test['train']
val_size = int(len(remaining_rows) * .15)
train_val = remaining_rows.train_test_split(test_size=val_size, seed=1984)

In [None]:
# Bare expression: shows the repr of the train/validation splits as the cell output.
train_val

In [None]:
# One entry per model to compare:
# (short name, checkpoint id or local path, special token, text column).
checkpoints = [
    ('distilbert','distilbert-base-uncased','[SEP]','sentences'),
    ('hmd_distilbert','/datadrive_2/bnert-hmd','[SEP]','sentences'),
    #('bnert-time-st-y','/datadrive_2/bnert-time-st-y','[SEP]','sentences'),
    ('bnert-time-y','/datadrive_2/bnert-time-y','[DATE]','sentences'),
    ('bnert-time-y_masked_25','/datadrive_2/bnert-time-y_masked_25','[DATE]','sentences'),
    ('bnert-time-y_masked_75','/datadrive_2/bnert-time-y_masked_75','[DATE]','sentences'),
    #('bnert-pol-st','/datadrive_2/bnert-pol-st','[SEP]','year_sep'),
    #('bnert-pol','/datadrive_2/bnert-pol','[SEP]','sentences')
]

# Maps each short name to {'model', 'tokenizer', 'sentences'}.
model_dict = defaultdict(dict)
for name, checkpoint, _special_token, sent_col in checkpoints:
    entry = model_dict[name]
    # num_labels=1: the sequence-classification head emits a single value per input.
    entry['model'] = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=1)
    entry['tokenizer'] = AutoTokenizer.from_pretrained(checkpoint)
    # The special token from the tuple is currently not stored.
    entry['sentences'] = sent_col

In [None]:
# Drop metadata columns that are not needed for training. Only columns that are
# still present are removed, so re-running this cell does not raise on columns
# that an earlier run already dropped.
unused_columns = ['nlp', 'ocr', 'loc']
columns_to_drop = [col for col in unused_columns if col in train_val['train'].column_names]
if columns_to_drop:
    train_val = train_val.remove_columns(columns_to_drop)
train_val

In [None]:
#def add_text_col(example,source):
#    return {'text' : example[source]}

def preprocess_function(examples, target_col, tok=None, max_length=256):
    """Tokenize the text stored under ``target_col``.

    Args:
        examples: mapping (a dataset row or batch) that contains ``target_col``.
        target_col: name of the column holding the text to tokenize.
        tok: tokenizer callable to use. When omitted, the module-level
            ``tokenizer`` variable is used, which keeps existing
            ``dataset.map(preprocess_function, fn_kwargs={'target_col': ...})``
            calls working unchanged.
        max_length: length every sequence is truncated/padded to (default 256).

    Returns:
        Whatever the tokenizer returns for the text, called with
        ``truncation=True``, ``padding="max_length"`` and ``max_length``.
    """
    # Only read the global when no tokenizer was passed explicitly, so the
    # function can be used (and tested) without any notebook-level state.
    active_tokenizer = tokenizer if tok is None else tok
    return active_tokenizer(
        examples[target_col],
        truncation=True,
        padding="max_length",
        max_length=max_length,
    )

In [None]:
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score

def compute_metrics_for_regression(eval_pred):
    """Compute regression metrics for one evaluation pass.

    Args:
        eval_pred: pair ``(logits, labels)`` of array-likes holding the model
            outputs and the reference values, one value per example.

    Returns:
        Dict with "mse", "mae", "r2" and "accuracy", where accuracy is the
        share of predictions that round to the reference value.
    """
    logits, labels = eval_pred
    # Flatten both sides to 1-D. Subtracting an (N,) array from an (N, 1) array
    # would silently broadcast to (N, N) and corrupt the accuracy below.
    predictions = np.asarray(logits).reshape(-1)
    targets = np.asarray(labels).reshape(-1)

    mse = mean_squared_error(targets, predictions)
    mae = mean_absolute_error(targets, predictions)
    r2 = r2_score(targets, predictions)

    # A prediction rounds to the reference value only when its absolute error
    # is below 0.5, i.e. when its squared error is below 0.25.
    squared_errors = (predictions - targets) ** 2
    accuracy = float(np.mean(squared_errors < 0.25))

    return {"mse": mse, "mae": mae, "r2": r2, "accuracy": accuracy}

class RegressionTrainer(Trainer):
    """Trainer that optimises mean squared error on a single-output model."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Return the MSE between the model's first output column and the labels.

        Args:
            model: the model being trained or evaluated.
            inputs: batch dict; its "labels" entry is removed before the
                remaining entries are passed to the model.
            return_outputs: when true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: accepted so the override also works with
                transformers releases that pass this argument to
                ``compute_loss``; it is not used.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        # outputs[0] holds the logits; take column 0 so the prediction has one
        # value per example, matching the labels passed to mse_loss.
        logits = outputs[0][:, 0]
        loss = torch.nn.functional.mse_loss(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
# Test-set metrics per model name: {'mae', 'mse', 'accuracy'}.
result_dict = defaultdict(dict)

for name, mdict in model_dict.items():
    print(f'Creating a model for {name}')
    # `tokenizer` has to stay a module-level name: preprocess_function reads
    # it as a global when it is called through `map`.
    tokenizer = mdict['tokenizer']
    model = mdict['model']
    sent_col = mdict['sentences']

    # Tokenize into per-model copies instead of overwriting `train_val` and
    # `test_set`, so every model starts from the same untouched splits and no
    # columns from a previous tokenizer carry over.
    tokenized_train_val = train_val.map(preprocess_function, fn_kwargs={'target_col': sent_col})
    tokenized_test = test_set.map(preprocess_function, fn_kwargs={'target_col': sent_col})

    training_args = TrainingArguments(
        seed=1984,
        evaluation_strategy="epoch",
        output_dir=f"./results_{name}",
        learning_rate=2e-4,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=25,
        weight_decay=0.01,
    )

    trainer = RegressionTrainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train_val["train"],
        eval_dataset=tokenized_train_val["test"],  # validation split, scored every epoch
        compute_metrics=compute_metrics_for_regression,
    )

    trainer.train()

    # Score the held-out test set once; metric keys keep the "eval_" prefix.
    scores = trainer.evaluate(eval_dataset=tokenized_test)

    model.save_pretrained(f'/datadrive_2/{name}-pol')
    tokenizer.save_pretrained(f"/datadrive_2/{name}-pol")

    result_dict[name]['mae'] = scores['eval_mae']
    result_dict[name]['mse'] = scores['eval_mse']
    result_dict[name]['accuracy'] = scores['eval_accuracy']

In [None]:
# Build a table with one row per model name and one column per stored metric.
results_df = pd.DataFrame.from_dict(result_dict, orient='index')

In [None]:
# Print the metrics, rounded to three decimals, as LaTeX table source.
print(results_df.round(3).to_latex())

In [None]:
# Persist the metrics table. The output folder is created first so the write
# does not fail when `tables/` is missing.
# NOTE(review): 'classsify' in the file name looks like a typo; it is kept
# unchanged so anything that reads this path keeps working.
results_path = Path('tables/classsify_time.csv')
results_path.parent.mkdir(parents=True, exist_ok=True)
results_df.to_csv(results_path)