<h2> Introduction </h2>
This Jupyter Notebook last updated by Shrivats Agrawal (shriv9@seas.upenn.edu) builds on work done by Ssuying Chen (cssuying@umich.edu) and Emily Oxford (eoxford@umich.edu) to train a BERT model.

<h5> GDELT: </h5>
The GDELT Project (Global Database of Events, Language, and Tone) is the largest, most comprehensive, and highest resolution open database of human society as of now. It connects the world's people, locations, organizations, themes, counts, images, and emotions into a single holistic network over the globe. There are various analysis tools under the GDELT Analysis Service like Event Geographic Network, GKG Heatmap, GKG Word Cloud, and GKG Thematic Timeline.

<h2> Aim: </h2>
The goal of this script is to provide an effective solution to tag datasets fetched from the AWS GDelt database with the corresponding sectors. In that pursuit a Bidirectional Encoder Representations from Transformers (BERT) model is used for predictions.
 

## Importing Libraries

In [2]:
%%capture
!pip install --upgrade pip
!pip install tensorflow
!pip install torch
!pip install transformers
# !conda install -yc conda-forge pyarrow



In [3]:

import pyarrow
import tensorflow as tf
import pandas as pd
import torch
from transformers import BertTokenizer
from torch.utils.data import TensorDataset
from transformers import BertForSequenceClassification
import boto3
import io
import re
from datetime import datetime
import numpy as np
from ipywidgets import IntProgress
from IPython.display import display
import time
from tqdm import tqdm
from tqdm.notebook import trange, tqdm
from IPython.display import clear_output
import gc


## User Inputs

In [10]:
#----For Reading -----
parquet_folder_location = "ATHENA_BERT_RESULTS/2022/JAN22"
bucket = 'sector-classification'
#---For Saving------
bucket_save = 'sector-classification'
file_location = "Bert_Results"
year = "2022"
month = "JAN"
op_file_format = "parquet"

#Already Processed Files
processed_files_csv = "ProcessedFiles.csv"

def re_define_folder_location(new_month, new_year):
    global month
    global year
    global parquet_folder_location
    month = new_month
    year = new_year
    parquet_folder_location = f"ATHENA_BERT_RESULTS/{year}/{month}{year[2:]}"
    


## Functions

In [5]:


# Function to read parquet files and return a dataframe
def read_parquet_file(bucket, file_key):
    print(f"Reading {file_key}")
    engine = 'pyarrow'
    now1 = datetime.now()
    s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=file_key)
    file = io.BytesIO(obj['Body'].read() )
    now2=datetime.now()
    print("Time for reading file:",now2-now1)
    now1 = datetime.now()
    print("Trying to read parquet file..")
    df = pd.read_parquet(file)#, engine=engine, use_nullable_dtypes=True)
    print("parquet file read:")
    now2 = datetime.now()
    print(engine, ":", now2-now1)
    return df

#Return a list of all parquet files in the folder
def return_all_files(bucket, parquet_folder_location):
    s3_client = boto3.Session()
    s3 = s3_client.resource('s3')
    my_bucket = s3.Bucket(bucket)
    files_list = []
    for file in my_bucket.objects.filter(Prefix= parquet_folder_location):
        file_dict = {"bucket":file.bucket_name, "file_key":file.key}
        files_list.append(file_dict)
    return files_list

#Labels Dict
def get_labels_dict():
    bucket = "sector-classification"
    file_key = "Data/sasb_full_training.xlsx"


    s3_client = boto3.client('s3')

    obj = s3_client.get_object(Bucket=bucket, Key=file_key)

    sasb_data = pd.read_excel(io.BytesIO(obj['Body'].read()))

    # sasb_data = pd.read_csv("Data/training_data/sasb_full_training.csv")
    sasb_data.drop(["Unnamed: 0"], axis=1, inplace=True)
    labels = sasb_data.Sector.unique()

    label_dict = {}
    for index, label in enumerate(sorted(labels)):
        label_dict[index] = label
    return label_dict
label_dict = get_labels_dict()

