In [None]:
!pip install trafilatura
!pip install openai

In [1]:
import json
import pandas as pd
import trafilatura
import spacy

## Data Exploration

In [5]:
# Plain relative path: the Windows-only ".\" prefix would be read as part of the file name on Linux/macOS.
spaziodati_path = "spaziodati_sample.jsonl"
# The sample is a JSON Lines file (one JSON document per line).
webpages_df = pd.read_json(path_or_buf=spaziodati_path, lines=True)

In [None]:
# Quick sanity check of the loaded frame: its dimensions and its column index.
print(f"Shape: {webpages_df.shape}")
print(f"Columns: {webpages_df.columns}")

In [None]:
# Frequency of each distinct value in the "status" column.
webpages_df["status"].value_counts()

In [7]:
# errors="ignore" keeps the cell idempotent: re-running it once "status" is gone is a no-op
# instead of a KeyError.
webpages_df = webpages_df.drop(columns=["status"], errors="ignore")

In [None]:
# Inspect one sample row: its URL, the text trafilatura extracts from its "content" field,
# and its "people" field.
INDEX = 25
print(webpages_df.iloc[INDEX]["url"])
print(trafilatura.extract(webpages_df.iloc[INDEX]["content"]))
print(webpages_df.iloc[INDEX]["people"])

In [9]:
# Load the spaCy pipeline named "it_core_news_sm" (presumably must be downloaded beforehand — TODO confirm setup steps).
nlp = spacy.load("it_core_news_sm")

In [10]:
# Run the spaCy pipeline on the text trafilatura extracts (with favor_recall=True) from the sample page.
# NOTE(review): trafilatura.extract may yield no text for some pages — confirm INDEX always produces a string.
parsed_entry = nlp(trafilatura.extract(webpages_df.iloc[INDEX]["content"], favor_recall=True))

In [None]:
# Visualise the entities of the parsed sample inline in the notebook (style="ent").
spacy.displacy.render(parsed_entry, style="ent", jupyter=True)

In [30]:
# Collect every distinct role string found in the SpazioDati annotations.
# The "people" column is addressed by name: a positional tuple index would point at a
# different column depending on whether "status" has already been dropped.
roles = set()
for people in webpages_df["people"]:
    for person in people:
        roles.add(person["role"])

In [None]:
# Report how many distinct roles were collected, then display the set itself as the cell output.
print(f"Roles found: {len(roles)}")
roles

## Dataset Creation

In [12]:
# Plain relative path: the Windows-only ".\" prefix would be read as part of the file name on Linux/macOS.
spaziodati_path = "spaziodati_sample.jsonl"

In [13]:
# Reload the raw sample (JSON Lines) from scratch, replacing the frame modified during exploration.
webpages_df = pd.read_json(path_or_buf=spaziodati_path, lines=True)

#### Using BeautifulSoup
This approach extracts all the visible text of the page, whatever it is.

In [None]:
import bs4
import re

In [None]:
def clean_page_text(page_text: str) -> str:
    """Normalise the raw text of a web page into a single line of plain text.

    Args:
        page_text: text extracted from the page HTML.

    Returns:
        The cleaned text: line breaks turned into " . " separators, unusual
        characters removed, long digit runs masked as "[NUM]" and all-caps words
        converted to title case.
    """
    cleaned = page_text.strip()
    cleaned = re.sub(r"\n{2,}", "\n", cleaned)  # collapse runs of newlines (also drops empty lines)
    cleaned = re.sub(r"\s{2,}", " ", cleaned)  # collapse runs of whitespace
    cleaned = cleaned.replace("\n", " . ")  # remaining newlines become sentence separators
    cleaned = re.sub(r"[^\w\s\d\.,:;’'@\(\)&]", " ", cleaned)  # remove non conventional characters
    cleaned = re.sub(r"\d{5,}", "[NUM]", cleaned)  # mask long numbers
    cleaned = re.sub(r"(?:\.\s+){2,}", ". ", cleaned)  # remove repeating periods followed by spaces
    # Title-case all-caps words. The lookbehind skips the "[NUM]" mask inserted above:
    # every other "[" was already removed, so it would otherwise be rewritten to "[Num]".
    cleaned = re.sub(
        r"(?<!\[)\b[A-Z]+(?:\s+[A-Z]+)*\b",
        lambda match: match.group(0).title(),
        cleaned,
    )
    return cleaned


