In [None]:
import pandas as pd
import numpy as np
import random
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE

import nltk
from nltk.corpus import names, stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler, Dataset
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Load Media Data and Labeling

In [None]:
# Load the media articles; the code below reads the `media` (outlet name)
# and `article` (body text) columns of this sheet.
train_df = pd.read_excel('Media_Data.xlsx')

# Outlets listed in `con_media` are labelled 1, outlets in `pro_media` are
# labelled 0. Rows whose outlet is in neither list keep the label None.
con_media = ['Fox','WSJ','Forbes','Breitbart']
pro_media = ['CNN','MSNBC','NPR','NYT','TIME','The Guardian','The Washington Post','PBS','Politico','Vox']

train_df['label'] = None

# Assign labels with boolean masks through .loc instead of the chained
# `train_df['label'][i] = ...` form: chained assignment may write to a
# temporary copy (SettingWithCopyWarning) and is a no-op under pandas
# Copy-on-Write. It also removes the row-by-row Python loop.
train_df.loc[train_df['media'].isin(con_media), 'label'] = 1
train_df.loc[train_df['media'].isin(pro_media), 'label'] = 0

# Rows without article text cannot be tokenised later, so drop them.
train_df = train_df.dropna(subset=['article'])

print(train_df['label'].value_counts())

1    4249
0    3855
Name: label, dtype: int64


# Undersampling the majority class

In [None]:
# Row masks for each class.
filt_1 = train_df['label'] == 1
filt_0 = train_df['label'] == 0

# Size of the smaller class; every class is cut down to this many rows.
minority_num = train_df['label'].value_counts().min()

# Draw `minority_num` rows (without replacement) from each class. This avoids
# the previously hard-coded class size of 4249 and the assumption that label 1
# is always the larger class, and the fixed random_state makes the balanced
# subset reproducible across kernel restarts.
train_df_1 = train_df[filt_1].sample(n=minority_num, random_state=42)
train_df_0 = train_df[filt_0].sample(n=minority_num, random_state=42)

train_df = pd.concat([train_df_1, train_df_0])

print(train_df['label'].value_counts())

1    3855
0    3855
Name: label, dtype: int64


# Text preprocessing

In [None]:
# Fetch the NLTK resources used by the preprocessing step.
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('names')
nltk.download('stopwords')

# Both collections are only used for membership tests, so keep them as sets:
# a list would be scanned linearly for every single token of every article.
all_stopwords = set(stopwords.words('english'))
all_names = set(names.words())

# Text Preprocessing
def letters_only(word):
    """Return True when `word` is non-empty and consists only of ASCII letters.

    The check runs on the UTF-8 encoded bytes, so digits, punctuation,
    whitespace and any non-ASCII character all make the result False.
    """
    utf8_bytes = word.encode("utf-8")
    return utf8_bytes.isalpha()

# Shared lemmatizer instance, created once and reused by clean_text for every token.
lemmatizer = WordNetLemmatizer()

def clean_text(doc):
    """Normalise one document into a space-separated string of lemmas.

    A token is kept only if it is not a known first name, consists solely of
    ASCII letters, is longer than two characters and is not a stop word.
    Kept tokens are lowercased and lemmatized.

    Args:
        doc: Raw document text. Anything that is not a ``str`` (for example a
            NaN coming from an empty spreadsheet cell) is treated as empty.

    Returns:
        The cleaned document as a single string; ``''`` when nothing survives.
    """
    if not isinstance(doc, str):
        return ''

    cleaned_doc = []
    # split() without an argument splits on any whitespace run, so words
    # separated by newlines or tabs are no longer glued into one token that
    # would then be rejected by letters_only.
    for token in doc.split():
        word = token.lower() # ABD -> abd
        # The names corpus stores capitalised entries, so the token has to be
        # compared before lowercasing; checking only the lowercased form never
        # matches. The lowercased check is kept so no previous match is lost.
        if token in all_names or word in all_names:
            continue
        if letters_only(word) and len(word) > 2 and word not in all_stopwords: # remove number and punc.
            cleaned_doc.append(lemmatizer.lemmatize(word))

    return ' '.join(cleaned_doc)

# Cleaned text for every training article, in the same order as train_df rows.
cleaned_docs = list(map(clean_text, train_df['article']))

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package names to /root/nltk_data...
[nltk_data]   Unzipping corpora/names.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


# Define Dataset class

