In [1]:
from transformers import BertModel, BertTokenizer, BertForSequenceClassification
import torch
import numpy as np
import pandas as pd
import re
from torch.utils.data import Dataset, DataLoader
from torch import nn
import nltk
from os import listdir
from os.path import isfile, join
import xml.etree.cElementTree as et
from bs4 import BeautifulSoup
import nltk
import nltk.data
from os import listdir
import pysent3 as ps
from os.path import isfile, join
import nltk.data
import eng_spacysentiment
# Load the eng_spacysentiment pipeline once; processing_file() later reads the
# 'positive' entry of `.cats` from the object this pipeline returns for each text.
nlp_spacy = eng_spacysentiment.load()

# nltk_tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')

class TextDataset(Dataset):
  """Map-style dataset that tokenizes one text per item.

  Every item is a dict holding the raw text ('text_text'), the flattened (1-D)
  'input_ids' and 'attention_mask' tensors returned by the tokenizer, and the
  label as a ``torch.long`` tensor ('targets').
  """

  def __init__(self, text, targets, tokenizer, max_len):
    # `text` and `targets` are indexed with the same position in __getitem__.
    self.text, self.targets = text, targets
    self.tokenizer, self.max_len = tokenizer, max_len

  def __len__(self):
    # The dataset length is the number of texts.
    return len(self.text)

  def __getitem__(self, item):
    raw_text = str(self.text[item])
    label = self.targets[item]

    # Ask the tokenizer for PyTorch tensors padded/truncated to `max_len`.
    tokenizer_options = dict(
      add_special_tokens=True,
      max_length=self.max_len,
      return_token_type_ids=False,
      padding='max_length',
      return_attention_mask=True,
      return_tensors='pt',
      truncation=True,
    )
    encoded = self.tokenizer.encode_plus(raw_text, **tokenizer_options)

    sample = {'text_text': raw_text}
    for key in ('input_ids', 'attention_mask'):
      # Flatten so each sample contributes a 1-D tensor to the batch.
      sample[key] = encoded[key].flatten()
    sample['targets'] = torch.tensor(label, dtype=torch.long)
    return sample

class SentimentClassifier(nn.Module):
  """BERT encoder followed by dropout and a linear classification layer.

  The encoder is loaded from ``PRE_TRAINED_MODEL_NAME``. ``forward`` returns the
  linear layer's output, whose last dimension has ``n_classes`` entries.
  """

  def __init__(self, n_classes):
    super().__init__()
    self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
    encoder_width = self.bert.config.hidden_size
    self.drop = nn.Dropout(p=0.3)
    self.out = nn.Linear(encoder_width, n_classes)

  def forward(self, input_ids, attention_mask):
    # With return_dict=False the encoder result is unpacked as a 2-tuple; only
    # the second entry (the pooled output) feeds the classification head.
    _sequence_output, pooled = self.bert(
      input_ids=input_ids,
      attention_mask=attention_mask,
      return_dict=False,
    )
    return self.out(self.drop(pooled))


def create_data_loader(df, tokenizer, max_len, batch_size):
  """Wrap a frame's 'Text' and 'sentiment_value' columns in a DataLoader.

  Args:
    df: DataFrame providing the 'Text' and 'sentiment_value' columns.
    tokenizer: tokenizer handed to TextDataset.
    max_len: maximum token length handed to TextDataset.
    batch_size: number of samples per batch.

  Returns:
    A DataLoader over a TextDataset, using 4 worker processes.
  """
  texts = df['Text'].to_numpy()
  labels = df['sentiment_value'].to_numpy()
  dataset = TextDataset(text=texts, targets=labels, tokenizer=tokenizer, max_len=max_len)
  return DataLoader(dataset, batch_size=batch_size, num_workers=4)