# Build the URL -> cleaned text mapping for the first 100 rows of the dataset.
full_text_pages = {}
for i, entry in webpages_df.iterrows():
    if i >= 100:
        break
    soup = bs4.BeautifulSoup(entry["content"], 'html.parser')
    full_text_pages[entry["url"]] = clean_page_text(soup.get_text())

In [None]:
# Persist the URL -> cleaned text mapping as indented UTF-8 JSON (non-ASCII characters kept as-is).
with open("beautiful_soup_pages.json", mode="w", encoding="utf-8") as output_file:
    json.dump(full_text_pages, output_file, indent=4, ensure_ascii=False)

#### Using Trafilatura [old approach]
First approach used to extract the text. The library only extracts part of the text: the part that it deems "important" according to its heuristics.

In [16]:
# Positional slice: .iloc[:100] selects exactly the first 100 pages
# (.loc[:100] is label-based and inclusive, i.e. 101 rows on the default index).
webpages_text = webpages_df["content"].iloc[:100].apply(lambda text: trafilatura.extract(text, favor_recall=True))

In [None]:
# Clean the extracted text. regex=True is passed explicitly wherever the pattern is a regular
# expression: recent pandas versions treat the pattern as a literal string by default and
# reject a callable replacement unless regex=True.
test_df = (
    webpages_text.str.replace("\n", " . ", regex=False)  # newlines become sentence separators
    .str.replace(r"[^\w\s\d\.,:;’'@\(\)&]", " ", regex=True)  # remove non conventional characters
    .str.replace(r"\s{2,}", " ", regex=True)  # collapse runs of whitespace
    .str.replace(r"\d{5,}", "[NUM]", regex=True)  # mask long numbers
    .str.replace(r"(?:\.\s+){2,}", ". ", regex=True)  # remove repeating periods followed by spaces
    .str.strip(" .")
    # Title-case all-caps words; the lookbehind keeps the "[NUM]" mask from becoming "[Num]".
    .str.replace(
        r"(?<!\[)\b[A-Z]+(?:\s+[A-Z]+)*\b",
        lambda pattern: pattern.group(0).title(),
        regex=True,
    )
)

In [18]:
# Show the cleaned text stored under index label 2.
print(test_df[2])

Chi . Un team vincente . Drago Press è un’agenzia di comunicazione, marketing, web e digital strategy, fondata negli anni Novanta, con sede a Padova e uffici di rappresentanza a Londra, Milano, Bologna e Roma. In grado di operare su scala nazionale e internazionale, l’agenzia è composta da un team di professionisti dinamici e propositivi, capaci di assistere efficacemente strutture che operano nei settori: tourism, food&beverage, luxury, medical, wellness, hotellerie. Maurizio Drago . Ha iniziato la sua attività di giornalista da quando frequentava la facoltà di Scienze Politiche all’Università di Padova. Ha proseguito la sua professione come dirigente presso le associazioni economiche. Poi si è buttato a capofitto nel lavoro di giornalista nella ricerca e valorizzazione dei prodotti tipici e dei luoghi caratteristici della nostra bella Italia, coniando per primo il nome di giornalista enogastroturista , acquisendo conoscenze specifiche nel settore del turismo e dei prodotti, della ric

In [19]:
# Pair each URL with its cleaned text; zip stops at the shorter of the two sequences.
cleaned_base_df = dict(zip(webpages_df.url.values, test_df.values))

In [21]:
# Persist the URL -> trafilatura text mapping as indented UTF-8 JSON.
with open("trafilatura_pages.json", mode="w", encoding="utf-8") as output_file:
    json.dump(cleaned_base_df, output_file, indent=4, ensure_ascii=False)
    

## ChatGPT

In [None]:
import openai
import tqdm

In [None]:
import os

# Read the key from the environment so that it is never stored in the notebook file.
openai.api_key = os.environ.get("OPENAI_API_KEY")
if not openai.api_key:
    raise RuntimeError("Set the OPENAI_API_KEY environment variable before running this cell.")

In [None]:
# Dedicated variable name: reusing `spaziodati_path` here would make the dataset-loading cell
# read this JSON file if it were re-run afterwards. The path is a plain relative one
# (no Windows-only ".\" prefix).
beautiful_soup_pages_path = "beautiful_soup_pages.json"
with open(beautiful_soup_pages_path, "r", encoding="utf-8") as f:
    # URL -> cleaned page text, as written by the BeautifulSoup section.
    webpages_text = json.load(f)

