# Imports

In [25]:
import re
import os
from os import walk
import io

import time
from timeit import default_timer as timer
from datetime import datetime

import string
import numpy as np
import pandas as pd

import math

import random
from random import seed, randint, shuffle

import json

from statistics import mean, mode, stdev, median

from tqdm.auto import tqdm

In [2]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline

import torch
import torch.nn.functional as F

# Data

In [5]:
# Load the transcribed segments table; the first row of the CSV is used as the header.
data = pd.read_csv(
    '../../attachment speech/LSTM_FT/Data/attachment_transcribed_lr_v1.csv',
    header=0,
)

# Extract Hidden Layer Output

## Model

In [7]:
# classifier = pipeline("text-classification",
#                       model='bhadresh-savani/bert-base-uncased-emotion', top_k=None) #return_all_scores=True)

In [8]:
from transformers.models.bert.modeling_bert import (
    BertPreTrainedModel,
    BertModel,
    BaseModelOutputWithPoolingAndCrossAttentions
    #SequenceClassifierOutput
)

from transformers.file_utils import ModelOutput
from transformers import BertForSequenceClassification
from dataclasses import dataclass
from typing import Optional, Tuple, Union, List

@dataclass
class SequenceClassifierOutput(ModelOutput):
    """Classifier output container with an extra ``pooled_hidden_states`` field.

    Defined locally instead of importing the ``SequenceClassifierOutput`` from
    ``transformers`` (that import is commented out above) so that
    ``CustomBertForSequenceClassification.forward`` can return the pooled vector
    alongside the usual loss / logits / hidden states / attentions.
    """
    # Loss value; stays None when forward() is called without labels.
    loss: Optional[torch.FloatTensor] = None
    # Raw classifier scores (pre-softmax).
    logits: torch.FloatTensor = None
    # Taken from the encoder output's `hidden_states` attribute.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # NOTE(review): annotated as a tuple, but CustomBertForSequenceClassification.forward
    # stores a single tensor here (the pooled output after dropout).
    pooled_hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Taken from the encoder output's `attentions` attribute.
    attentions: Optional[Tuple[torch.FloatTensor]] = None

In [9]:
class CustomBertForSequenceClassification(BertForSequenceClassification):
    """BERT sequence classifier that also returns the pooled vector fed to the classifier head.

    The forward pass runs the encoder, applies dropout to the pooled output,
    computes logits, and (when ``return_dict`` is true) returns a
    ``SequenceClassifierOutput`` whose ``pooled_hidden_states`` field holds
    the post-dropout pooled vector.
    """

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).

        Returns:
            When `return_dict` is falsy, a tuple `(loss?, logits, *encoder_extras)` where `loss` is only
            present if `labels` was given. Otherwise a `SequenceClassifierOutput` with `loss`, `logits`,
            `hidden_states`, `pooled_hidden_states` and `attentions`.
        """
        # The loss classes are not imported by the notebook's import cells; without this
        # local import any call that passes `labels` fails with NameError.
        from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Second element of the encoder output is the pooled representation.
        pooled_output = outputs[1]

        # NOTE: the value exposed as `pooled_hidden_states` below is this post-dropout tensor.
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            # Infer the problem type once from num_labels / label dtype and cache it on the config.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            # Tuple form: logits followed by whatever the encoder returned after the pooled output.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            pooled_hidden_states=pooled_output,
            attentions=outputs.attentions,
        )

In [10]:
# Tokenizer and fine-tuned emotion classifier, both loaded from the same checkpoint.
# CustomBertForSequenceClassification is used in place of AutoModelForSequenceClassification
# so that the model output also carries `pooled_hidden_states`. Hidden states are requested
# at load time because get_logits_and_last_hidden_layer reads `outputs.hidden_states[-1]`.
tokenizer = AutoTokenizer.from_pretrained(
    'bhadresh-savani/bert-base-uncased-emotion'
)
model = CustomBertForSequenceClassification.from_pretrained(
    'bhadresh-savani/bert-base-uncased-emotion',
    output_hidden_states=True,
    return_dict=True,
)

# Define a function to process the text and get the classes and the output of the last hidden layer
def get_logits_and_last_hidden_layer(text):
    """Run one text through the classifier and return its logits and hidden representations.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        text: The string to classify.

    Returns:
        A tuple ``(logits, last_hidden_layer, pooled_hidden_layer)`` where
        ``last_hidden_layer`` is the final entry of ``outputs.hidden_states``
        and ``pooled_hidden_layer`` is ``outputs.pooled_hidden_states``.
    """
    # Truncate to the model's position-embedding limit so an over-long segment is
    # clipped instead of crashing the forward pass.
    inputs = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        max_length=model.config.max_position_embeddings,
    )

    # Inference only: skip autograd so the tensors callers keep around
    # (e.g. per-segment hidden states) do not hold on to a computation graph.
    with torch.no_grad():
        outputs = model(**inputs)

    logits = outputs.logits

    # Get the output of the last hidden layer
    last_hidden_layer = outputs.hidden_states[-1]
    pooled_hidden_layer = outputs.pooled_hidden_states

    return logits, last_hidden_layer, pooled_hidden_layer

In [11]:
def logits2LabelsDF(logits):
    """Turn a logits tensor into a one-row DataFrame of class probabilities.

    Softmax is taken over dim 1 and only the first row of the result is used.
    Each column is named after ``model.config.id2label[i]`` and holds the
    probability of class ``i``.
    """
    probabilities = F.softmax(logits, dim=1).detach().numpy()[0]

    labelsDF = pd.DataFrame()
    for class_idx, probability in enumerate(probabilities):
        # One single-element column per class, in class-index order.
        labelsDF[model.config.id2label[class_idx]] = [probability]

    return labelsDF

In [12]:
def tensor2JSON(last_hidden_layer):
    """Serialize a tensor to a JSON string of nested lists.

    The tensor is detached, converted to a NumPy array, turned into plain
    Python lists, and dumped with ``json.dumps``.
    """
    nested_values = last_hidden_layer.detach().numpy().tolist()
    return json.dumps(nested_values)

In [13]:
def tensor2Numpy(last_hidden_layer):
    """Return the tensor's values as a NumPy array, detached from autograd."""
    return last_hidden_layer.detach().numpy()