#Function to clean and generate column for prediction
def obtain_text(row):
    replace_points=['.',"/","\n","https","http",":","www","  "]
    source=str(row['sourcecommonname'])
    doc_identifier=str(row['documentidentifier'])
    themes=str(row['themes'])
    for replace_symbol in replace_points:
        source=source.replace(replace_symbol," ")
        doc_identifier=doc_identifier.replace(replace_symbol," ")

    try:
        themes_text= " ".join(re.findall(".*?theme=(.*?),",themes))
    except Exception as e:
        themes_text = ""
  

    pred_text=" ".join([source,doc_identifier,themes_text])
    pred_text=pred_text.replace("_"," ")

    text_list=pred_text.split(" ")
    text_list_unique=list(dict.fromkeys(text_list))
    pred_text= " ".join(text_list_unique[:200])
  
    return pred_text[:1000]

#Assigning text labels to categorical numbers
def return_labels(label_number):
    global label_dict
    try:
        return label_dict[label_number]
    except Exception as e:
    #print(e)
        return ""


## Loading Pre_Trained Model

In [6]:

import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#model.to(device)
print(device)

model = BertForSequenceClassification.from_pretrained("bert-base-uncased",
                                                      num_labels=len(label_dict),
                                                      output_attentions=False,
                                                      output_hidden_states=False)

model.to(device)
model.load_state_dict(torch.load('Bert_Models/second_finetuned_BERT_epoch_5.model', map_location=torch.device('cpu')))

cuda


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at

<All keys matched successfully>

In [7]:

def make_predictions(df_batch, batch_size):
#     print(batch_size)
    max_count = df_batch.shape[0]

#     f = IntProgress(min=0, max=max_count) # instantiate the bar
#     display(f) # display the bar



    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', 
                                              do_lower_case=True)
    predictions_list=[]  
#     print("Total batches:", (max_count//batch_size) + 1)
    for i in tqdm(range(0,max_count,batch_size)):
#       f.value=i
        test_data = list(df_batch[i : i + batch_size]['pred_text'])
        predicted = [-1 for x in range(len(test_data))]
        # sys.exit()
        try:
            with torch.cuda.amp.autocast():
                encoded_data_test = tokenizer.batch_encode_plus(
                    #moderna_test, 
                    test_data[:batch_size],
                    add_special_tokens=True, 
                    return_attention_mask=True, 
                    padding=True, 
                    max_length=384,
                    return_tensors='pt'
                )
                #print(encoded_data_test)
            #print(encoded_data_test.shape)
            input_ids_moderna = encoded_data_test['input_ids']
            #print(input_ids_moderna.shape)
            attention_masks_moderna = encoded_data_test['attention_mask']

            output_moderna = model(input_ids_moderna.to(device))
            _, predicted = torch.max(output_moderna[0], 1)
            predicted = predicted.tolist()
        except Exception as e:
            print(e)

        predictions_list.extend(predicted)

        if(i%(50*batch_size)==0):
            torch.cuda.empty_cache()
#             print(i,"|",len(predicted),"| ",len(predictions_list))
        
    return predictions_list


#Saving df to S3
def save_df_to_s3(df, bucket_save, file_location, year, month, counter, file_format):
    if file_format == "csv":
        file_name = f"{month}{year[-2:]}_{counter}.csv"
        file_key = file_location + "/" + year + "/" + month + "/" + file_name
        with io.StringIO() as csv_buffer:
            s3_client = boto3.client("s3")
            df.to_csv(csv_buffer, index=False)

            response = s3_client.put_object(
                Bucket=bucket_save, Key=file_key, Body=csv_buffer.getvalue()
            )

            status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

            if status == 200:
                print(f"\nCSV | Successful S3 put_object response. Status - {status}\n")
            else:
                print(f"\nCSV | Unsuccessful S3 put_object response. Status - {status}\n")
                
    elif file_format == "parquet":
        file_name = f"{month}{year[-2:]}_{counter}.parquet"
        file_key = file_location + "/" + year + "/" + month + "/" + file_name
        with io.BytesIO() as parquet_buffer:
            s3_client = boto3.client("s3")
            df.to_parquet(parquet_buffer, index=False)

            response = s3_client.put_object(
                Bucket=bucket_save, Key=file_key, Body=parquet_buffer.getvalue()
            )

            status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

            if status == 200:
                print(f"\nParquet | Successful S3 put_object response. Status - {status}\n")
            else:
                print(f"\nParquet | Unsuccessful S3 put_object response. Status - {status}\n")
    else:
        raise Exception("Incorrect file format specified:"+file_format)
        
        