In [None]:
# Single-message prompt used for gpt-3.5-turbo: instructions, one worked example and a trailing "{}"
# placeholder that is filled with the page text through str.format.
people_and_jobs_prompt_3_5 = """Estrai tutti i nomi propri delle persone dal seguente testo e le rispettive posizioni lavorative.
Il testo in questione è stato estratto da una pagina di contatti del sito web di un'azienda. Le informazioni da estrarre sono necessarie a creare una lista di persone che lavorano per l'azienda.
Se la posizione lavorativa di una persona non è chiara o non è riportata, scrivi solo "?".
La tua risposta deve essere formata unicamente dalla lista dei nomi delle persone, seguita da ":" e poi dalla posizione lavorativa. Ad esempio: "Mario Rossi: CEO" o "Luigi Verdi: ?".
Rispondi solo con la lista di nomi e ruoli lavorativi, non aggiungere commenti o altro.
Di seguito riporto un esempio per aiutarti nell'estrazione di informazioni dal testo reale.
TESTO:
Nel 2011 alla guida della Camera è eletto Antonio Barile. . Il nuovo Presidente con entusiasmo ha dichiarato di voler consolidare le posizioni acquisite dalla Camera di Commercio Italo Orientale in questi anni, favorendo in tutti i contesti locali, nazionali ed internazionali, le azioni che possano permettere di collaborare e concretizzare progetti di media e lunga durata sia con le istituzioni pubbliche che private. . Già dopo pochi mesi dalla sua elezione ha siglato un interessante protocollo d’intesa con la Camera di Commercio di Prahova e a seguire con Halal Italy e con la Samer di Bari. Vitandrea Marzano . Vice Presidente. Annarita Torino . Tesoriere . Barnaba Alessandra, Demarinis Domenico, Laforgia Mario. Consiglieri. Lattanzio Michelangelo . Revisore (Presidente) . Boleto Carmela, Panza Massimo . Revisori
OUTPUT:
- Antonio Barile: presidente
- Vitandrea Marzano: vice presidente
- Annarita Torino: tesoriere
- Alessandra Barnaba: consigliere
- Domenico Demarinis: consigliere
- Mario Laforgia: consigliere
- Michelangelo Lattanzio: revisore (presidente)
- Carmela Boleto: revisore
- Massimo Panza: revisore
TESTO:
{}
OUTPUT:"""

In [None]:
# System prompt used for gpt-4: same instructions and worked example as the gpt-3.5 prompt, but without
# a text placeholder because the page text is sent as a separate user message.
system_people_and_jobs_prompt_4 = """Sei un analizzatore testuale di contenuti di pagine web.
Estrai tutti i nomi propri delle persone dal testo che ti verrà fornito e le rispettive posizioni lavorative.
Il testo in questione è stato estratto da una pagina di contatti del sito web di un'azienda. Le informazioni da estrarre sono necessarie a creare una lista di persone che lavorano per l'azienda.
Se la posizione lavorativa di una persona non è chiara o non è riportata, scrivi solo "?".
La tua risposta deve essere formata unicamente dalla lista dei nomi delle persone, seguita da ":" e poi dalla posizione lavorativa. Ad esempio: "Mario Rossi: CEO" o "Luigi Verdi: ?".
Rispondi solo con la lista di nomi e ruoli lavorativi, non aggiungere commenti o altro.
Di seguito riporto un esempio per aiutarti nell'estrazione di informazioni dal testo reale.
TESTO:
Nel 2011 alla guida della Camera è eletto Antonio Barile. . Il nuovo Presidente con entusiasmo ha dichiarato di voler consolidare le posizioni acquisite dalla Camera di Commercio Italo Orientale in questi anni, favorendo in tutti i contesti locali, nazionali ed internazionali, le azioni che possano permettere di collaborare e concretizzare progetti di media e lunga durata sia con le istituzioni pubbliche che private. . Già dopo pochi mesi dalla sua elezione ha siglato un interessante protocollo d’intesa con la Camera di Commercio di Prahova e a seguire con Halal Italy e con la Samer di Bari. Vitandrea Marzano . Vice Presidente. Annarita Torino . Tesoriere . Barnaba Alessandra, Demarinis Domenico, Laforgia Mario. Consiglieri. Lattanzio Michelangelo . Revisore (Presidente) . Boleto Carmela, Panza Massimo . Revisori
OUTPUT:
- Antonio Barile: presidente
- Vitandrea Marzano: vice presidente
- Annarita Torino: tesoriere
- Alessandra Barnaba: consigliere
- Domenico Demarinis: consigliere
- Mario Laforgia: consigliere
- Michelangelo Lattanzio: revisore (presidente)
- Carmela Boleto: revisore
- Massimo Panza: revisore
"""