In [14]:
def poolOutput(hiddenOutputs):
    """Concatenate a sequence of tensors along dimension 1 and return the result."""
    return torch.cat(hiddenOutputs, dim=1)

## Extraction Chunks

In [22]:
# Replace missing transcripts with an empty string so that every row's Text passed to
# get_logits_and_last_hidden_layer in the extraction loop is a str rather than NaN.
# NOTE: this overwrites the column on `data` in place.
data['Text'] = data['Text'].fillna('')

In [27]:
def _ensure_parent_dir(path):
    """Create the directory that will contain `path` if it does not exist yet."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def _save_tensor(path, tensor):
    """Write `tensor` to `path` as a compressed NumPy archive."""
    _ensure_parent_dir(path)
    # NOTE: np.savez_compressed appends '.npz' when the file name does not already end
    # with it, so the files on disk are named '<...>.npy.npz'. The path is left
    # unchanged so existing downstream readers keep working.
    np.savez_compressed(path, tensor2Numpy(tensor))


def _append_csv(path, frame):
    """Append `frame` to the CSV at `path`, writing the header only when the file is empty."""
    _ensure_parent_dir(path)
    with open(path, 'a') as f:
        frame.to_csv(f, mode='a', index=False, header=f.tell()==0)


def _save_merged_story(task, story_id, hidden_outputs):
    """Concatenate one (ID, Task) group's segment hidden states and save them; no-op when empty."""
    if len(hidden_outputs) > 0:
        pooled_h_output = poolOutput(hidden_outputs)
        _save_tensor('Data_V2/TextChunks/Merged/{}/{}.npy'.format(task, story_id), pooled_h_output)


# Walk the rows in order. Consecutive rows with the same (ID, Task) form one group:
# each kept segment is saved on its own, and the group's hidden states are concatenated
# and saved once the group ends.
# NOTE: the CSV outputs are opened in append mode, so re-running this cell appends duplicate rows.

# None sentinels: the first row always starts a new group (segment becomes 1), and an
# empty `data` frame simply produces no output instead of failing on data.iloc[0].
currID, currTask, segment = None, None, 0
subStoryHiddenOutputs = []

# Minimum segment duration, in the units of the Start/End columns. If those are seconds,
# 0.1 is 100 ms (the previous comment said "1 ms") — TODO confirm intended value.
threshold = 0.1

for index, row in tqdm(data.iterrows(), total=data.shape[0]):
    childDF = row.copy().to_frame().T.reset_index(drop=True)

    if (childDF.ID[0]==currID and childDF.Task[0]==currTask):
        segment += 1
    else:
        # Group boundary: flush the group that just ended under ITS ID/Task, then reset.
        _save_merged_story(currTask, currID, subStoryHiddenOutputs)

        segment = 1
        subStoryHiddenOutputs = []
        currID, currTask = childDF.ID[0], childDF.Task[0]

    duration = childDF.End[0] - childDF.Start[0]

    if duration < threshold:
        # Too short to process: log the row (with its duration) and move on.
        skippedDF = childDF.copy()
        skippedDF['skipped_duration'] = duration

        _append_csv('Data_V2/TextChunks/attachment_emotions_text_skipped_segments_v1.csv', skippedDF)

    else:
        storyResults, h_output, pooled_output = get_logits_and_last_hidden_layer(childDF.Text[0])
        storyResultsDF = logits2LabelsDF(storyResults)

        subStoryHiddenOutputs.append(h_output)

        # Original row columns followed by one probability column per emotion label.
        segDF = pd.concat([childDF, storyResultsDF], axis=1)
        segDF['time'] = childDF.Start[0]
        segDF['segment'] = segment

        _save_tensor('Data_V2/TextChunks/Unmerged/{}/{}_{}.npy'.format(row.Task, row.ID, segment), h_output)

        _save_tensor('Data_V2/TextChunks/Pooled/{}/{}_{}.npy'.format(row.Task, row.ID, segment), pooled_output)

        _append_csv('Data_V2/TextChunks/attachment_emotions_text_v1.csv', segDF)

# The last group has no following row to trigger the flush above, so write it here;
# previously its merged file was never produced.
_save_merged_story(currTask, currID, subStoryHiddenOutputs)

  0%|          | 0/2399 [00:00<?, ?it/s]