def get_predictions(model, data_loader):
  """Run ``model`` over every batch of ``data_loader`` with gradients disabled.

  Args:
    model: module called as ``model(input_ids=..., attention_mask=...)``; its
      output is reduced with ``torch.max(outputs, dim=1)``, so it is assumed to
      be shaped (batch, n_classes) -- TODO confirm against the model in use.
    data_loader: iterable of batches exposing the keys 'text_text',
      'input_ids', 'attention_mask' and 'targets'.

  Returns:
    Tuple ``(review_texts, predictions, prediction_probs, real_values)``:
    the input texts as a list, then three CPU tensors with one row per input.
    ``predictions`` holds the argmax class index, ``prediction_probs`` the raw
    model outputs (no softmax is applied here) and ``real_values`` the targets.
    For a loader that yields no batches the tensors are empty.
  """
  model = model.eval()
  review_texts = []
  pred_batches, output_batches, target_batches = [], [], []

  with torch.no_grad():
    for batch in data_loader:
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)
      targets = batch["targets"].to(device)
      outputs = model(input_ids=input_ids, attention_mask=attention_mask)
      _, preds = torch.max(outputs, dim=1)

      review_texts.extend(batch["text_text"])
      # Keep whole batches; splitting them into per-row tensors only to stack
      # them again costs one Python object per sample.
      pred_batches.append(preds)
      output_batches.append(outputs)
      target_batches.append(targets)

  if not pred_batches:
    # torch.cat / torch.stack reject an empty list, so build the empty result by hand.
    no_labels = torch.empty(0, dtype=torch.long)
    return review_texts, no_labels, torch.empty(0, 0), no_labels.clone()

  predictions = torch.cat(pred_batches).cpu()
  prediction_probs = torch.cat(output_batches).cpu()
  real_values = torch.cat(target_batches).cpu()
  return review_texts, predictions, prediction_probs, real_values

# --- Settings read by the cells in this notebook ------------------------------
PRE_TRAINED_MODEL_NAME = 'bert-large-uncased'  # encoder loaded by SentimentClassifier
MAX_LEN = 85      # max_len passed to create_data_loader() in processing_file()
BATCH_SIZE = 32   # batch_size passed to create_data_loader() in processing_file()

# --- Settings not referenced by any cell visible in this notebook -------------
RANDOM_SEED = 0
FINBERT_MODEL_NAME = 'finbert'
EPOCHS = 8
LEARNING_RATE = 1e-5
TEST_DATA_PERCENT = 0.30

def get_finbert_sentiment(sentence):
    """Return the FinBERT class probabilities for one sentence.

    Args:
        sentence: text handed to ``tokenizer_finbert`` (with truncation enabled).

    Returns:
        1-D numpy array: the softmax over the first element of the model output,
        for the single input row. Label order as recorded with the original code:
        {0: 'neutral', 1: 'positive', 2: 'negative'}.
    """
    inputs = tokenizer_finbert(sentence, return_tensors="pt", truncation=True, padding=True)
    # Inference only: disable autograd instead of building a graph per sentence
    # and detaching the result afterwards.
    with torch.no_grad():
        logits = finbert_model(**inputs)[0]
    return torch.softmax(logits, dim=1).numpy()[0]


  from .autonotebook import tqdm as notebook_tqdm


In [41]:
# Directory whose files are parsed as XML by extract_meta().
path = '/home/ec2-user/SageMaker/data/Party2016Oct/'
# os.listdir() returns entries in arbitrary order; sort the regular files so the
# rows built from them come out in the same order on every run.
files = sorted(f for f in listdir(path) if isfile(join(path, f)))

In [42]:
# Function to strip html tags from text portion
def strip_html_tags(text):
    """Return ``text`` with markup removed and whitespace tidied.

    Newlines become spaces, backslashes are removed and the result is stripped.
    ``None`` (what ElementTree reports for an element without text) yields ''.

    NOTE(review): no parser is named in the BeautifulSoup call, so the parser
    used depends on what is installed -- consider pinning one.
    """
    if text is None:
        # An XML element with no text has `.text is None`; treat it as empty
        # instead of letting BeautifulSoup fail on it.
        return ''
    plain = BeautifulSoup(text).get_text()
    return plain.replace('\n', ' ').replace('\\', '').strip()

def _find_text(root, xpath):
    """Return the text of the first element matching ``xpath``, or '' when nothing matches."""
    node = root.find(xpath)
    return '' if node is None else node.text


def _first_tag_text(root, tag):
    """Return the text of the first ``tag`` element in document order, or '' when absent."""
    node = next(root.iter(tag), None)
    return '' if node is None else node.text


def _newspaper_title(root):
    """Return the 'Title of Newspaper' value from the first PubFrosting element that has a Title."""
    for pub in root.iter('PubFrosting'):
        if next(pub.iter('Title'), None) is not None:
            # NOTE(review): this reads the THIRD child of PubFrosting rather than
            # the matched Title element, as the original code did -- confirm that
            # this positional lookup is intended.
            return pub[2].text
    return ''