In [None]:
MODEL = "gpt-4"

def extract_names_and_roles(text: str, model=None) -> str:
    """Ask a chat model to list the people and job roles mentioned in a page text.

    Args:
        text: the cleaned text of a web page.
        model: chat model to query, "gpt-3.5-turbo" or "gpt-4". When omitted,
            the module-level MODEL is used.

    Returns:
        The raw content of the first choice returned by the model.

    Raises:
        ValueError: if the model is not one of the two supported ones.
    """
    model = MODEL if model is None else model
    if model == "gpt-3.5-turbo":
        # Instructions, example and page text all travel in one user message.
        messages = [
            {"role": "user", "content": people_and_jobs_prompt_3_5.format(text)}
        ]
    elif model == "gpt-4":
        # Instructions go in the system message, the page text in the user message.
        messages = [
            {"role": "system", "content": system_people_and_jobs_prompt_4},
            {"role": "user", "content": text}
        ]
    else:
        # Fail with a clear message rather than with an unbound `response` further down.
        raise ValueError(f"Unsupported model: {model!r}")
    response = openai.ChatCompletion.create(model=model, messages=messages)
    return response.choices[0]["message"]["content"]

Extract names and roles using the desired GPT model

In [None]:
# Query the model once per page; a page whose request fails is recorded with an empty answer
# so that every URL still has an entry.
raw_gpt_data = {}
for url, wp_text in tqdm.tqdm(webpages_text.items()):
    single_line_text = wp_text.replace("\n", " . ")
    try:
        raw_gpt_data[url] = extract_names_and_roles(single_line_text)
    except Exception as e:
        print(f"Skipping long text ({e})")
        raw_gpt_data[url] = ""

Parse the extracted data to enforce the desired output structure, removing anything that does not comply with it

In [None]:
# Turn each raw model answer into a list of {"name", "role"} records.
parsed_gpt_data = []
for url in webpages_text:
    current_data = {"url": url, "text": webpages_text[url], "people": []}
    for row in raw_gpt_data[url].split("\n"):
        # A valid row looks like "- Name Surname: role".
        if ":" not in row or "-" not in row:
            continue
        try:
            name, role = row.split(":")
        except ValueError:
            # More than one ":" in the row: report it and skip it.
            print(row)
            continue
        parsed_name = name.strip("- ")
        # A "?" in the name part means the model could not identify the person.
        if "?" in name:
            continue
        current_data["people"].append({"name": parsed_name, "role": role.strip()})
    parsed_gpt_data.append(current_data)

In [None]:
# Name the output after the model that produced it, so that a gpt-3.5-turbo run does not
# overwrite the gpt-4 results.
gpt_output_path = "gpt35_extracted_data.json" if MODEL == "gpt-3.5-turbo" else "gpt4_extracted_data.json"
with open(gpt_output_path, "w", encoding="utf-8") as f:
    json.dump(parsed_gpt_data, f, ensure_ascii=False, indent=4)

## Results aggregation and cleaning

In [8]:
# Both result files sit next to the notebook; plain relative paths work on every OS.
gpt35_data_path = "gpt35_extracted_data.json"
gpt4_data_path = "gpt4_extracted_data.json"
# Context managers close the files as soon as they have been parsed.
with open(gpt35_data_path, "r", encoding="utf-8") as f:
    gpt35_data = json.load(f)
with open(gpt4_data_path, "r", encoding="utf-8") as f:
    gpt4_data = json.load(f)


Extract the first 100 entries of the dataset and get their URLs and the people's names and roles

In [9]:
# For each of the first 100 rows keep the URL and a list of {"name", "role"} records,
# where the name is the annotated first name and surname joined by a space.
cleaned_spaziodati_people = [
    {
        "url": webpages_df.iloc[row_position].url,
        "people": [
            {
                "name": f"{person['name']} {person['surname']}",
                "role": person['role']
            }
            for person in webpages_df.iloc[row_position]["people"]
        ],
    }
    for row_position in range(100)
]

