# OpenAlex Extraction and Matching with `.search()`

The goal of this notebook is to look up the PhD students (authors) contained in the [cleaned](clean_data.ipynb) NARCIS dataset, and
1. Confirm they can be found in OpenAlex
2. Confirm their affiliation in NARCIS matches the one in OpenAlex
3. Confirm they wrote the associated PhD thesis
4. Per author, look up all the contributors (i.e. potential first supervisors) that are listed in the NARCIS dataset and
    a. Find all authors that have worked for the same organization at the time the PhD thesis was published (within a 1 year window)
    b. xxx


The previous version of this notebook, written by a Bachelor student, used the `.search_filter()` method of `pyalex`, which does not search alternate spellings of the specified name. In this notebook we use `.search()` instead, which does not have that problem. See the example code [here](search_parameter_vs_search_filter.ipynb).

## 1. Setup

In [None]:
from pyalex import Works, Authors, Sources, Institutions, Topics, Publishers, Funders, Concepts
import pyalex # seems to be the only way to call `pyalex.config.email = email_address`
import pandas as pd
from os import path

from src.unabbreviate_institutions import unabbreviate_institutions

In [None]:
# Number of rows of the full dataset to use (applied when the dataset is loaded below).
NROWS = 25  # set to None to use all rows

Notebook settings

In [None]:
# Automatically reload imported modules, so that any changes made to the
# module files are reflected without needing to restart the Jupyter kernel.
# Load the autoreload extension.
%load_ext autoreload
# Mode 1 reloads only the modules registered with %aimport (the original note
# recommends this mode for production).
# Mode 2 reloads all modules before the execution of every cell.
%autoreload 2

# Limit the number of rows that are shown when printing DataFrames.
pd.set_option('display.max_rows', 5)

