In [1]:
!pip install Wikidata
!pip install datasets

Collecting Wikidata
  Downloading Wikidata-0.8.1-py3-none-any.whl.metadata (3.0 kB)
Downloading Wikidata-0.8.1-py3-none-any.whl (29 kB)
Installing collected packages: Wikidata
Successfully installed Wikidata-0.8.1
Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[?25hDown

In [2]:
import torch, random, requests, os, pickle
import numpy as np
from wikidata.client import Client
from datasets import load_dataset
from itertools import islice
from google.colab import userdata

In [3]:
from transformers import set_seed

In [4]:
# Fix the seeds of the three random number generators used by this notebook's
# imports (PyTorch, Python's `random`, NumPy) so repeated runs are comparable.
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

In [5]:
# Seed through the transformers helper as well.
# NOTE(review): set_seed presumably re-seeds random / numpy / torch itself, which
# would make the manual seeding in cell In [4] redundant — confirm against the docs.
set_seed(42)

In [6]:
def dump(file_name, result):
    """Pickle `result` into `file_name`, replacing any previous dump.

    Args:
        file_name: path of the dump file to (re)create.
        result: any picklable object.
    """
    # Drop a stale dump first so the file is always written from scratch.
    already_there = os.path.exists(file_name)
    if already_there:
        os.remove(file_name)
    with open(file_name, 'wb') as sink:
        print("dumping", file_name)
        # noinspection PyTypeChecker
        pickle.dump(result, sink)

def load(file_name):
    """Read back an object previously pickled into `file_name`.

    Args:
        file_name: path of an existing pickle dump.

    Returns:
        The unpickled object.
    """
    with open(file_name, 'rb') as source:
        print("loading", file_name)
        # noinspection PyTypeChecker
        restored = pickle.load(source)
    return restored

In [7]:
def wikipedia_pages(sitelinks):
    """Return the site prefixes of the `*wiki` sitelinks of an entity.

    A sitelink key is kept when it ends with "wiki" and does not start with
    "commons"; the returned value is the key with that trailing "wiki" removed
    (e.g. "enwiki" -> "en").

    Only the suffix is stripped: the previous `str.replace("wiki", "")` removed
    every occurrence of "wiki", mangling keys such as "wikidatawiki".

    Args:
        sitelinks: mapping whose keys are site identifiers.

    Returns:
        List of prefixes, in the iteration order of `sitelinks`.
    """
    suffix = "wiki"
    return [
        site_key[:-len(suffix)]
        for site_key in sitelinks.keys()
        if site_key.endswith(suffix) and not site_key.startswith("commons")
    ]

def build_claims(claims):
    """Summarise a claims mapping as {property id: number of values}.

    Args:
        claims: mapping from property id to a sized collection of values.

    Returns:
        Dict with the same keys, each mapped to `len()` of its values.
    """
    return {prop_id: len(values) for prop_id, values in claims.items()}

class Entity:
    """One dataset item enriched with data taken from its wiki record.

    Attributes copied from the dataset row: label, name, description, type,
    category, subcategory. Attributes derived from `wiki_data.data`: the
    language keys of labels / descriptions / aliases, the result of
    `wikipedia_pages` on the sitelinks and of `build_claims` on the claims.
    """

    def __init__(self, entity_id, dataset_item, wiki_data, wiki_text):
        self.entity_id = entity_id
        # Fields taken verbatim from the dataset row.
        for field in ('label', 'name', 'description', 'type', 'category', 'subcategory'):
            setattr(self, field, dataset_item[field])
        self.wiki_text = wiki_text

        payload = wiki_data.data

        def language_keys(section):
            # Keys of one section of the record, empty when the section is absent.
            return list(payload.get(section, {}).keys())

        # Languages
        self.labels = language_keys("labels")
        self.descriptions = language_keys("descriptions")
        self.aliases = language_keys("aliases")
        self.wikipedia_pages = wikipedia_pages(payload.get("sitelinks", {}))
        # Properties
        self.claims = build_claims(payload.get("claims", {}))

    def __str__(self):
        return "".join((self.entity_id, ": ", self.label, " - ", self.name))

# English Wikipedia api.php endpoint queried by get_wiki_text.
API_URL = "https://en.wikipedia.org/w/api.php"

def extract_entity_id(url):
    """Return the last '/'-separated segment of `url`, ignoring outer whitespace."""
    _, _, last_segment = url.strip().rpartition("/")
    return last_segment

def get_wiki_text(en_wiki, timeout=30):
    """Fetch the plain-text extract of an English Wikipedia page.

    Args:
        en_wiki: the "enwiki" sitelink entry (a mapping with a "title" key),
            or a falsy value when the entity has no English page.
        timeout: seconds to wait for the HTTP request; without it
            `requests.get` may block indefinitely on a stalled connection.

    Returns:
        None when `en_wiki` is falsy; otherwise the page's "extract" text,
        or "" when the response carries no extract for the page.

    Raises:
        requests.RequestException: on network errors, timeouts or a non-2xx
            HTTP status.
    """
    if not en_wiki:
        return None
    title = en_wiki["title"]
    params = {
        "action": "query",
        "prop": "extracts",
        "explaintext": True,
        "titles": title,
        "format": "json",
        "redirects": 1
    }
    # NOTE(review): this is a browser-like User-Agent; Wikimedia's User-Agent
    # policy asks clients to identify themselves — consider a descriptive value.
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'}
    res = requests.get(API_URL, params=params, headers=headers, timeout=timeout)
    # Fail on HTTP errors instead of trying to parse an error page as JSON.
    res.raise_for_status()
    payload = res.json()
    # A single title was requested, so the first page is the only one considered.
    page = next(iter(payload["query"]["pages"].values()))
    # Keep the original text as it is.
    # The text will be processed in other methods,
    # such as processed_dataset#tokenize().
    return page.get("extract", "")

class EntityFactory:
    """Builds `Entity` objects from dataset rows using a wiki client."""

    def __init__(self, client):
        self.client = client

    def create(self, item):
        """Create the Entity for one dataset row.

        Args:
            item: dataset row; `item['item']` is the URL the entity id is
                extracted from.

        Returns:
            The new Entity, or None when anything fails while loading it
            (the error is printed, not raised).
        """
        entity_id = extract_entity_id(item['item'])
        try:
            record = self.client.get(entity_id, load=True)
            english_page = record.data.get("sitelinks", {}).get("enwiki")
            wiki_text = get_wiki_text(english_page)
            return Entity(entity_id, item, record, wiki_text)
        except Exception as error:
            # Best effort: a failing item is reported and skipped by the caller.
            print("Error loading id:", entity_id, error)
            return None

# Local pickle dumps in which NLPDataset stores the processed training and
# validation sets, so that later runs can load them instead of rebuilding.
TRAINING_FILE_NAME = "training.bin"
VALIDATION_FILE_NAME = "validation.bin"

def create_set(dataset, factory, limit, file_name):
    """Turn the first `limit` items of `dataset` into entities.

    Args:
        dataset: sized iterable of dataset rows.
        factory: object whose `create(item)` returns an entity or None.
        limit: maximum number of rows to process; None means all of them.
        file_name: only used to label the progress messages.

    Returns:
        List of the entities the factory managed to create (None results are
        dropped).
    """
    total = len(dataset) if limit is None else limit
    entities = []
    for position, sample in enumerate(islice(dataset, total), start=1):
        entity = factory.create(sample)
        if entity is not None:
            entities.append(entity)
        # Report progress every ten processed rows.
        if position % 10 == 0:
            print("creating", file_name, position, "/", total)
    return entities

class NLPDataset:
    """Training and validation entity lists, cached in local pickle dumps.

    Args:
        training_limit: max number of training rows to process when the sets
            are rebuilt (None = all).
        validation_limit: same, for the validation rows.
        force_reload: rebuild the sets even when both dump files exist.
    """

    def __init__(self, training_limit=None, validation_limit=None, force_reload=False):
        dumps_available = os.path.exists(TRAINING_FILE_NAME) and os.path.exists(VALIDATION_FILE_NAME)
        if dumps_available and not force_reload:
            # by default load the dataset from a local dump
            self.training_set = load(TRAINING_FILE_NAME)
            self.validation_set = load(VALIDATION_FILE_NAME)
            return

        # load the project dataset
        hf_token = userdata.get('HUGGINGFACE_TOKEN')
        dataset = load_dataset('sapienzanlp/nlp2025_hw1_cultural_dataset', token=hf_token)
        # a factory object is used to create our entities
        factory = EntityFactory(Client())

        self.training_set = create_set(dataset['train'], factory, training_limit, TRAINING_FILE_NAME)
        self.validation_set = create_set(dataset['validation'], factory, validation_limit, VALIDATION_FILE_NAME)
        dump(TRAINING_FILE_NAME, self.training_set)
        dump(VALIDATION_FILE_NAME, self.validation_set)

    def __str__(self):
        return f"training: {len(self.training_set)}. validation: {len(self.validation_set)}"

In [8]:
# Build the entity sets, or load them from the local dumps when both files
# already exist. build_entity_dict reads this module-level name.
nlp_dataset = NLPDataset()

loading training.bin
loading validation.bin


In [9]:
import csv
from transformers import AutoModelForSequenceClassification, AutoTokenizer

In [13]:
def build_entity_dict():
    """Index every entity of the global `nlp_dataset` by its entity id.

    Training entities are inserted first, then validation entities; on a
    duplicate id the validation entity replaces the training one.

    Returns:
        Dict mapping entity id -> entity.
    """
    by_id = {entity.entity_id: entity for entity in nlp_dataset.training_set}
    by_id.update({entity.entity_id: entity for entity in nlp_dataset.validation_set})
    return by_id

def label_to_number(label):
    """Map a textual class label to its numeric id.

    Args:
        label: 'cultural agnostic', 'cultural representative' or
            'cultural exclusive'.

    Returns:
        0, 1 or 2 respectively.

    Raises:
        ValueError: for any other value. The message is built with repr() so
            that a non-string label (None, an int, ...) still produces a
            ValueError instead of a TypeError from string concatenation.
    """
    if label == 'cultural agnostic':
        return 0
    if label == 'cultural representative':
        return 1
    if label == 'cultural exclusive':
        return 2
    raise ValueError(f'label not supported: {label!r}')

class WikiDataset:
    """The project dataset with numeric labels and a `wiki_text` column.

    On construction each sample's label is converted with `label_to_number`
    and its `wiki_text` is filled from the matching entity returned by
    `build_entity_dict` (empty string when there is no entity or its text is
    not a str).
    """

    def __init__(self):
        entities_by_id = build_entity_dict()
        hf_token = userdata.get('HUGGINGFACE_TOKEN')
        raw_dataset = load_dataset('sapienzanlp/nlp2025_hw1_cultural_dataset', token=hf_token)

        # enriching the entities with the wiki pages
        def enrich(sample):
            sample["label"] = label_to_number(sample["label"])
            wiki_id = extract_entity_id(sample["item"])
            wiki_text = ""
            if wiki_id is not None and wiki_id in entities_by_id:
                candidate = entities_by_id[wiki_id].wiki_text
                if type(candidate) == str:
                    wiki_text = candidate
            sample["wiki_text"] = wiki_text
            return sample

        self.dataset = raw_dataset.map(enrich)

    def tokenize(self, tokenizer):
        """Tokenize (description, wiki_text) pairs in batches with `tokenizer`.

        Returns:
            Whatever `self.dataset.map(..., batched=True)` returns.
        """
        def encode_batch(batch):
            return tokenizer(batch["description"], batch["wiki_text"], padding=True, truncation=True)

        return self.dataset.map(encode_batch, batched=True)

In [21]:
class InferenceModel:
    """Wraps a sequence-classification model and its tokenizer for inference.

    Args:
        repo: identifier passed to
            `AutoModelForSequenceClassification.from_pretrained`.
        kind: identifier passed to `AutoTokenizer.from_pretrained`.
    """

    def __init__(self, repo, kind):
        self.model = AutoModelForSequenceClassification.from_pretrained(repo)
        self.tokenizer = AutoTokenizer.from_pretrained(kind)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)

    def predict_text(self, desc, wiki, input_ids_ds, attention_mask_ds):
        """Classify one sample twice: from raw text and from pre-tokenized ids.

        Args:
            desc: description text (first tokenizer segment).
            wiki: wiki text (second tokenizer segment).
            input_ids_ds: token ids already computed for this sample.
            attention_mask_ds: attention mask matching `input_ids_ds`.

        Returns:
            Tuple `(prediction, prediction_ds)` of class indices: the first
            from tokenizing `desc`/`wiki` here, the second from the supplied
            ids and mask.
        """
        self.model.eval()
        # no max length - we want to use the default of the base model
        # as we do in training
        encoding = self.tokenizer(desc, wiki, return_tensors='pt', padding='max_length', truncation=True)
        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)
        # The pre-tokenized sample is a flat sequence: reshape it to a batch of one.
        input_ids_ds = torch.tensor(input_ids_ds).to(self.device).view(1, -1)
        attention_mask_ds = torch.tensor(attention_mask_ds).to(self.device).view(1, -1)
        with torch.no_grad():
            # First pass uses the freshly tokenized text. It previously reused
            # the *_ds tensors, which made both predictions always identical.
            outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
            _, prediction = torch.max(outputs.logits, dim=1)
            outputs_ds = self.model(input_ids=input_ids_ds, attention_mask=attention_mask_ds)
            _, prediction_ds = torch.max(outputs_ds.logits, dim=1)
        return prediction.item(), prediction_ds.item()