Put the extracted data into a single data structure, sorting each model's results by person name for every page

In [10]:
# Merge the three sources page by page. The lists are aligned by position, so an entry is
# kept only when all three refer to the same URL.
complete_data = []
for spaziodati, gpt35, gpt4 in zip(cleaned_spaziodati_people, gpt35_data, gpt4_data):
    if spaziodati["url"] != gpt35["url"] or spaziodati["url"] != gpt4["url"]:
        print("Different URL, skipping...")
        continue
    current_data = {"text3.5": gpt35["text"], "text4": gpt4["text"], "people": {}}
    # Sort by name so that the per-page lists are easy to compare by eye.
    current_data["people"]["spaziodati"] = sorted(spaziodati["people"], key=lambda x: x["name"])
    current_data["people"]["gpt3.5"] = sorted(gpt35["people"], key=lambda x: x["name"])
    current_data["people"]["gpt4"] = sorted(gpt4["people"], key=lambda x: x["name"])
    complete_data.append(current_data)

#### Duplicate Removal
Remove every duplicate entry on each page. A duplicate entry is an entry that has exactly the same name and role as another extracted person.

In [None]:
# Drop exact duplicates (same name and same role) from every model's list, page by page.
# A record is dropped when an equal record appears later in the list, so the last
# occurrence is the one that survives. `duplicates` counts the dropped records per model.
no_duplicates_data = []
duplicates = dict.fromkeys(complete_data[0]["people"], 0)
for entry in complete_data:
    deduplicated_people = {}
    for model, model_people in entry["people"].items():
        kept_people = []
        for position, person in enumerate(model_people):
            if person not in model_people[position + 1:]:
                kept_people.append(person)
            else:
                duplicates[model] += 1
        deduplicated_people[model] = kept_people
    # Shallow copy of the entry with only the "people" mapping replaced.
    no_duplicates_data.append({**entry, "people": deduplicated_people})
            

In [44]:
# Continue with the de-duplicated data and report how many duplicates were dropped per model.
complete_data = no_duplicates_data
print(duplicates)

{'base': 266, 'gpt3.5_new': 15, 'gpt4_op': 2, 'gpt4_full': 2}


Standardize "null" role from SpazioDati to "?"

In [None]:
# Replace a missing (null or empty) SpazioDati role with "?", the marker the GPT prompts use
# for an unknown role. The check is on the "role" key: person records only carry "name" and "role".
for entry in complete_data:
    for base_person in entry["people"]["spaziodati"]:
        if not base_person["role"]:
            base_person["role"] = "?"

Aggregate partial duplicates. A partial duplicate is an entry that shares its name with another entry on the same page but has a different role. In this case the two entries are merged, keeping the name and joining the two roles.

In [None]:
# Merge records that share a name within the same page and model: the first role is kept and
# every further role is appended after ", ". `partial_duplicates_counts` counts the merges per model.
no_partial_duplicates_data = []
partial_duplicates_counts = dict.fromkeys(complete_data[0]["people"], 0)
for entry in complete_data:
    merged_people = {}
    for model_name, extracted_people in entry["people"].items():
        roles_by_name = {}
        for person in extracted_people:
            person_name = person["name"]
            if person_name in roles_by_name:
                partial_duplicates_counts[model_name] += 1
                roles_by_name[person_name] += ", " + person["role"]
            else:
                roles_by_name[person_name] = person["role"]
        merged_people[model_name] = [
            {"name": name, "role": role} for name, role in roles_by_name.items()
        ]
    # Shallow copy of the entry with only the "people" mapping replaced.
    no_partial_duplicates_data.append({**entry, "people": merged_people})

In [None]:
# Report how many same-name records were merged per model, then continue with the merged data.
print(partial_duplicates_counts)
complete_data = no_partial_duplicates_data

In [11]:
# Save the aggregated comparison data as indented UTF-8 JSON.
with open("comparison_data.json", mode="w", encoding="utf-8") as output_file:
    json.dump(complete_data, output_file, indent=4, ensure_ascii=False)

## Results Analysis

In [8]:
# Load the aggregated comparison data saved by the previous section.
with open("comparison_data.json", mode="r", encoding="utf-8") as input_file:
    complete_data = json.load(input_file)