Set the contact email address to get into the [polite pool](https://docs.openalex.org/how-to-use-the-api/rate-limits-and-authentication#the-polite-pool). If you have a premium plan, you can access it via your email address as well.

In [None]:
# Location of the file that holds the contact email address
email_file_path = 'contact_email.txt'

# Only configure pyalex when the file is actually present
if path.isfile(email_file_path):
    with open(email_file_path, 'r') as file:
        file_contents = file.read()
    email_address = file_contents.strip()

    # Hand the address over to the pyalex configuration
    pyalex.config.email = email_address

# Show the configured address as the cell output
pyalex.config.email

## 2. Load and pre-process cleaned dataset

In [None]:
pubs_df = pd.read_csv('data/cleaned/pubs.csv')

# Number of rows to keep: the whole dataset when NROWS is None
n_sample = len(pubs_df) if NROWS is None else NROWS

# Use head() for now because we actually do find some of these PhDs.
# Alternative (currently disabled): a reproducible random sample
#pubs_df = pubs_df.sample(n=n_sample, random_state=42).reset_index(drop=True)
pubs_df = pubs_df.head(n_sample)

pubs_df

In [None]:
# Replace institution abbreviations with names that can be found in OpenAlex.
# NOTE(review): assumes unabbreviate_institutions returns the converted
# DataFrame for the given column name - confirm in src/unabbreviate_institutions.
pubs_unabbrev_df = unabbreviate_institutions(pubs_df, 'institution')
pubs_unabbrev_df

## 3. Extraction Object

In [None]:
import logging
from pyalex import Authors, Works
import pandas as pd

class AuthorRelations:
    """
    Match one PhD record against OpenAlex and look for potential supervisors.

    The object holds the data of a single PhD thesis record (candidate name,
    thesis title, publication year, institution and contributor names). It can
    confirm the PhD candidate in OpenAlex (`search_phd_candidate`), look for
    contributors that shared an institution with the candidate in the target
    years (`find_potential_supervisors`) and return the matched OpenAlex IDs
    (`get_results`).

    Args:
        phd_name: Name of the PhD candidate to search for.
        title: Title of the PhD thesis.
        year: Publication year of the thesis (int).
        institution: Institution name, compared literally with the OpenAlex
            institution display name.
        contributors: Iterable of contributor names (potential supervisors).
        years_tolerance: Extra years to include next to `year`; negative
            values extend the window into the past, positive values into
            the future.
        verbosity: 'NONE', 'INFO' or 'DEBUG' (case-insensitive).
    """

    # Criteria values accepted by search_phd_candidate()
    VALID_CRITERIA = ('affiliation', 'title', 'both', 'either')

    def __init__(self, phd_name, title, year, institution, contributors, years_tolerance=0, verbosity='INFO'):
        self.phd_name = phd_name
        self.title = title
        self.year = year
        self.institution = institution
        self.contributors = contributors
        self.years_tolerance = years_tolerance
        self.phd_candidate = None
        self.potential_supervisors = []
        self.verbosity = verbosity.upper()

        # Define target years as a property of the object
        self.target_years = self.calculate_target_years()

        # Setup logging
        self.logger = logging.getLogger(__name__)
        self.setup_logging()

    def calculate_target_years(self):
        """
        Calculates the target years based on the years_tolerance.
        If years_tolerance is negative, includes years before self.year.
        If years_tolerance is positive, includes years after self.year.

        Returns:
            A list of consecutive years that always contains self.year.
        """
        if self.years_tolerance >= 0:
            first_year, last_year = self.year, self.year + self.years_tolerance
        else:
            first_year, last_year = self.year + self.years_tolerance, self.year
        return list(range(first_year, last_year + 1))

    def setup_logging(self):
        """
        Configure the logger to write to 'author_relations.log' only.

        Verbosity 'NONE' maps to WARNING, 'INFO' to INFO and 'DEBUG' to DEBUG;
        any other value falls back to INFO.
        """
        # Map verbosity levels to logging levels
        verbosity_levels = {
            'NONE': logging.WARNING,
            'INFO': logging.INFO,
            'DEBUG': logging.DEBUG
        }
        log_level = verbosity_levels.get(self.verbosity, logging.INFO)
        self.logger.setLevel(log_level)

        # Remove all handlers associated with the logger. The logger is shared
        # by every instance, so the old handlers must also be closed; otherwise
        # each new instance leaves another open handle on the log file.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)
            handler.close()
        # Set propagate to False to prevent messages from being printed to the console
        self.logger.propagate = False

        # Create a file handler with UTF-8 encoding
        file_handler = logging.FileHandler('author_relations.log', encoding='utf-8')
        file_handler.setLevel(log_level)

        # Create a logging format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)

        # Add the handler to the logger
        self.logger.addHandler(file_handler)

    def search_phd_candidate(self, criteria):
        """
        Search for the PhD candidate by name and validate based on criteria.

        Criteria options:
            'affiliation': the candidate was affiliated with the institution
                in the target years.
            'title': the candidate authored a work with the thesis title.
            'both': affiliation and title must both match.
            'either': affiliation or title must match.

        Returns:
            The first matching OpenAlex author record, or None if no candidate
            meets the criteria.

        Raises:
            ValueError: If criteria is not one of the options above.
        """
        if criteria not in self.VALID_CRITERIA:
            raise ValueError(
                f"Unknown criteria {criteria!r}; expected one of {self.VALID_CRITERIA}."
            )

        # Forget the outcome of any earlier search on this object
        self.phd_candidate = None

        self.logger.info(f"Searching for PhD candidate: {self.phd_name}")
        # Search for candidates by PhD name
        candidates = Authors().search(self.phd_name).get()
        self.logger.debug(f"Found: {len(candidates)} people who are potential matches.")

        # If no candidates are found, log and return
        if not candidates:
            self.logger.warning("No candidates found with the given PhD name.")
            return None

        no_match_messages = {
            'both': "No match found for both criteria. Moving to next candidate.",
            'affiliation': "No affiliation match found. Moving to next candidate.",
            'title': "No title match found. Moving to next candidate.",
            'either': "No affiliation or title match found. Moving to next candidate.",
        }

        # Take the first candidate that meets the specified criteria
        for candidate in candidates:
            self.logger.debug(f"Evaluating candidate: {candidate['display_name']} (ID: {candidate['id']})")
            matched_on = self._match_candidate(candidate, criteria)
            if matched_on is None:
                self.logger.debug(no_match_messages[criteria])
                continue

            self.phd_candidate = candidate
            if matched_on == 'both':
                self.logger.info(f"PhD candidate confirmed: {candidate['display_name']}")
            else:
                self.logger.info(f"PhD candidate confirmed by {matched_on}: {candidate['display_name']}")
            break

        if not self.phd_candidate:
            self.logger.warning("PhD candidate not found or criteria not met.")
            return None
        return self.phd_candidate

    def _match_candidate(self, candidate, criteria):
        """
        Return how the candidate satisfies criteria, or None if it does not.

        The result is 'affiliation', 'title' or 'both'. The title check needs
        an extra OpenAlex request, so it is only run when the outcome depends
        on it.
        """
        if criteria == 'title':
            return 'title' if self.check_authored_work(candidate) else None

        affiliation_match = self.check_affiliation(candidate)
        if criteria == 'affiliation':
            return 'affiliation' if affiliation_match else None
        if criteria == 'both':
            if affiliation_match and self.check_authored_work(candidate):
                return 'both'
            return None

        # criteria == 'either'
        if affiliation_match:
            return 'affiliation'
        return 'title' if self.check_authored_work(candidate) else None

    @staticmethod
    def _iter_affiliations(candidate):
        """
        Yield (institution_name, years) for each usable affiliation of the candidate.

        Affiliations without an institution name are skipped and missing
        year lists are treated as empty.
        """
        for affiliation in candidate.get('affiliations') or []:
            institution = affiliation.get('institution') or {}
            institution_name = institution.get('display_name')
            if not institution_name:
                continue
            yield institution_name, affiliation.get('years') or []

    def check_affiliation(self, candidate):
        """
        Check if the candidate has the correct affiliation in the target years.

        The institution name is compared literally (case-sensitive) with the
        OpenAlex institution display name.

        Returns:
            True if a matching affiliation is found, otherwise False.
        """
        self.logger.debug(f"Target Institution: '{self.institution}', Target Years: {self.target_years}")

        for institution_name, years in self._iter_affiliations(candidate):
            is_match = (self.institution == institution_name) and any(year in self.target_years for year in years)
            self.logger.debug(
                f"Checking affiliation: Candidate Institution '{institution_name}', Years: {years} - "
                f"Match Found: {'Yes' if is_match else 'No'}"
            )
            if is_match:
                return True  # Stop checking after a match is found

        self.logger.debug("No affiliation match found for this candidate.")
        return False

    def check_authored_work(self, candidate):
        """
        Check if the candidate has authored the specified title.

        Titles are compared case-insensitively, ignoring surrounding
        whitespace. All result pages of the candidate's works are inspected
        until a match is found.

        Returns:
            True if a work with the target title is found, otherwise False
            (also when the target title is missing or not a string).
        """
        candidate_id = candidate['id']
        self.logger.debug(f"Target Title: '{self.title}'")

        # A missing title (e.g. NaN from pandas) can never match
        if not isinstance(self.title, str) or not self.title.strip():
            self.logger.debug("No usable target title; skipping title check.")
            return False
        target_title = self.title.strip().lower()

        # A plain .get() only returns the first page of results, so page
        # through all works of the candidate instead.
        pager = Works().filter(author={"id": candidate_id}).paginate(per_page=200)
        for page in pager:
            for work in page:
                work_title = work.get('title')
                if not work_title:
                    continue
                is_match = target_title == work_title.strip().lower()
                self.logger.debug(
                    f"Checking work: Candidate Work Title '{work_title}' - Match Found: {'Yes' if is_match else 'No'}"
                )
                if is_match:
                    return True  # Stop checking after a match is found

        self.logger.debug("No title match found for this candidate.")
        return False

    def get_candidate_affiliations(self, candidate):
        """
        Returns a set of institution names that the candidate was affiliated with in the target years.
        """
        return {
            institution_name
            for institution_name, years in self._iter_affiliations(candidate)
            if any(year in self.target_years for year in years)
        }

    def find_potential_supervisors(self):
        """
        Find potential supervisors among the contributors based on shared affiliations with the PhD candidate
        in the target years.

        For each contributor name, the first OpenAlex author that shares an
        institution with the PhD candidate is taken as the potential supervisor.

        Returns:
            The list of matching OpenAlex author records (empty if the PhD
            candidate is not confirmed or has no affiliations in the target years).
        """
        # Start from a clean list so that repeated calls do not add duplicates
        self.potential_supervisors = []

        if not self.phd_candidate:
            self.logger.warning("PhD candidate not confirmed. Cannot find potential supervisors.")
            return []

        # Get PhD candidate's affiliations in target years
        phd_affiliations = self.get_candidate_affiliations(self.phd_candidate)
        if not phd_affiliations:
            self.logger.warning("PhD candidate has no affiliations in target years. Cannot find potential supervisors.")
            return []

        # Log the target institutions (affiliations of the PhD candidate in the target years)
        self.logger.debug(f"Target Institutions: {phd_affiliations}, Target Years: {self.target_years}")
        self.logger.info("Searching for potential supervisors among contributors.")

        for contributor_name in self.contributors:
            self.logger.debug(f"Searching for contributor: {contributor_name}")
            # Search for contributors in OpenAlex
            candidates = Authors().search(contributor_name).get()
            self.logger.debug(f"Found: {len(candidates)} candidates for contributor '{contributor_name}'.")

            # If no candidates are found, log and continue to next contributor
            if not candidates:
                self.logger.debug(f"No candidates found for contributor: {contributor_name}")
                continue

            supervisor_found = False
            for candidate in candidates:
                # Get supervisor's affiliations in target years
                supervisor_affiliations = self.get_candidate_affiliations(candidate)

                # Check for shared affiliations
                shared_affiliations = phd_affiliations.intersection(supervisor_affiliations)

                # Logging per institution we are checking
                for institution in supervisor_affiliations:
                    is_match = institution in phd_affiliations
                    self.logger.debug(
                        f"Checking affiliation: Potential Supervisor '{candidate['display_name']}' Institution '{institution}' - "
                        f"Match Found: {'Yes' if is_match else 'No'}"
                    )

                if shared_affiliations:
                    self.potential_supervisors.append(candidate)
                    self.logger.info(f"Potential supervisor found: {candidate['display_name']} with shared institutions {shared_affiliations}")
                    supervisor_found = True
                    break  # Assuming the first match suffices

            if not supervisor_found:
                self.logger.debug(f"No shared affiliations found for contributor: {contributor_name}")

        # Log the total number of contributors with matches
        self.logger.info(
            f"Total contributors with matches: {len(self.potential_supervisors)} out of {len(self.contributors)}"
        )

        if not self.potential_supervisors:
            self.logger.warning("No potential supervisors found.")
        return self.potential_supervisors

    def get_results(self):
        """
        Return the OpenAlex ID pairs where matches are found.

        Returns:
            A dict with keys 'phd_id' and 'supervisor_ids', or None if the
            PhD candidate was not found.
        """
        if not self.phd_candidate:
            self.logger.warning("No results to return; PhD candidate was not found.")
            return None
        phd_id = self.phd_candidate['id']
        supervisor_ids = [supervisor['id'] for supervisor in self.potential_supervisors]
        self.logger.info(f"Returning results: PhD ID {phd_id}, Supervisor IDs {supervisor_ids}")
        return {'phd_id': phd_id, 'supervisor_ids': supervisor_ids}

In [None]:
# Test for one row (disabled by default; change the condition to True to run it)

if False:
    # Work with the first row of the DataFrame that holds the unabbreviated
    # institution names, because those are compared with OpenAlex names
    row = pubs_unabbrev_df.iloc[0]

    # Extract necessary fields
    phd_name = row['phd_name']
    title = row['title']
    year = int(row['year'])
    institution = row['institution']
    # row.get() tolerates datasets with fewer than 10 contributor columns
    contributors = [row[f'contributor_{i}'] for i in range(1, 11) if pd.notna(row.get(f'contributor_{i}', None))]

    # Create an instance of AuthorRelations with desired verbosity ('NONE', 'INFO', 'DEBUG')
    years_tolerance = -1  # also accept the year before the publication year
    author_relations = AuthorRelations(
        phd_name=phd_name,
        title=title,
        year=year,
        institution=institution,
        contributors=contributors,
        years_tolerance=years_tolerance,
        verbosity='DEBUG'
    )

    # Search for the PhD candidate using the thesis title as criterion
    author_relations.search_phd_candidate(criteria='title')

    # Find potential supervisors among the contributors
    author_relations.find_potential_supervisors()

    # Get the OpenAlex ID pairs
    results = author_relations.get_results()
    print(results)

In [None]:
# Define a function to process each row
def process_row(row, criteria='either', years_tolerance=-1, verbosity='DEBUG', max_contributors=10):
    """
    Match the PhD candidate of one dataset row and look up potential supervisors.

    Args:
        row: A row of the publications DataFrame with the fields 'phd_name',
            'title', 'year', 'institution' and 'contributor_1' ... 'contributor_N'.
        criteria: Criteria passed to AuthorRelations.search_phd_candidate.
        years_tolerance: Years tolerance passed to AuthorRelations.
        verbosity: Log verbosity passed to AuthorRelations
            (set to 'NONE' for production).
        max_contributors: Highest contributor column number to read.

    Returns:
        A dict with the keys 'phd_name', 'phd_id' and 'supervisor_ids';
        the last two are None if the PhD candidate was not found.
    """
    # Extract necessary fields
    phd_name = row['phd_name']
    title = row['title']
    year = int(row['year'])
    institution = row['institution']

    # Collect the contributor names, skipping absent columns and empty cells
    contributor_cells = (row.get(f'contributor_{i}', None) for i in range(1, max_contributors + 1))
    contributors = [name for name in contributor_cells if pd.notna(name)]

    # Create an instance of AuthorRelations
    author_relations = AuthorRelations(
        phd_name=phd_name,
        title=title,
        year=year,
        institution=institution,
        contributors=contributors,
        years_tolerance=years_tolerance,
        verbosity=verbosity
    )

    # Search for the PhD candidate using the desired criteria
    author_relations.search_phd_candidate(criteria=criteria)

    # Find potential supervisors among the contributors
    author_relations.find_potential_supervisors()

    # Get the OpenAlex ID pairs
    results = author_relations.get_results()

    # Return the results along with the name of the PhD candidate
    return {
        'phd_name': phd_name,
        'phd_id': results.get('phd_id') if results else None,
        'supervisor_ids': results.get('supervisor_ids') if results else None
    }

import os

# Apply the function to each row using DataFrame.apply. The DataFrame with the
# unabbreviated institution names is used, because the affiliation check
# compares the institution literally with the OpenAlex institution name.
results_list = pubs_unabbrev_df.apply(process_row, axis=1).tolist()

# Convert the results into a DataFrame
results_df = pd.DataFrame(results_list)

# Make sure the output directory exists before writing the results
output_path = 'data/output/matched_pairs.csv'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
results_df.to_csv(output_path, index=False)