In [22]:
def number_to_label(label):
    """Map a numeric class id back to its textual label.

    Args:
        label: 0, 1 or 2.

    Returns:
        'cultural agnostic', 'cultural representative' or
        'cultural exclusive' respectively.

    Raises:
        ValueError: for any other value. The message is built with repr():
            concatenating the numeric label to a str raised TypeError instead.
    """
    if label == 0:
        return 'cultural agnostic'
    if label == 1:
        return 'cultural representative'
    if label == 2:
        return 'cultural exclusive'
    raise ValueError(f'label not supported: {label!r}')

# Build the enriched dataset and load the classifier together with its tokenizer.
dataset = WikiDataset()
model = InferenceModel("fax4ever/culturalitems-roberta-base-5", "roberta-base")

# Tokenize every split with the model's tokenizer; only the validation split is scored.
tokenized_datasets = dataset.tokenize(model.tokenizer)
print(tokenized_datasets)
validation_ = tokenized_datasets["validation"]

# Running counts of correct predictions for the two values returned by predict_text.
matching = 0
matching_ds = 0
size = len(validation_)
# One CSV row per validation item: true label, both predictions and whether each is correct.
with open('transformer-inference.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    field = ["item", "true label", "prediction", "prediction-ds", "correct", "correct-ds"]
    writer.writerow(field)
    for index, item in enumerate(validation_):
        # p / p_ds are the pair returned by predict_text ("prediction" and "prediction-ds" columns).
        # NOTE(review): the recorded output shows identical counts for both — check
        # whether the two predictions are really computed from different inputs.
        p, p_ds = model.predict_text(item["description"], item["wiki_text"], item["input_ids"], item["attention_mask"])
        true_label = item["label"]
        match = p == true_label
        if match:
            matching = matching + 1
        match_ds = p_ds == true_label
        if match_ds:
            matching_ds = matching_ds + 1
        writer.writerow([item["item"], number_to_label(true_label), number_to_label(p), number_to_label(p_ds), match, match_ds])
        # Progress report with the running accuracy every ten items.
        if (index + 1) % 10 == 0:
            print('inference: ', index + 1, "/", size)
            print('matched', matching, 'on', index + 1, '(', matching / (index + 1), ')')
            print('matched', matching_ds, 'on', index + 1, '(', matching_ds / (index + 1), ')')
print('inference: completed')
# NOTE(review): an empty validation split (size == 0) would make these divisions fail.
print('matched', matching, 'on', size, '(', matching / size, ')')
print('matched', matching_ds, 'on', size, '(', matching_ds / size, ')')

DatasetDict({
    train: Dataset({
        features: ['item', 'name', 'description', 'type', 'category', 'subcategory', 'label', 'wiki_text', 'input_ids', 'attention_mask'],
        num_rows: 6251
    })
    validation: Dataset({
        features: ['item', 'name', 'description', 'type', 'category', 'subcategory', 'label', 'wiki_text', 'input_ids', 'attention_mask'],
        num_rows: 300
    })
})
inference:  10 / 300
matched 8 on 10 ( 0.8 )
matched 8 on 10 ( 0.8 )
inference:  20 / 300
matched 16 on 20 ( 0.8 )
matched 16 on 20 ( 0.8 )
inference:  30 / 300
matched 23 on 30 ( 0.7666666666666667 )
matched 23 on 30 ( 0.7666666666666667 )
inference:  40 / 300
matched 31 on 40 ( 0.775 )
matched 31 on 40 ( 0.775 )
inference:  50 / 300
matched 40 on 50 ( 0.8 )
matched 40 on 50 ( 0.8 )
inference:  60 / 300
matched 48 on 60 ( 0.8 )
matched 48 on 60 ( 0.8 )
inference:  70 / 300
matched 57 on 70 ( 0.8142857142857143 )
matched 57 on 70 ( 0.8142857142857143 )
inference:  80 / 300
matched 66 on 80 ( 