def _first_mpub_id(root):
    """Return the text of the first MpubId found under a PubFrosting element, or ''."""
    for pub in root.iter('PubFrosting'):
        mpub = next(pub.iter('MpubId'), None)
        if mpub is not None:
            return mpub.text
    return ''


def _article_text(root):
    """Return the stripped text of the first FullText / HiddenText / Text element present, or ''."""
    for xpath in ('.//FullText', './/HiddenText', './/Text'):
        node = root.find(xpath)
        if node is not None:
            # An element without text has `.text is None`; record it as empty.
            return '' if node.text is None else strip_html_tags(node.text)
    return ''


def extract_meta(path, files):
    """Parse each XML file and collect its metadata column by column.

    Args:
        path: directory containing the files.
        files: file names inside ``path``.

    Returns:
        Dict mapping each column name to a list with exactly one entry per file,
        in the order of ``files``. A field that is missing from a file is
        recorded as ''. Keeping one entry per file guarantees all lists have the
        same length, so the dict can be turned into a DataFrame without rows
        drifting out of alignment.
    """
    columns = ('GOID',
               'PubFrosting_Title',
               'PubFrosting_SortTitle',
               'CurrentTitle_Title',
               'CurrentTitle_SortTitle',
               'MpubId',
               'Numeric Date',
               'Title of Newspaper',
               'Country',
               'Text')
    meta = {column: [] for column in columns}

    for file in files:
        # join() also works when `path` has no trailing separator.
        root = et.parse(join(path, file)).getroot()

        record = {
            # First occurrence only: a file with zero or several GOID /
            # NumericDate elements must still contribute exactly one row.
            'GOID': _first_tag_text(root, 'GOID'),
            'PubFrosting_Title': _find_text(root, 'DFS/PubFrosting/Title'),
            'PubFrosting_SortTitle': _find_text(root, 'DFS/PubFrosting/SortTitle'),
            'CurrentTitle_Title': _find_text(root, 'DFS/PubFrosting/CurrentTitle/Title'),
            'CurrentTitle_SortTitle': _find_text(root, 'DFS/PubFrosting/CurrentTitle/SortTitle'),
            'MpubId': _first_mpub_id(root),
            'Numeric Date': _first_tag_text(root, 'NumericDate'),
            'Title of Newspaper': _newspaper_title(root),
            'Country': _find_text(root, './/Country'),
            'Text': _article_text(root),
        }
        for column in columns:
            meta[column].append(record[column])

    return meta

In [44]:
# Parse every listed file, then tabulate the per-column lists: one column per
# key of the dict returned by extract_meta().
meta = extract_meta(path=path, files=files)
df = pd.DataFrame(meta)

In [45]:
# NLTK Punkt English tokenizer; the next cell calls its tokenize() to split each
# article's 'Text' into sentences.
# NOTE: a later cell rebinds `tokenizer` to a BertTokenizer, so the sentence-splitting
# cell must run while this binding is still in place.
tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')

In [46]:
# Keywords that tag a sentence as mentioning a party. Matching is a
# case-insensitive substring test, so e.g. 'democrat' also hits 'democratic'.
matches_republican = ['republican', "republican's", 'G.O.P.', 'GOP']

matches_democrat = ['democrat', "democrats", "democrat's"]


def _sentences_mentioning(sentences, matches):
    """Return the sentences that contain any of ``matches`` (case-insensitive substring test)."""
    needles = [match.lower() for match in matches]
    return [sentence for sentence in sentences
            if any(needle in sentence.lower() for needle in needles)]


# Split every article into sentences once and reuse the result for both parties.
# This relies on `tokenizer` still being the NLTK sentence tokenizer; a later cell
# rebinds that name to a BertTokenizer.
_article_sentences = [tokenizer.tokenize(text) for text in df['Text']]

df['sent_rep'] = pd.Series(
    [_sentences_mentioning(sentences, matches_republican) for sentences in _article_sentences],
    index=df.index, dtype=object)

df['sent_dem'] = pd.Series(
    [_sentences_mentioning(sentences, matches_democrat) for sentences in _article_sentences],
    index=df.index, dtype=object)