#function to add file to csv of processed files
def add_file_to_processed_list(file_dict):
    file_list = [file_dict]
    
    df_completed = pd.DataFrame(file_list)
    try:
        df_prev_completed = pd.read_csv(processed_files_csv)
        print("Files processed previously:",df_prev_completed.shape[0])
        df_completed = pd.concat([df_prev_completed, df_completed])[list(file_dict.keys())]
        df_completed.drop_duplicates(inplace = True)
        df_completed.reset_index(drop=True, inplace = True)
       
        df_completed.to_csv(processed_files_csv)
        print(f"File added! | {file_list}\nTotal files processed:",df_completed.shape[0])
        
    except:
        print(csv_file_name,"doesn't exist. Creating csv!")
        df_completed.to_csv(processed_files_csv)
        

#Function to check whether a file has already been processed or not
def check_file_processed(file_dict):
    
    file_name = file_dict['file_key']
    try:
        df_completed = pd.read_csv(processed_files_csv)
    except Exception as e:
        print(e)
        return False
    if file_name in list(df_completed['file_key']):
        print("File already processed. Skipping!")
        return True
    else:
        return False

# Main

In [11]:
files_list = return_all_files(bucket, parquet_folder_location)
# files_list


In [None]:
batch_size = 48
months = [month]
for month in months:
    clear_output()
#     #----For Reading -----
#     parquet_folder_location = f"ATHENA_BERT_RESULTS/2021/{month}21"
#     bucket = 'sector-classification'
#     files_list = return_all_files(bucket, parquet_folder_location)
    
    for j in range(len(files_list)):
    #     clear_output()


        for i in range(2):
            print("_______________________________________________________________________________")
        file_key = files_list[j]['file_key']
        bucket = files_list[j]['bucket']
        print(f"File: {j} | {file_key}")
        if check_file_processed(files_list[j]):
            continue


        #Memory clear step
        try:
            del df_batch
        except Exception as e:
            print(e)

        for _ in range(3):
            gc.collect()
            time.sleep(1)
            torch.cuda.empty_cache()


        df_batch = read_parquet_file(bucket, file_key)
        #TEMP
    #     df_batch = df_batch[0:500]
        print("df Shape:",df_batch.shape)
        print("-> Pre-proccesing file")
        now1 = datetime.now()
        df_batch['pred_text']=df_batch.apply(obtain_text,axis=1)
        now2 = datetime.now()
        print("Preprocessing time:",now2-now1)

        print("-> Ready for prediction!")
        #Making predictions
        predictions_list = make_predictions(df_batch, batch_size)
        print("-> Prediction Complete. \n->Preparing to save file.")
        now3 = datetime.now()
        df_batch['SASB_Tag'] = predictions_list
        df_batch['Predicted_Sector'] = df_batch['SASB_Tag'].apply(return_labels)
        df_batch = df_batch[df_batch['SASB_Tag']!=-1].reset_index(drop=True)
        save_df_to_s3(df_batch, bucket_save, file_location, year, month, j,op_file_format)
        now4 = datetime.now()
        print("->File saved! Time for saving file:",now4 - now3)

        #Adding file to processed list
        add_file_to_processed_list(files_list[j])
        #function to save the dataframe at the given location
    #     break

        time.sleep(5)
    #     if j==2:
    #         break
# df_batch

_______________________________________________________________________________
_______________________________________________________________________________
File: 0 | ATHENA_BERT_RESULTS/2022/JAN22/20220320_004518_00027_fd2d8_085f08be-3c1b-493d-a9bb-ee5de38b21ab
name 'df_batch' is not defined
Reading ATHENA_BERT_RESULTS/2022/JAN22/20220320_004518_00027_fd2d8_085f08be-3c1b-493d-a9bb-ee5de38b21ab
Time for reading file: 0:00:01.302703
Trying to read parquet file..
parquet file read:
pyarrow : 0:00:24.328337
df Shape: (192278, 16)
-> Pre-proccesing file
Preprocessing time: 0:06:18.027784
-> Ready for prediction!


HBox(children=(FloatProgress(value=0.0, max=4006.0), HTML(value='')))

  "`max_length` is ignored when `padding`=`True` and there is no truncation strategy. "