In [None]:
class PoliticalClassificationDataset(Dataset):
    """Dataset pairing each text with its label and tokenising on access.

    Args:
        texts: Indexable collection of document strings.
        labels: Indexable collection of labels, aligned with ``texts``.
        tokenizer: Object exposing ``encode_plus`` (e.g. a ``BertTokenizer``).
        max_length: Length every sequence is padded or truncated to.
            Defaults to 256, the value that used to be hard-coded.
    """

    def __init__(self, texts, labels, tokenizer, max_length=256):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of documents."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(inputs, label)`` for the document at position ``idx``.

        ``inputs`` is whatever ``tokenizer.encode_plus`` returns for the text;
        it is requested with ``return_tensors='pt'`` and fixed-length padding
        so that items can be batched by a DataLoader.
        """
        text = self.texts[idx]
        label = self.labels[idx]
        inputs = self.tokenizer.encode_plus(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt',
        )
        return inputs, label

In [None]:
texts = cleaned_docs
# The label column was created as an object column (initialised with None),
# so cast to int64 here: the loss function needs integer class indices, and
# the cast fails loudly if any unlabelled (None) row is still present.
labels = train_df['label'].values.astype('int64')

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
dataset = PoliticalClassificationDataset(texts, labels, tokenizer)

# 80/20 split; stratify keeps the class ratio identical in both parts and the
# fixed random_state makes the split reproducible.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    texts, labels, test_size=0.2, random_state=42, stratify=labels
)
train_dataset = PoliticalClassificationDataset(train_texts, train_labels, tokenizer)
test_dataset = PoliticalClassificationDataset(test_texts, test_labels, tokenizer)

batch_size = 8
# Shuffle only the training batches; evaluation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

# Build model and train

In [None]:
# Pre-trained BERT encoder with a fresh 2-class classification head.
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

epochs = 10

# torch.optim.AdamW replaces the deprecated transformers.AdamW. eps and
# weight_decay are set explicitly to the values the transformers
# implementation used by default, so the optimisation behaviour is unchanged.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, eps=1e-6, weight_decay=0.0)
loss_fn = torch.nn.CrossEntropyLoss()

# The scheduler is stepped once per batch, so the schedule spans every batch
# of every epoch; the learning rate decays linearly with no warm-up.
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = 0, num_training_steps = total_steps)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(epochs):
    model.train()
    total_loss = 0

    for batch in train_dataloader:
        inputs, labels = batch
        # Each item was tokenised with return_tensors='pt', so every tensor in
        # the batch carries an extra dimension at position 1; drop it.
        inputs = {key: value.squeeze(1).to(device) for key, value in inputs.items()}
        labels = labels.to(device)

        # Clearing gradients once per step is enough; the optimizer holds all
        # model parameters, so a second model.zero_grad() was redundant.
        optimizer.zero_grad()
        outputs = model(**inputs)
        logits = outputs.logits
        loss = loss_fn(logits, labels)
        total_loss += loss.item()

        loss.backward()
        # Clip the gradient norm to 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    avg_loss = total_loss / len(train_dataloader)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# Validation

In [None]:
# Measure accuracy on the held-out test batches.
model.eval()
total_correct = 0
total_samples = 0

# No gradients are needed for evaluation, so the whole loop runs under no_grad.
with torch.no_grad():
    for inputs, labels in test_dataloader:
        # Drop the extra dimension at position 1 and move tensors to the device.
        inputs = {name: tensor.squeeze(1).to(device) for name, tensor in inputs.items()}
        labels = labels.to(device)

        outputs = model(**inputs)
        logits = outputs.logits
        predicted_labels = logits.argmax(dim=1)

        total_correct += (predicted_labels == labels).sum().item()
        total_samples += labels.size(0)

accuracy = total_correct / total_samples
print(f"Test Accuracy: {accuracy*100:.2f}%")

# Save and load the model

In [None]:
PATH = 'model_name'

# save model
# Store only the weights (state_dict) instead of pickling the whole module:
# a pickled module is tied to the exact class/library layout at save time and
# cannot be read back by torch.load when it runs with weights_only=True
# (the default in recent PyTorch releases).
torch.save(model.state_dict(), PATH)

# load model
# Rebuild the same architecture, then restore the fine-tuned weights into it.
# map_location='cpu' lets a checkpoint written on a GPU machine load anywhere.
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
model.load_state_dict(torch.load(PATH, map_location='cpu'))

# Classify government official documents through BERT model

In [None]:
# NOTE(review): unlike 'Media_Data.xlsx', this path has no file extension —
# confirm the actual file name on disk.
test_df = pd.read_excel('Government_Data')

# Same treatment as the training frame: rows without article text are dropped,
# otherwise clean_text would receive NaN instead of a string.
test_df = test_df.dropna(subset=['article'])

test_docs = [clean_text(doc) for doc in test_df['article']]

In [None]:
def classify_text(model, unlabelled_texts, tokenizer=None, batch_size=8, max_length=256):
    """Predict a class index for every text using a sequence-classification model.

    Texts are tokenised and run through the model in mini-batches, so memory
    use is bounded by ``batch_size`` rather than by the size of the corpus.

    Args:
        model: Model called as ``model(input_ids=..., attention_mask=...)``
            whose output exposes ``.logits``.
        unlabelled_texts: A single string or an iterable of strings.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask``. When None, the ``bert-base-uncased``
            ``BertTokenizer`` is loaded.
        batch_size: Number of texts per forward pass (must be positive).
        max_length: Truncation length in tokens; the default of 256 matches
            the length used by PoliticalClassificationDataset.

    Returns:
        A list with one predicted class index per input text, in input order;
        an empty list for empty input.
    """
    # Keep accepting a bare string, which the tokenizer treats as one text.
    if isinstance(unlabelled_texts, str):
        unlabelled_texts = [unlabelled_texts]
    else:
        unlabelled_texts = list(unlabelled_texts)

    if not unlabelled_texts:
        return []

    if tokenizer is None:
        tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    predicted_labels = []
    with torch.no_grad():
        for start in range(0, len(unlabelled_texts), batch_size):
            chunk = unlabelled_texts[start:start + batch_size]
            # padding=True pads only up to the longest text of this batch.
            encoded_inputs = tokenizer(
                chunk,
                truncation=True,
                padding=True,
                max_length=max_length,
                return_tensors='pt',
            )
            input_ids = encoded_inputs['input_ids'].to(device)
            attention_mask = encoded_inputs['attention_mask'].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            predicted_labels.extend(torch.argmax(logits, dim=1).tolist())

    return predicted_labels

In [None]:
# Predicted class index (0 or 1) for every cleaned government document, in input order.
Classification_Result = classify_text(model, test_docs)

# Result of classifying the government official documents
print(Classification_Result)