In [None]:
def count_people(complete_data):
    """Count how many people each source extracted over all pages.

    Args:
        complete_data: list of page entries, each with a "people" mapping from
            source name to the list of people extracted by that source.

    Returns:
        A dict mapping every source name found in the first entry to the total
        number of people it extracted.
    """
    totals = dict.fromkeys(complete_data[0]["people"], 0)
    for page in complete_data:
        for source_name, source_people in page["people"].items():
            totals[source_name] = totals[source_name] + len(source_people)
    return totals

def print_people_count(people_count):
    """Print one line per source with the number of people it extracted.

    Args:
        people_count: dict mapping source name to number of extracted people.
            Underscores in the source name are shown as spaces and the name is
            title-cased.
    """
    print("Number of people extracted from 100 pages:")
    for model in people_count:
        label = model.replace("_", " ").title()
        print(f"{label}: {people_count[model]}") 

# Total number of people per source; `people_counts` is reused later as the denominator of the match rates.
people_counts = count_people(complete_data)
print_people_count(people_counts)

### Exact Match
A match is considered *exact* when the two names or roles are exactly the same.

In [12]:
import string
import re

def parse_entity(entity: str) -> str:
    """Normalise a name or role so that two spellings of it can be compared.

    The text is lower-cased, the standalone word "e" is dropped, punctuation is
    replaced by spaces and whitespace is collapsed and trimmed, so that for
    example "CEO." and "CEO" normalise to the same string.

    Args:
        entity: the name or role to normalise.

    Returns:
        The normalised text, with words separated by single spaces.
    """
    lowered = re.sub(r"\be\b", "", entity.lower())
    without_punctuation = re.sub(f"[{re.escape(string.punctuation)}]", " ", lowered)
    # Removing a word or a punctuation mark leaves stray spaces behind; collapse them so
    # they cannot make two otherwise equal entities differ.
    return " ".join(without_punctuation.split())

def get_exact_name_and_role_matches(base_people, extracted_people) -> int:
    """Count the extracted people whose name and role both equal those of a base person.

    Names and roles are compared after normalisation with parse_entity. Base
    people with an empty name or role are ignored.

    Args:
        base_people: reference records, each with "name" and "role".
        extracted_people: records to check, each with "name" and "role".

    Returns:
        The number of extracted people that match at least one base person.
    """
    matched = 0
    for candidate in extracted_people:
        wanted_name = parse_entity(candidate["name"])
        wanted_role = parse_entity(candidate["role"])
        found = False
        for reference in base_people:
            if not (reference["name"] and reference["role"]):
                continue
            if wanted_name != parse_entity(reference["name"]):
                continue
            if wanted_role == parse_entity(reference["role"]):
                found = True
        matched += found
    return matched

def get_exact_name_matches(base_people, extracted_people) -> int:
    """Count the extracted people whose name equals the name of a base person.

    Names are compared after normalisation with parse_entity. Base people with
    an empty name are ignored.

    Args:
        base_people: reference records, each with a "name".
        extracted_people: records to check, each with a "name".

    Returns:
        The number of extracted people that match at least one base person.
    """
    matched = 0
    for candidate in extracted_people:
        wanted_name = parse_entity(candidate["name"])
        found = False
        for reference in base_people:
            if reference["name"] and wanted_name == parse_entity(reference["name"]):
                found = True
        matched += found
    return matched


In [None]:
def count_exact_matches(complete_data, only_name_match:bool=False):
    """Count, per model, the extracted people that exactly match a SpazioDati person.

    Args:
        complete_data: list of page entries, each with a "people" mapping that
            contains the "spaziodati" reference list and one list per model.
        only_name_match: when True only names are compared, otherwise both
            name and role must match.

    Returns:
        A dict mapping every model name (all keys of the first entry except
        "spaziodati") to its number of matches over all pages.
    """
    matcher = get_exact_name_matches if only_name_match else get_exact_name_and_role_matches
    totals = {model: 0 for model in complete_data[0]["people"] if model != "spaziodati"}
    for entry in complete_data:
        people_by_source = entry["people"]
        reference_people = people_by_source["spaziodati"]
        for model_name, extracted_people in people_by_source.items():
            if model_name != "spaziodati":
                totals[model_name] += matcher(reference_people, extracted_people)
    return totals