In [47]:
# Project folder; the classifier checkpoint loaded below lives under its 'model/' subfolder.
ROOT_PATH = '/home/ec2-user/SageMaker/Getting Started/2022.05.25/ahkz_sentiment_classifier/ahkz_sentiment_classifier/'
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# NOTE: this rebinds `tokenizer`, which an earlier cell bound to the NLTK sentence
# tokenizer; from here on the name refers to the BERT tokenizer used by processing_file().
tokenizer = BertTokenizer.from_pretrained('/home/ec2-user/SageMaker/bert-large-uncased/')
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted
# source. The object is later called like a SentimentClassifier (input_ids /
# attention_mask keywords) -- presumably a whole pickled model, which needs that class
# to be defined before this cell runs; confirm.
sent_model = torch.load(ROOT_PATH + 'model/3_class_fed_testimony_bert_large_uncased_best_model.pt', map_location = device)  

In [48]:
# Tokenizer and 3-label sequence classifier loaded from the local 'finbert' folder;
# both are read as globals by get_finbert_sentiment().
tokenizer_finbert = BertTokenizer.from_pretrained('/home/ec2-user/SageMaker/finbert/')
finbert_model = BertForSequenceClassification.from_pretrained('/home/ec2-user/SageMaker/finbert/', num_labels=3)

In [49]:
# Column names of `df` after the sentence-matching step.
# NOTE: a later cell rebinds `cols` before any visible cell reads this value.
cols = [
    'GOID',
    'PubFrosting_Title',
    'PubFrosting_SortTitle',
    'CurrentTitle_Title',
    'CurrentTitle_SortTitle',
    'MpubId',
    'Numeric Date',
    'Title of Newspaper',
    'Country',
    'Text',
    'sent_rep',
    'sent_dem',
]

In [50]:
# One row per Republican-matching sentence: keep the article metadata, expand the
# 'sent_rep' lists into rows, renumber the rows and expose the sentence as 'Text'.
df_rep = (
    df.loc[:, [
        'GOID',
        'PubFrosting_Title',
        'PubFrosting_SortTitle',
        'CurrentTitle_Title',
        'CurrentTitle_SortTitle',
        'MpubId',
        'Numeric Date',
        'Title of Newspaper',
        'Country',
        'sent_rep',
    ]]
    .explode(['sent_rep'])
    .reset_index(drop=True)
    .rename(columns={"sent_rep": "Text"})
)

In [51]:
# One row per Democrat-matching sentence: keep the article metadata, expand the
# 'sent_dem' lists into rows, renumber the rows and expose the sentence as 'Text'.
df_dem = (
    df.loc[:, [
        'GOID',
        'PubFrosting_Title',
        'PubFrosting_SortTitle',
        'CurrentTitle_Title',
        'CurrentTitle_SortTitle',
        'MpubId',
        'Numeric Date',
        'Title of Newspaper',
        'Country',
        'sent_dem',
    ]]
    .explode(['sent_dem'])
    .reset_index(drop=True)
    .rename(columns={"sent_dem": "Text"})
)