def print_exact_matches(base_num_people: int, exact_full_matches, exact_name_matches):
    """Print the exact-match counts of every model as a fraction of the base people.

    Args:
        base_num_people: number of people in the base data, used as denominator.
        exact_full_matches: dict model name -> number of name-and-role matches.
        exact_name_matches: dict model name -> number of name-only matches.
    """
    print("Matches with base data")
    sections = (("Full matches:", exact_full_matches), ("Name matches:", exact_name_matches))
    for heading, counts in sections:
        print(heading)
        for model, count in counts.items():
            label = model.replace("_", " ").title()
            percentage = count*100/base_num_people
            print(f"\t{label}: {count}/{base_num_people} ({percentage:.2f}%)") 

# Exact matches per model (name+role and name-only), reported against the number of SpazioDati people.
exact_full_matches = count_exact_matches(complete_data)
exact_name_matches = count_exact_matches(complete_data, only_name_match=True)
print_exact_matches(people_counts["spaziodati"], exact_full_matches, exact_name_matches)

### Partial Match
A match is considered *partial* when at least one of the tokens/words of the name or role is present in both entries. This produces a lot of false positives, but makes it possible to find the same role written in slightly different ways, or the same name written in a different order (surname first or name first).

Examples:
- "Mario Rossi" and "Rossi Mario"
- "Call center operator" and "Operator in a call center"

In [14]:
def get_partial_tokenized_name_and_role_matches(base_people, extracted_people) -> int:
    """Count the extracted people that share a name token and a role token with a base person.

    Names and roles are normalised with parse_entity and split into whole
    words; two entries match when they have at least one name word and at
    least one role word in common with the same base person. Extracted people
    with an empty role and base people with an empty name or role are ignored.

    Args:
        base_people: reference records, each with "name" and "role".
        extracted_people: records to check, each with "name" and "role".

    Returns:
        The number of extracted people that match at least one base person.
    """
    # Tokenise the base people once. Whole-word sets are compared: a substring test on the
    # normalised string would let a token like "di" match inside "direttore".
    base_tokens = [
        (
            set(parse_entity(base_person["name"]).split()),
            set(parse_entity(base_person["role"]).split()),
        )
        for base_person in base_people
        if base_person["name"] and base_person["role"]
    ]
    tokenized_partial_matches = 0
    for person in extracted_people:
        if not person["role"]:
            continue
        name_tokens = set(parse_entity(person["name"]).split())
        role_tokens = set(parse_entity(person["role"]).split())
        if any(
            not name_tokens.isdisjoint(base_name_tokens)
            and not role_tokens.isdisjoint(base_role_tokens)
            for base_name_tokens, base_role_tokens in base_tokens
        ):
            tokenized_partial_matches += 1
    return tokenized_partial_matches


def get_partial_tokenized_name_matches(base_people, extracted_people) -> int:
    """Count the extracted people that share at least one name token with a base person.

    Names are normalised with parse_entity and split into whole words. Base
    people with an empty name are ignored.

    Args:
        base_people: reference records, each with a "name".
        extracted_people: records to check, each with a "name".

    Returns:
        The number of extracted people that match at least one base person.
    """
    # Tokenise the base names once. Whole-word sets are compared: a substring test on the
    # normalised string would let a short token match inside a longer, unrelated word.
    base_name_tokens = [
        set(parse_entity(base_person["name"]).split())
        for base_person in base_people
        if base_person["name"]
    ]
    tokenized_partial_matches = 0
    for person in extracted_people:
        name_tokens = set(parse_entity(person["name"]).split())
        if any(not name_tokens.isdisjoint(tokens) for tokens in base_name_tokens):
            tokenized_partial_matches += 1
    return tokenized_partial_matches

In [None]:
def count_partial_matches(complete_data, only_name_match:bool=False):
    """Count, per model, the extracted people that partially match a SpazioDati person.

    Args:
        complete_data: list of page entries, each with a "people" mapping that
            contains the "spaziodati" reference list and one list per model.
        only_name_match: when True only names are compared, otherwise both
            name and role must partially match.

    Returns:
        A dict mapping every model name (all keys of the first entry except
        "spaziodati") to its number of partial matches over all pages.
    """
    if only_name_match:
        matcher = get_partial_tokenized_name_matches
    else:
        matcher = get_partial_tokenized_name_and_role_matches
    totals = {model: 0 for model in complete_data[0]["people"] if model != "spaziodati"}
    for entry in complete_data:
        people_by_source = entry["people"]
        reference_people = people_by_source["spaziodati"]
        for model_name, extracted_people in people_by_source.items():
            if model_name != "spaziodati":
                totals[model_name] += matcher(reference_people, extracted_people)
    return totals

def print_partial_matches(base_num_people: int, partial_full_matches, partial_name_matches):
    """Print the partial-match counts of every model as a fraction of the base people.

    Args:
        base_num_people: number of people in the base data, used as denominator.
        partial_full_matches: dict model name -> number of name-and-role partial matches.
        partial_name_matches: dict model name -> number of name-only partial matches.
    """
    print("Partial (tokenized) matches with base data")
    sections = (("Full matches:", partial_full_matches), ("Name matches:", partial_name_matches))
    for heading, counts in sections:
        print(heading)
        for model, count in counts.items():
            label = model.replace("_", " ").title()
            percentage = count*100/base_num_people
            print(f"\t{label}: {count}/{base_num_people} ({percentage:.2f}%)") 

# Partial matches per model (name+role and name-only), reported against the number of SpazioDati people.
partial_full_matches = count_partial_matches(complete_data)
partial_name_matches = count_partial_matches(complete_data, only_name_match=True)
print_partial_matches(people_counts["spaziodati"], partial_full_matches, partial_name_matches)

### Missing extraction stats

In [16]:
def count_empty_people(all_people, type_: str) -> int:
    """Count the pages for which the given source extracted no people at all.

    Args:
        all_people: list of page entries, each with a "people" mapping.
        type_: key of the source to inspect inside the "people" mapping.

    Returns:
        The number of entries whose list for that source is empty.
    """
    empty_pages = 0
    for entry in all_people:
        if not entry["people"][type_]:
            empty_pages += 1
    return empty_pages

def count_jobs_not_found(all_people, type_: str) -> int:
    """Count the people extracted by the given source that have no usable role.

    A role counts as missing when it is empty/falsy or equal to "?".

    Args:
        all_people: list of page entries, each with a "people" mapping.
        type_: key of the source to inspect inside the "people" mapping.

    Returns:
        The number of people without a role over all pages.
    """
    return sum(
        1
        for entry in all_people
        for person in entry["people"][type_]
        if not person["role"] or person["role"] == "?"
    )

In [None]:
# Per source: number of pages with no extracted people, then number of people without a role.
print("No people extracted")
print(f"\tSpazioDati: {count_empty_people(complete_data, 'spaziodati')}")
print(f"\tChatGPT 3.5: {count_empty_people(complete_data, 'gpt3.5')}")
print(f"\tChatGPT 4: {count_empty_people(complete_data, 'gpt4')}")
print("=========")
print("Job not extracted")
print(f"\tSpazioDati: {count_jobs_not_found(complete_data, 'spaziodati')}")
print(f"\tChatGPT 3.5: {count_jobs_not_found(complete_data, 'gpt3.5')}")
print(f"\tChatGPT 4: {count_jobs_not_found(complete_data, 'gpt4')}")

### Different number of extractions stats

In [13]:
# Count the pages on which each GPT model extracted fewer / more people than SpazioDati.
spaziodati_more_than_gpt35 = 0
gpt35_more_than_spaziodati = 0
spaziodati_more_than_gpt4 = 0
gpt4_more_than_spaziodati = 0
for entry in complete_data:
    n_spaziodati = len(entry["people"]["spaziodati"])
    n_gpt35 = len(entry["people"]["gpt3.5"])
    n_gpt4 = len(entry["people"]["gpt4"])
    if n_spaziodati > n_gpt4:
        spaziodati_more_than_gpt4 += 1
    if n_spaziodati < n_gpt4:
        gpt4_more_than_spaziodati += 1
    # The GPT-3.5 counters are incremented like the GPT-4 ones
    # (assigning 0 here would leave them at zero whatever the data).
    if n_spaziodati > n_gpt35:
        spaziodati_more_than_gpt35 += 1
    if n_spaziodati < n_gpt35:
        gpt35_more_than_spaziodati += 1

In [None]:
# Report the four page counters computed in the previous cell.
print(f"Entries where GPT4 has extracted less people than SpazioDati: {spaziodati_more_than_gpt4}")
print(f"Entries where GPT4 has extracted more people than SpazioDati: {gpt4_more_than_spaziodati}")
print(f"Entries where GPT3.5 has extracted less people than SpazioDati: {spaziodati_more_than_gpt35}")
print(f"Entries where GPT3.5 has extracted more people than SpazioDati: {gpt35_more_than_spaziodati}")