In [52]:
def processing_file(df):
    """Score every row's 'Text' with the sentiment models used in this notebook.

    Rows whose 'Text' is missing are dropped first. The input frame itself is
    left unmodified; a new frame is returned.

    Args:
        df: DataFrame with a 'Text' column holding one sentence per row.

    Returns:
        DataFrame with the input columns plus:
        - 'finbert_neutral', 'finbert_positive', 'finbert_negative': the three
          values returned by get_finbert_sentiment(), in that order;
        - 'ahkz_score': class index predicted by ``sent_model``, with class 2
          rewritten to -1 (presumably the negative class -- confirm against the
          checkpoint's label map);
        - 'pysent3_Positive', 'pysent3_Negative', 'pysent3_Polarity',
          'pysent3_Subjectivity': the pysent3 HIV4 scores;
        - 'spacy_senti': the 'positive' category score from ``nlp_spacy``.
    """
    # dropna() returns a new frame; copy it so later column assignments never
    # write through to the caller's data.
    scored = df.dropna(subset=['Text']).copy()

    #finbert
    finbert_probs = scored['Text'].apply(get_finbert_sentiment)
    scored[['finbert_neutral', 'finbert_positive', 'finbert_negative']] = pd.DataFrame(
        finbert_probs.tolist(), index=scored.index)

    #ahkz
    # create_data_loader() reads a 'sentiment_value' column as targets; a
    # placeholder of zeros is enough because only the predictions are kept.
    scored['sentiment_value'] = 0

    testimony_data_loader = create_data_loader(scored, tokenizer, MAX_LEN, BATCH_SIZE)
    _, predictions, _, _ = get_predictions(sent_model, testimony_data_loader)

    scored['sentiment_head'] = predictions.numpy()
    # Assign the result back: an in-place replace on a column selection acts on
    # a temporary object under pandas Copy-on-Write and would leave class 2 as is.
    scored['sentiment_head'] = scored['sentiment_head'].replace({2: -1})
    scored = scored.drop(columns=['sentiment_value']).rename(columns={"sentiment_head": "ahkz_score"})

    #pysent3
    hiv4 = ps.HIV4()
    pysent3_scores = scored['Text'].apply(lambda x: hiv4.get_score(hiv4.tokenize(x)))

    pysent3_df = pysent3_scores.apply(pd.Series).rename(columns={"Positive": "pysent3_Positive",
                                                                 "Negative": "pysent3_Negative",
                                                                 "Polarity": "pysent3_Polarity",
                                                                 "Subjectivity": "pysent3_Subjectivity"})

    scored = pd.concat([scored, pysent3_df], axis=1)

    #spacy
    scored['spacy_senti'] = scored['Text'].apply(lambda x: nlp_spacy(x).cats['positive'])

    return scored


In [None]:
# Score every Republican-matching sentence with processing_file().
# The result is bound back to the same name, so re-running this cell feeds the
# already-scored frame through processing_file() again.
df_rep = processing_file(df_rep)

In [None]:
# Score every Democrat-matching sentence with processing_file().
# The result is bound back to the same name, so re-running this cell feeds the
# already-scored frame through processing_file() again.
df_dem = processing_file(df_dem)

In [None]:
# df_dem already carries 'Title of Newspaper' from the cell that built it. Merging
# the column in again would make pandas suffix both copies ('_x' / '_y'), so the
# plain name would no longer exist for the later column selection. Only merge when
# the column is actually missing.
if 'Title of Newspaper' not in df_dem.columns:
    df_dem = df_dem.merge(df[['GOID', 'Title of Newspaper']], on='GOID', how='left')

In [56]:
# df_rep already carries 'Title of Newspaper' from the cell that built it. Merging
# the column in again would make pandas suffix both copies ('_x' / '_y'), so the
# plain name would no longer exist for the later column selection. Only merge when
# the column is actually missing.
if 'Title of Newspaper' not in df_rep.columns:
    df_rep = df_rep.merge(df[['GOID', 'Title of Newspaper']], on='GOID', how='left')

In [58]:
# Columns kept, in this order, in the exported result frames.
# NOTE(review): 'Title' is not among the columns extract_meta() builds in this
# notebook (its 'Title' entry is commented out), so selecting `cols` raises a
# KeyError unless that column is added elsewhere -- confirm which title column
# is wanted here.
cols = [
    # article metadata and the matched sentence
    'GOID', 'Title', 'Numeric Date', 'Title of Newspaper', 'Text',
    # FinBERT outputs
    'finbert_neutral', 'finbert_positive', 'finbert_negative',
    # fine-tuned BERT classifier
    'ahkz_score',
    # pysent3 scores
    'pysent3_Positive', 'pysent3_Negative', 'pysent3_Polarity', 'pysent3_Subjectivity',
    # spaCy sentiment
    'spacy_senti',
]

In [60]:
# Restrict both result frames to the export columns, in the order given by `cols`.
df_dem = df_dem.loc[:, cols]
df_rep = df_rep.loc[:, cols]

In [None]:
# Write the Republican sentence scores to CSV, omitting the index column.
df_rep.to_csv('/home/ec2-user/SageMaker/Getting Started/2022.05.25/ahkz_sentiment_classifier/ahkz_sentiment_classifier/party_result_2016/rep_sent.csv', index =False)

In [None]:
# Write the Democrat sentence scores to CSV, omitting the index column.
df_dem.to_csv('/home/ec2-user/SageMaker/Getting Started/2022.05.25/ahkz_sentiment_classifier/ahkz_sentiment_classifier/party_result_2016/dem_sent.csv', index =False)

In [40]:
# Scratch expression left over from interactive use; it reads and changes no notebook state.
1+1

2