# google scholar search

### Imports

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%%shell
# Ubuntu no longer distributes chromium-browser outside of snap
#
# Proposed solution: https://askubuntu.com/questions/1204571/how-to-install-chromium-without-snap

# Add debian buster
cat > /etc/apt/sources.list.d/debian.list <<'EOF'
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-buster.gpg] http://deb.debian.org/debian buster main
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-buster-updates.gpg] http://deb.debian.org/debian buster-updates main
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-security-buster.gpg] http://deb.debian.org/debian-security buster/updates main
EOF

# Add keys
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys DCC9EFBF77E11517
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 648ACFD622F3D138
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 112695A0E562B32A

apt-key export 77E11517 | gpg --dearmour -o /usr/share/keyrings/debian-buster.gpg
apt-key export 22F3D138 | gpg --dearmour -o /usr/share/keyrings/debian-buster-updates.gpg
apt-key export E562B32A | gpg --dearmour -o /usr/share/keyrings/debian-security-buster.gpg

# Prefer debian repo for chromium* packages only
# Note the double-blank lines between entries
cat > /etc/apt/preferences.d/chromium.pref << 'EOF'
Package: *
Pin: release a=eoan
Pin-Priority: 500


Package: *
Pin: origin "deb.debian.org"
Pin-Priority: 300


Package: chromium*
Pin: origin "deb.debian.org"
Pin-Priority: 700
EOF

# Install chromium and chromium-driver
apt-get update
apt-get install chromium chromium-driver

# Install selenium
pip install selenium
pip install jsonlines
pip install twitter
pip install tweepy
pip install fuzzysearch

Executing: /tmp/apt-key-gpghome.mhECaHMQ2M/gpg.1.sh --keyserver keyserver.ubuntu.com --recv-keys DCC9EFBF77E11517
gpg: key DCC9EFBF77E11517: public key "Debian Stable Release Key (10/buster) <debian-release@lists.debian.org>" imported
gpg: Total number processed: 1
gpg:               imported: 1
Executing: /tmp/apt-key-gpghome.I6IQ5xaSSp/gpg.1.sh --keyserver keyserver.ubuntu.com --recv-keys 648ACFD622F3D138
gpg: key DC30D7C23CBBABEE: public key "Debian Archive Automatic Signing Key (10/buster) <ftpmaster@debian.org>" imported
gpg: Total number processed: 1
gpg:               imported: 1
Executing: /tmp/apt-key-gpghome.wYvYVg65fo/gpg.1.sh --keyserver keyserver.ubuntu.com --recv-keys 112695A0E562B32A
gpg: key 4DFAB270CAA96DFA: public key "Debian Security Archive Automatic Signing Key (10/buster) <ftpmaster@debian.org>" imported
gpg: Total number processed: 1
gpg:               imported: 1
Get:1 http://deb.debian.org/debian buster InRelease [122 kB]
Get:2 http://deb.debian.org/debian bust



In [None]:
import random
import pandas as pd
import numpy as np
import pickle
import jsonlines
import csv
import tweepy
import re
from twitter import *
import json
import time
from tqdm import tqdm

###Utils

In [None]:
from difflib import SequenceMatcher

driver_path = '/usr/bin/chromedriver'

def generate_or_keyword_list(query_dict: dict):
    """Generate necessary keyword lists to help selecting final candidates."""
    or_keyword_list = []
    or_keyword_dict = {}
    or_keyword_dict['gs_sid'] = ''
    domain_labels = []
    if 'expertise' in query_dict['profile']['content']:
        for keyword in query_dict['profile']['content']['expertise']:
            for key in keyword['keywords']:
                key = key.strip().lower()
                domain_labels.append(key)
    or_keyword_dict['domain_labels'] = domain_labels

    coauthors = []
    if 'relations' in query_dict['profile']['content'] and len(query_dict['profile']['content']['relations']) > 0:
        for relation in query_dict['profile']['content']['relations']:
            coauthors.append(relation['name'])
    or_keyword_dict['coauthors'] = coauthors

    if 'history' in query_dict['profile']['content'] and len(query_dict['profile']['content']['history']) > 0:
        tmp_dict = query_dict['profile']['content']['history'][0]
        if 'position' in tmp_dict:
            or_keyword_dict['position'] = tmp_dict['position']
        if 'institution' in tmp_dict:
            if 'domain' in tmp_dict['institution']:
                or_keyword_dict['email_suffix'] = tmp_dict['institution']['domain']
            if 'name' in tmp_dict['institution']:
                or_keyword_dict['organization'] = tmp_dict['institution']['name']

    or_keyword_list.append(or_keyword_dict)

    return or_keyword_list

def get_str_similarity(a: str, b: str) -> float:
    """Calculate the similarity of two strings and return a similarity ratio."""
    return SequenceMatcher(None, a, b).ratio()

### Scholar78kSearch

In [None]:
import pandas as pd
import os
import re
import numpy as np
from typing import Union, List


class Scholar78kSearch():
    def __init__(self):
        self.get_78kdata()
        self.simple = False
        self.verbose = False
        self.print_true = False

    def get_78kdata(self, source='gdrive'):
        """Download and load the 78k dataset data.
        
        Parameters
        ----------
        source : default is 'gdrive'.
        """
        path_name = 'gs_scholars_new.npy'
        if source == 'gdrive':
            self.df = pd.DataFrame.from_records(np.load("/content/drive/My Drive/tweets-dataset/gs_scholars_new.npy", allow_pickle=True))
        else:
            raise NotImplementedError
    
    def search_name(self, name: Union[str, list], query_dict: dict = None) -> List[dict]:
        """Search scholar candidates given name in the 78k AI scholar dataset.
        
        Parameters
        ----------
        name : name of the scholar.
        query_dict : if this is given, the method will run <self._search_name_others_helper()>

        Returns
        -------
        df_row_list : a list of response dictionaries.
        
        """
        if type(name) is list:
            name_list = [name[0], name[-1]]
            name = f'{name[0]} {name[-1]}' 
        elif type(name) is str:
            name_list = re.sub('[0-9_\.\(\)\[\],]', '', name).split(' ')
        else:
            raise TypeError(f'Argument "name" passed to Scholar78kSearch.search_name has the wrong type.')
        df_row = self._search_name_only_helper(name, name_list)
        if df_row.shape[0] > 0 and query_dict is not None:
            df_row = self._search_name_others_helper(df_row, query_dict)
        if self.print_true:
            print(f'[Info] Found {df_row.shape[0]} scholars are in 78k data.')
            print(f'[Debug] Names: {df_row["name"]}')
        if self.verbose:
            print(df_row)
        return self._deal_with_simple(df_row)
        # return df_row

    def _deal_with_simple(self, df_row):
        if self.simple:
            df_row = df_row.loc[:, df_row.columns != 'papers']
        df_row = df_row.drop(['co_authors_all'], axis=1)
        return df_row.to_dict(orient='records')

    def _search_name_only_helper(self, name, name_list):
        """Helper function of search_name

        Returns
        -------
        Boolean : found or not.
        DataFrame : if find else None.
        """
        # find the scholar in our dataset
        name_df = self.df.loc[self.df['name'] == name].copy()
        name_list_df = self.df.loc[self.df['name'].str.contains(pat = f'^{name_list[0].capitalize()} .*{name_list[-1].capitalize()}', regex=True, case=False)].copy()
        return pd.concat([name_df, name_list_df]).drop_duplicates(subset=['url']).reset_index(drop=True)

    def _search_name_others_helper(self, df_row, query_dict):
        # TODO: add a better filter more than by name
        return df_row

### ScholarGsSearch

In [None]:
import re
import time
from typing import Union
from selenium.webdriver.chromium.webdriver import ChromiumDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.errorhandler import NoSuchElementException

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# url = "https://github.com/googlecolab/colabtools/issues/3347" 

class GoogleSearch():
    """Base class for performing web search on Google using REST API."""
    def __init__(self, driver_path):
        self.setup_webdriver(driver_path)
    
    def setup_webdriver(self, driver_path):
        """Setup the webdriver object."""

        options = Options()
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.headless = True
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)
        options.add_argument('--disable-dev-shm-usage')
        self.driver = webdriver.Chrome(driver_path, options=options)


class ScholarGsSearch(GoogleSearch):
    """Class that handling searching on Google Scholar webpage using REST GET API."""
    def __init__(self, driver_path):
        super().__init__(driver_path)
        self._authsearch = 'https://scholar.google.com/citations?hl=en&view_op=search_authors&mauthors={0}'
        self._gsidsearch = 'https://scholar.google.com/citations?hl=en&user={0}'
        self.print_true = False
    
    def change_name(self, name):
        new_name = name[1:].split('_')
        new_name[-1] = re.sub(r'[0-9]+', '', new_name[-1])
        new_name = ' '.join(new_name)
        return new_name

    def search_gsid(self, gs_sid: str, simple: bool = True):
        """Search scholar on Google Scholar based given gs_sid.
        
        Parameters
        ----------
        gs_sid : google scholar sid
        simple : whether return simple information without paper list.

        Returns
        -------
        scholar_dict_list : a list of dicts of responses.
        
        """
        url = self._gsidsearch.format(gs_sid)
        self.search_gs_url(url, simple=simple)
        
    def search_gs_url(self, url: str, simple: bool = True):
        self.driver.get(url)
        scholar_dict = self._search_gsid_helper(self.driver, url, simple=simple)
        time.sleep(5)
        if scholar_dict is not None:
            
            return [scholar_dict]
        else:
            if self.print_true:
                print('[Info] No scholars found given gs_sid in search_gs.')
            return []

        
    def _search_gsid_helper(self, driver: ChromiumDriver, url: str, simple: bool = True):
        """Helper function for search_gsid."""

        def get_single_author(element):
            li=[]
            li.append(element.find_elements(By.TAG_NAME, "a")[0].get_attribute('href'))
            li.append(element.find_elements(By.TAG_NAME, "a")[0].get_attribute('textContent'))
            for i in element.find_elements(By.CLASS_NAME, "gsc_rsb_a_ext"):
                li.append(i.get_attribute('textContent'))
            return li

        html_first_class = driver.find_elements(By.CLASS_NAME, "gsc_g_hist_wrp")
        if (len(html_first_class)==0):
            if self.print_true:
                print("[Info] len(html_first_class)==0")
            return None
        idx_list = html_first_class[0].find_elements(By.CLASS_NAME, "gsc_md_hist_b")[0]
        years =  [i.get_attribute('textContent') for i in idx_list.find_elements(By.CLASS_NAME, "gsc_g_t")]
        cites =  [i.get_attribute('innerHTML') for i in idx_list.find_elements(By.CLASS_NAME, "gsc_g_al")]
        rsb = driver.find_elements(By.CLASS_NAME, "gsc_rsb")[0]
        Citations_table=[i.get_attribute('textContent') for i in  rsb.find_elements(By.CLASS_NAME, "gsc_rsb_std")]
        Co_authors = rsb.find_elements(By.CLASS_NAME, "gsc_rsb_a")
        if len(Co_authors) == 0:
            Co_authors = None
        else:
            Co_authors = [get_single_author(i) for i in rsb.find_element(By.CLASS_NAME, "gsc_rsb_a").find_elements(By.CLASS_NAME, "gsc_rsb_a_desc")]

        Researcher = {"url": url}
        gs_sid = None
        if 'user=' in url:
            tmp_gs_sid = url.split('user=', 1)[1]
            if len(tmp_gs_sid) >= 12:
                gs_sid = tmp_gs_sid[:12]
        # gs_sid
        Researcher['gs_sid'] = gs_sid
        # coauthors that are listed at the lower right of the profile page
        Researcher["coauthors"] = Co_authors
        # citation table
        Researcher["citation_table"] = [Citations_table[0], Citations_table[2]]
        # time series citations
        Researcher["cites"] = {"years":years, "cites":cites}
        # name
        nameList = driver.find_elements(By.ID, "gsc_prf_in")
        if (len(nameList) != 1):
            if self.print_true:
                print("len(nameList)!=1")
            return None
        Researcher["name"] = nameList[0].text
        # organization
        infoList = driver.find_elements(By.CLASS_NAME, 'gsc_prf_il')
        Researcher['organization'] = infoList[0].get_attribute('textContent')
        # homepage
        homepage_url = infoList[1].find_elements(By.TAG_NAME, 'a')
        if len(homepage_url) == 0:
            Researcher['homepage_url'] = None
        else:
            Researcher['homepage_url'] = homepage_url[0].get_attribute('href')
        # email address
        email_str_match = re.search(r'[\w-]+\.[\w.-]+', infoList[1].text)
        if email_str_match is not None:
            Researcher['email_info'] = email_str_match.group(0)
        # domain labels
        Researcher['domain_labels'] = [i.get_attribute('textContent').strip().lower() for i in infoList[2].find_elements(By.CLASS_NAME, 'gsc_prf_inta')]
        # if not simple, get paper lists
        if not simple:
            button = driver.find_elements(By.CLASS_NAME, 'gs_btnPD')
            if (len(button) != 1):
                if self.print_true:
                    print("len(button)!=1")
                return None
            while (button[0].is_enabled()):
                while (button[0].is_enabled()):
                    while (button[0].is_enabled()):
                        button[0].click()
                        time.sleep(5)
                    time.sleep(1)
                time.sleep(2)
            papers = []
            items = driver.find_elements(By.CLASS_NAME, 'gsc_a_tr')
            for i in items:
                item = i.find_element(By.CLASS_NAME, 'gsc_a_at')
                url = item.get_attribute("href")
                paper_info=[j.text for j in i.find_elements(By.CLASS_NAME, 'gs_gray')]
                cite = i.find_element(By.CLASS_NAME, 'gsc_a_ac')
                year = i.find_element(By.CLASS_NAME, 'gsc_a_y').find_element(By.CLASS_NAME, "gsc_a_h").text
                papers.append([url, item.text, 
                                paper_info,
                            cite.text, cite.get_attribute("href"),
                            year])
            Researcher["papers"] = papers

        def generate_single_coauthor(element):
            coauthor_dict = {
                "name":element.find_elements(By.CLASS_NAME, 'gs_ai_name')[0].get_attribute('textContent'),
                "url":element.find_elements(By.CLASS_NAME, 'gs_ai_pho')[0].get_attribute('href'),
                "description":element.get_attribute('innerHTML'),
            }
            return coauthor_dict
        extra_coauthors = driver.find_elements(By.CLASS_NAME, "gsc_ucoar")
        Researcher['extra_co_authors'] = [generate_single_coauthor(i) for i in extra_coauthors]
        return Researcher

    def search_name(self, name: Union[str, list], query_dict: dict = None, top_n=3, simple=True):
        """Search on Google Scholar webpage given name.
        
        Parameters
        ----------
        name : name of the scholar.
        query_dict : a dict containing information of the scholar.
        top_n : select <top_n> candidates.
        simple : whether return simple information without paper list.

        Returns
        -------
        resp : list of candidate scholars, empty if no candidates are found.

        """
        if type(name) is list:
            # current case
            name_list = [name[0], name[-1]]
            name = f'{name[0]} {name[-1]}' 
        elif type(name) is str:
            name_list = name.split(' ')
        else:
            raise TypeError('Argument "name" passed to ScholarGsSearch.search_name has the wrong type.')
        url_fragment = f'{name} '
        if query_dict is not None:
            # first try (name, email_suffix, position, organization) as url
            keyword_list = generate_or_keyword_list(query_dict)[0]
            url_fragment_new = url_fragment
            # if 'email_suffix' in keyword_list:
            #     url_fragment_new = url_fragment_new + keyword_list['email_suffix'] + ' '
            # if 'position' in keyword_list:
            #     url_fragment_new = url_fragment_new + keyword_list['position'] + ' '
            # if 'organization' in keyword_list:
            #     url_fragment_new = url_fragment_new + keyword_list['organization'] + ' '

            # url = self._authsearch.format(url_fragment_new)
            # self.driver.get(url)
            # time.sleep(5)
            # scholar_list = self._search_name_helper(self.driver, name_list)
            # if len(scholar_list) > 0:
            #     if wo_full:
            #         return scholar_list
            #     else:
            #         return self._search_name_list_expand(scholar_list, simple=simple)
            
            # second try (name, email_suffix)
            if 'email_suffix' in keyword_list:
                url_fragment_new = url_fragment + keyword_list['email_suffix'] # + ' '
            url = self._authsearch.format(url_fragment_new)
            self.driver.get(url)
            time.sleep(5)
            scholar_list = self._search_name_helper(self.driver, name_list)
            # return scholar_list
            if len(scholar_list) > 0:
                if self.print_true:
                    print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 1.')
                # return self._search_name_list_expand(scholar_list, simple=simple)
                return scholar_list
        
            # third try (name, position)
            if 'position' in keyword_list:
                url_fragment_new = url_fragment + keyword_list['position'] # + ' '
            url = self._authsearch.format(url_fragment_new)
            self.driver.get(url)
            time.sleep(5)
            scholar_list = self._search_name_helper(self.driver, name_list)
            # return scholar_list
            if len(scholar_list) > 0:
                if self.print_true:
                    print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 2.')
                # return self._search_name_list_expand(scholar_list, simple=simple)
                return scholar_list

            # fourth try (name, organization)
            if 'organization' in keyword_list:
                url_fragment_new = url_fragment + keyword_list['organization'] # + ' '
            url = self._authsearch.format(url_fragment_new)
            self.driver.get(url)
            time.sleep(5)
            scholar_list = self._search_name_helper(self.driver, name_list)
            # return scholar_list
            if len(scholar_list) > 0:
                if self.print_true:
                    print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 3.')
                # return self._search_name_list_expand(scholar_list, simple=simple)
                return scholar_list

        # finally, only search (name: firstname and lastname). If only one response returns, mark it as candidate
        url = self._authsearch.format(url_fragment)
        self.driver.get(url)
        time.sleep(5)
        scholar_list = self._search_name_helper(self.driver, name_list)
        if len(scholar_list) > 0:
        # if len(scholar_list) > 0 and len(scholar_list) <= top_n:
            if self.print_true:
                print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 4.')
            # return self._search_name_list_expand(scholar_list, simple=simple)
            return scholar_list
        
        return []

    def _search_name_helper(self, driver, name_list):
        """Helper function of <self.search_name()>."""
        # iterate over searched list, find dicts that contains the name (including)
        useful_info_list = driver.find_elements(By.CLASS_NAME, 'gs_ai_t')
        useful_info_ext_list = []
        if len(useful_info_list) != 0:
            for scholar_webdriver in useful_info_list:
                name = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_name').get_attribute('textContent').strip()
                # check whether name is correct
                not_a_candidate = False
                for name_fragment in name_list:
                    if name_fragment.lower() not in name.lower():
                        not_a_candidate = True
                        break
                if not_a_candidate:
                    continue
                
                # grab all the other information
                pos_org = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_aff').get_attribute('textContent').strip()
                email_str = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_eml').get_attribute('textContent').strip()
                cite = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_cby').get_attribute('textContent').strip()
                url = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_name').find_element(By.TAG_NAME, 'a').get_attribute('href').strip()
                domain_labels = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_int').find_elements(By.CLASS_NAME, 'gs_ai_ont_int')
                for idx, domain in enumerate(domain_labels):
                    domain_labels[idx] = domain.get_attribute('textContent').strip().lower()

                # continue processing
                gs_sid = None
                if 'user=' in url:
                    tmp_gs_sid = url.split('user=', 1)[1]
                    if len(tmp_gs_sid) >= 12:
                        gs_sid = tmp_gs_sid[:12]

                if email_str is not None and email_str != '':
                    match = re.search(r'[\w-]+\.[\w.-]+', email_str)
                    email_str = match.group(0)

                cites = [int(s) for s in cite.split() if s.isdigit()]
                useful_info_ext_list.append({
                    'name': name,
                    'pos_org': pos_org,
                    'email': email_str,
                    'cite': cites[0] if len(cites)>0 else None,
                    'url': url,
                    'gs_sid': gs_sid,
                    'domain_labels': domain_labels
                })
        return useful_info_ext_list
        
    def _search_name_list_expand(self, scholar_list, simple=True):
        """Expand the name_list to full_name_list."""
        new_scholar_list = []
        for scholar in scholar_list:
            if 'gs_sid' in scholar:
                url = self._gsidsearch.format(scholar['gs_sid'])
                self.driver.get(url)
                scholar_dict = self._search_gsid_helper(self.driver, url, simple=simple)
                if scholar_dict is not None:
                    new_scholar_list.append(scholar_dict)
                time.sleep(5)
        return new_scholar_list

### GoogleSearch

In [None]:
# import re
# import time
# from typing import Union
# # from selenium import webdriver
# # from selenium.webdriver.chrome.options import ChromiumOptions
# from selenium.webdriver.chromium.webdriver import ChromiumDriver
# from selenium.webdriver.common.by import By
# from selenium.webdriver.remote.errorhandler import NoSuchElementException
# # from webdriver_manager.chrome import ChromeDriverManager

# from selenium import webdriver
# from selenium.webdriver.chrome.options import Options

# # url = "https://github.com/googlecolab/colabtools/issues/3347" 

# class GoogleSearch():
#     """Base class for performing web search on Google using REST API."""
#     def __init__(self, driver_path):
#         self.setup_webdriver(driver_path)
    
#     def setup_webdriver(self, driver_path):
#         """Setup the webdriver object."""

#         options = Options()
#         options.add_argument("--headless")
#         options.add_argument("--no-sandbox")
#         options.headless = True
#         # options = ChromiumOptions()
#         options.add_experimental_option("excludeSwitches", ["enable-automation"])
#         options.add_experimental_option('useAutomationExtension', False)
#         # options.add_argument('--headless')
#         # options.add_argument('--no-sandbox')
#         options.add_argument('--disable-dev-shm-usage')

#         self.driver = webdriver.Chrome(driver_path, options=options)

#         # self.driver = webdriver.Chrome(driver_path, options=options)
#         # # self.driver = webdriver.Chrome(ChromeDriverManager().install())

#         # self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
#         #     "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
#         # })


# class ScholarGsSearch(GoogleSearch):
#     """Class that handling searching on Google Scholar webpage using REST GET API."""
#     def __init__(self, driver_path):
#         super().__init__(driver_path)
#         self._authsearch = 'https://scholar.google.com/citations?hl=en&view_op=search_authors&mauthors={0}'
#         self._gsidsearch = 'https://scholar.google.com/citations?hl=en&user={0}'
#         self.print_true = True
#         # print("ScholarGsSearch initiated")
    
#     def change_name(self, name):
#         new_name = name[1:].split('_')
#         new_name[-1] = re.sub(r'[0-9]+', '', new_name[-1])
#         new_name = ' '.join(new_name)
#         return new_name

#     def search_gsid(self, gs_sid: str, simple: bool = True):
#         """Search scholar on Google Scholar based given gs_sid.
        
#         Parameters
#         ----------
#         gs_sid : google scholar sid
#         simple : whether return simple information without paper list.

#         Returns
#         -------
#         scholar_dict_list : a list of dicts of responses.
        
#         """
#         url = self._gsidsearch.format(gs_sid)
#         self.search_gs_url(url, simple=simple)
        
#     def search_gs_url(self, url: str, simple: bool = True):
#         self.driver.get(url)
#         scholar_dict = self._search_gsid_helper(self.driver, url, simple=simple)
#         time.sleep(5)
#         if scholar_dict is not None:
            
#             return [scholar_dict]
#         else:
#             if self.print_true:
#                 print('[Info] No scholars found given gs_sid in search_gs.')
#             return []

        
#     def _search_gsid_helper(self, driver: ChromiumDriver, url: str, simple: bool = True):
#         """Helper function for search_gsid."""

#         def get_single_author(element):
#             li=[]
#             li.append(element.find_elements(By.TAG_NAME, "a")[0].get_attribute('href'))
#             li.append(element.find_elements(By.TAG_NAME, "a")[0].get_attribute('textContent'))
#             for i in element.find_elements(By.CLASS_NAME, "gsc_rsb_a_ext"):
#                 li.append(i.get_attribute('textContent'))
#             return li

#         html_first_class = driver.find_elements(By.CLASS_NAME, "gsc_g_hist_wrp")
#         if (len(html_first_class)==0):
#             if self.print_true:
#                 print("[Info] len(html_first_class)==0")
#             return None
#         idx_list = html_first_class[0].find_elements(By.CLASS_NAME, "gsc_md_hist_b")[0]
#         years =  [i.get_attribute('textContent') for i in idx_list.find_elements(By.CLASS_NAME, "gsc_g_t")]
#         cites =  [i.get_attribute('innerHTML') for i in idx_list.find_elements(By.CLASS_NAME, "gsc_g_al")]
#         rsb = driver.find_elements(By.CLASS_NAME, "gsc_rsb")[0]
#         Citations_table=[i.get_attribute('textContent') for i in  rsb.find_elements(By.CLASS_NAME, "gsc_rsb_std")]
#         Co_authors = rsb.find_elements(By.CLASS_NAME, "gsc_rsb_a")
#         if len(Co_authors) == 0:
#             Co_authors = None
#         else:
#             Co_authors = [get_single_author(i) for i in rsb.find_element(By.CLASS_NAME, "gsc_rsb_a").find_elements(By.CLASS_NAME, "gsc_rsb_a_desc")]

#         Researcher = {"url": url}
#         gs_sid = None
#         if 'user=' in url:
#             tmp_gs_sid = url.split('user=', 1)[1]
#             if len(tmp_gs_sid) >= 12:
#                 gs_sid = tmp_gs_sid[:12]
#         # gs_sid
#         Researcher['gs_sid'] = gs_sid
#         # coauthors that are listed at the lower right of the profile page
#         Researcher["coauthors"] = Co_authors
#         # citation table
#         Researcher["citation_table"] = [Citations_table[0], Citations_table[2]]
#         # time series citations
#         Researcher["cites"] = {"years":years, "cites":cites}
#         # name
#         nameList = driver.find_elements(By.ID, "gsc_prf_in")
#         if (len(nameList) != 1):
#             if self.print_true:
#                 print("len(nameList)!=1")
#             return None
#         Researcher["name"] = nameList[0].text
#         # organization
#         infoList = driver.find_elements(By.CLASS_NAME, 'gsc_prf_il')
#         Researcher['organization'] = infoList[0].get_attribute('textContent')
#         # homepage
#         homepage_url = infoList[1].find_elements(By.TAG_NAME, 'a')
#         if len(homepage_url) == 0:
#             Researcher['homepage_url'] = None
#         else:
#             Researcher['homepage_url'] = homepage_url[0].get_attribute('href')
#         # email address
#         email_str_match = re.search(r'[\w-]+\.[\w.-]+', infoList[1].text)
#         if email_str_match is not None:
#             Researcher['email_info'] = email_str_match.group(0)
#         # domain labels
#         Researcher['domain_labels'] = [i.get_attribute('textContent').strip().lower() for i in infoList[2].find_elements(By.CLASS_NAME, 'gsc_prf_inta')]
#         # if not simple, get paper lists
#         if not simple:
#             button = driver.find_elements(By.CLASS_NAME, 'gs_btnPD')
#             if (len(button) != 1):
#                 if self.print_true:
#                     print("len(button)!=1")
#                 return None
#             while (button[0].is_enabled()):
#                 while (button[0].is_enabled()):
#                     while (button[0].is_enabled()):
#                         button[0].click()
#                         time.sleep(5)
#                     time.sleep(1)
#                 time.sleep(2)
#             papers = []
#             items = driver.find_elements(By.CLASS_NAME, 'gsc_a_tr')
#             for i in items:
#                 item = i.find_element(By.CLASS_NAME, 'gsc_a_at')
#                 url = item.get_attribute("href")
#                 paper_info=[j.text for j in i.find_elements(By.CLASS_NAME, 'gs_gray')]
#                 cite = i.find_element(By.CLASS_NAME, 'gsc_a_ac')
#                 year = i.find_element(By.CLASS_NAME, 'gsc_a_y').find_element(By.CLASS_NAME, "gsc_a_h").text
#                 papers.append([url, item.text, 
#                                 paper_info,
#                             cite.text, cite.get_attribute("href"),
#                             year])
#             Researcher["papers"] = papers

#         def generate_single_coauthor(element):
#             coauthor_dict = {
#                 "name":element.find_elements(By.CLASS_NAME, 'gs_ai_name')[0].get_attribute('textContent'),
#                 "url":element.find_elements(By.CLASS_NAME, 'gs_ai_pho')[0].get_attribute('href'),
#                 "description":element.get_attribute('innerHTML'),
#             }
#             return coauthor_dict
#         extra_coauthors = driver.find_elements(By.CLASS_NAME, "gsc_ucoar")
#         Researcher['extra_co_authors'] = [generate_single_coauthor(i) for i in extra_coauthors]
#         return Researcher

#     def search_name(self, name: Union[str, list], query_dict: dict = None, top_n=3, simple=True):
#         """Search on Google Scholar webpage given name.
        
#         Parameters
#         ----------
#         name : name of the scholar.
#         query_dict : a dict containing information of the scholar.
#         top_n : select <top_n> candidates.
#         simple : whether return simple information without paper list.

#         Returns
#         -------
#         resp : list of candidate scholars, empty if no candidates are found.

#         """
#         if type(name) is list:
#             # current case
#             name_list = [name[0], name[-1]]
#             name = f'{name[0]} {name[-1]}' 
#         elif type(name) is str:
#             name_list = name.split(' ')
#         else:
#             raise TypeError('Argument "name" passed to ScholarGsSearch.search_name has the wrong type.')
#         url_fragment = f'{name} '
#         if query_dict is not None:
#             # first try (name, email_suffix, position, organization) as url
#             keyword_list = generate_or_keyword_list(query_dict)[0]
#             url_fragment_new = url_fragment
#             # if 'email_suffix' in keyword_list:
#             #     url_fragment_new = url_fragment_new + keyword_list['email_suffix'] + ' '
#             # if 'position' in keyword_list:
#             #     url_fragment_new = url_fragment_new + keyword_list['position'] + ' '
#             # if 'organization' in keyword_list:
#             #     url_fragment_new = url_fragment_new + keyword_list['organization'] + ' '

#             # url = self._authsearch.format(url_fragment_new)
#             # self.driver.get(url)
#             # time.sleep(5)
#             # scholar_list = self._search_name_helper(self.driver, name_list)
#             # if len(scholar_list) > 0:
#             #     if wo_full:
#             #         return scholar_list
#             #     else:
#             #         return self._search_name_list_expand(scholar_list, simple=simple)
            
#             # second try (name, email_suffix)
#             if 'email_suffix' in keyword_list:
#                 url_fragment_new = url_fragment + keyword_list['email_suffix'] # + ' '
#             url = self._authsearch.format(url_fragment_new)
#             self.driver.get(url)
#             time.sleep(5)
#             scholar_list = self._search_name_helper(self.driver, name_list)
#             # return scholar_list
#             if len(scholar_list) > 0:
#                 if self.print_true:
#                     print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 1.')
#                 # return self._search_name_list_expand(scholar_list, simple=simple)
#                 return scholar_list
        
#             # third try (name, position)
#             if 'position' in keyword_list:
#                 url_fragment_new = url_fragment + keyword_list['position'] # + ' '
#             url = self._authsearch.format(url_fragment_new)
#             self.driver.get(url)
#             time.sleep(5)
#             scholar_list = self._search_name_helper(self.driver, name_list)
#             # return scholar_list
#             if len(scholar_list) > 0:
#                 if self.print_true:
#                     print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 2.')
#                 # return self._search_name_list_expand(scholar_list, simple=simple)
#                 return scholar_list

#             # fourth try (name, organization)
#             if 'organization' in keyword_list:
#                 url_fragment_new = url_fragment + keyword_list['organization'] # + ' '
#             url = self._authsearch.format(url_fragment_new)
#             self.driver.get(url)
#             time.sleep(5)
#             scholar_list = self._search_name_helper(self.driver, name_list)
#             # return scholar_list
#             if len(scholar_list) > 0:
#                 if self.print_true:
#                     print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 3.')
#                 # return self._search_name_list_expand(scholar_list, simple=simple)
#                 return scholar_list

#         # finally, only search (name: firstname and lastname). If only one response returns, mark it as candidate
#         url = self._authsearch.format(url_fragment)
#         self.driver.get(url)
#         time.sleep(5)
#         scholar_list = self._search_name_helper(self.driver, name_list)
#         if len(scholar_list) > 0:
#         # if len(scholar_list) > 0 and len(scholar_list) <= top_n:
#             if self.print_true:
#                 print(f'[Info] Find {len(scholar_list)} scholars using query without gs_sid in step 4.')
#             # return self._search_name_list_expand(scholar_list, simple=simple)
#             return scholar_list
        
#         return []

#     def _search_name_helper(self, driver, name_list):
#         """Helper function of <self.search_name()>."""
#         # iterate over searched list, find dicts that contains the name (including)
#         useful_info_list = driver.find_elements(By.CLASS_NAME, 'gs_ai_t')
#         useful_info_ext_list = []
#         if len(useful_info_list) != 0:
#             for scholar_webdriver in useful_info_list:
#                 name = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_name').get_attribute('textContent').strip()
#                 # check whether name is correct
#                 not_a_candidate = False
#                 for name_fragment in name_list:
#                     if name_fragment.lower() not in name.lower():
#                         not_a_candidate = True
#                         break
#                 if not_a_candidate:
#                     continue
                
#                 # grab all the other information
#                 pos_org = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_aff').get_attribute('textContent').strip()
#                 email_str = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_eml').get_attribute('textContent').strip()
#                 cite = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_cby').get_attribute('textContent').strip()
#                 url = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_name').find_element(By.TAG_NAME, 'a').get_attribute('href').strip()
#                 domain_labels = scholar_webdriver.find_element(By.CLASS_NAME, 'gs_ai_int').find_elements(By.CLASS_NAME, 'gs_ai_ont_int')
#                 for idx, domain in enumerate(domain_labels):
#                     domain_labels[idx] = domain.get_attribute('textContent').strip().lower()

#                 # continue processing
#                 gs_sid = None
#                 if 'user=' in url:
#                     tmp_gs_sid = url.split('user=', 1)[1]
#                     if len(tmp_gs_sid) >= 12:
#                         gs_sid = tmp_gs_sid[:12]

#                 if email_str is not None and email_str != '':
#                     match = re.search(r'[\w-]+\.[\w.-]+', email_str)
#                     email_str = match.group(0)

#                 cites = [int(s) for s in cite.split() if s.isdigit()]
#                 useful_info_ext_list.append({
#                     'name': name,
#                     'pos_org': pos_org,
#                     'email': email_str,
#                     'cite': cites[0] if len(cites)>0 else None,
#                     'url': url,
#                     'gs_sid': gs_sid,
#                     'domain_labels': domain_labels
#                 })
#         return useful_info_ext_list
        
#     def _search_name_list_expand(self, scholar_list, simple=True):
#         """Expand the name_list to full_name_list."""
#         new_scholar_list = []
#         for scholar in scholar_list:
#             if 'gs_sid' in scholar:
#                 url = self._gsidsearch.format(scholar['gs_sid'])
#                 self.driver.get(url)
#                 scholar_dict = self._search_gsid_helper(self.driver, url, simple=simple)
#                 if scholar_dict is not None:
#                     new_scholar_list.append(scholar_dict)
#                 time.sleep(5)
#         return new_scholar_list

In [None]:
from difflib import SequenceMatcher

def get_str_similarity(a: str, b: str) -> float:
    """Calculate the similarity of two strings and return a similarity ratio."""
    return SequenceMatcher(None, a, b).ratio()

### ScholarSearch

In [None]:
import pandas as pd
import numpy as np
import pickle
import json
import typing
from typing import List, Union
import os
import re
import time
import sys
import requests
from bs4 import BeautifulSoup


class ScholarSearch():
    """A class that handles searching over Google Scholar profiles and the 78k AI scholar dataset."""
    def __init__(self):
        # attributes
        self.similarity_ratio = 0.8
        self.driver_path = '/usr/bin/chromedriver'
    
    def setup(self):
        # self.get_profiles(['review_data/area_chair_id_to_profile.json', 'review_data/reviewer_id_to_profile.json'])
        # self.get_profiles(None)
        self.search_78k = Scholar78kSearch()
        self.search_gs = ScholarGsSearch(self.driver_path)

    def reset(self):
        pass

    def get_profiles(self, filepath_list: List[str] = None) -> None:
        """In case that you want to get responses of a list of scholars, 
        the method is implemented for you to load (could be multiple) json data files.

        Parameters
        ----------
        filepath_list : list of json data filepaths to load.

        """
        if filepath_list is None:
            return
        # set of json data dicts
        self.profile = {}
        for filepath in filepath_list:
            with open(filepath) as file:
                profile = json.load(file)
                self.profile.update(profile)
        # number of unique json data dicts in total
        print(f'Number of unique json data dicts in total: {len(self.profile)}')

    def get_scholar(
        self,
        query: Union[str, dict],
        field: List[str] = None,
        simple: bool = True,
        top_n: int = 3,
        print_true: bool = True) -> List[dict]:
        """Get up to <top_n> relevant candidate scholars by searching over Google Scholar profiles and the 78k AI scholar dataset.
        
        Parameters
        ----------
        query : a query containing the known scholar information.
        field : a list of fields wants to return. If not given, by default full information will be returned.
        simple : whether return simple information without paper list. This works only if the argument <field> is not specified.
        top_n : return at most <top_n> scholars if the result is not considered as determined.
        print_true : print info / debug info of the search process.

        Returns
        -------
        resp : list of candidate scholars, empty if no candidates are found.

        """

        self.search_78k.simple = simple
        self.search_78k.print_true = print_true
        self.search_gs.print_true = print_true
        self.print_true = print_true
        self.reset()

        scholar_cnt = 0
        if type(query) is dict:
            # query is dict
            resp = self.search_dict(query, simple=simple, top_n=top_n)
        elif type(query) is str:
            # query is str
            resp = self.search_name(query, simple=simple, top_n=top_n)                
        else:
            raise TypeError(f'[Error] The argument "query" must be str or dict, not {type(query)}.')

        
        # select specific features
        if field is not None:
            resp_final = []
            for resp_item in resp:
                resp_dict = {}
                for field_item in field:
                    if field_item not in resp_item:
                        raise KeyError(f'The key {field_item} is not in the response dictionary')
                    
                    resp_dict[field_item] = resp_item[field_item]
                resp_dict['gs_sid'] = resp_item['gs_sid']
                resp_dict['url'] = resp_item['url']
                resp_dict['citation_table'] = resp_item['citation_table']
                resp_final.append(resp_dict)
            if print_true:
                scholar_cnt = len(resp_final)
                if scholar_cnt == 1:
                    print(f'[Info] In total 1 scholar is found:')
                else:
                    print(f'[Info] In total {scholar_cnt} scholars are found:')
                resp_str = json.dumps(resp_final, indent=2)
                print(resp_str)
            return resp_final
        else:
            if print_true:
                scholar_cnt = len(resp)
                if scholar_cnt == 1:
                    print(f'[Info] In total 1 scholar is found:')
                else:
                    print(f'[Info] In total {scholar_cnt} scholars are found:')
                resp_str = json.dumps(resp, indent=2)
                print(resp_str)
            return resp
    
    def search_name(self, name: str, simple: bool = True, top_n: int = 3, from_dict: bool = False, query_dict: dict = None) -> List[dict]:
        """Search gs profile given name or OpenReview id.
        
        Parameters
        ----------
        name : the name of the scholar ([first_name last_name]).
        simple : whether return simple information without paper list. This works only if the argument <field> is not specified.
        top_n : return at most <top_n> scholars if the result is not considered as determined.
        from_dict : default = False. Should be true only if using <get_scholar()> class method.
        query_dict : default = None. Should be a dict only if using <get_scholar()> class method.

        Returns
        -------
        resp : list of candidate scholars, empty if no candidates are found.
        """

        self.search_78k.simple = simple
        name = name.strip()
        dict = None
        real_name = True
        # OpenReview id
        if ' ' not in name and name[0] == '~':
            # search over chair id
            if name in self.profile:
                dict = self.profile[name]
            # crawl http api response
            if dict is not None and not from_dict:
                # name
                real_name = False
                resp = self.search_dict(dict, simple=simple, top_n=top_n)
            else:
                # get real name
                or_name = name # string
                name = name[1:].split('_')
                name[-1] = re.sub(r'[0-9]+', '', name[-1]) # list
                # name = ' '.join(name) # e.g., Rachel K. E. Bellamy
        else:
            or_name = name.split(' ') # list
            # name string
        if real_name:
            if from_dict:
                print('Not find by gs_sid, search from_dict')
                # it inputs a real name (firstname, lastname)
                resp = self.search_78k.search_name(name, query_dict)
                resp_gs = self.search_gs.search_name(name, query_dict, top_n=top_n, simple=simple)
                resp = self.select_final_cands(resp, top_n, query_dict=query_dict, resp_gs_prop={'resp_gs': resp_gs})
            else:
                # or_resp = self.get_or_scholars(or_name)
                # TODO: resp_gs for only searching name is not implemented
                # resp = self.select_final_cands(resp, or_resp, top_n, simple=simple)
                resp = self.search_78k.search_name(name)
                resp_gs = self.search_gs.search_name(name, query_dict=None, top_n=top_n, simple=simple)
                resp = self.select_final_cands(resp, top_n, query_dict=None, resp_gs_prop={'resp_gs': resp_gs})
        return resp
    

    def get_or_scholars(self, or_name: Union[str, list]):
        """Get OpenReview candidate scholars list by name through http api response."""
        # format the name list to get OpenReview rest api response
        if type(or_name) is list:
            or_name_list = []
            if len(or_name) >= 2:
                id_list = []
                for idx, name_part in enumerate(or_name):
                    if idx == 0 or idx == len(or_name) - 1:
                        id_list.append(name_part.capitalize())
                    else:
                        if len(name_part) > 1:
                            id_list.append(f'{name_part[0].upper()}.') # middle name in abbreviate form
                        else:
                            id_list.append(name_part.upper())
                if len(id_list) == 2:
                    or_name_list.append(f'~{id_list[0]}_{id_list[-1]}')
                elif len(id_list) > 2:
                    or_name_list.append(f'~{id_list[0]}_{id_list[-1]}')
                    tmp_str = '_'.join(id_list)
                    or_name_list.append(f'~{tmp_str}')
            else:
                raise ValueError('Argument "or_name" passed to get_or_scholars is not a valid name list.')
        elif type(or_name) is str:
            or_name_list = [or_name]
        else:
            raise TypeError(f'Argument "or_name" passed to get_or_scholars has the wrong type.')
        del or_name

        # get request response
        go_ahead = True
        resp_list = []
        for name in or_name_list:
            if name[-1].isnumeric():
                name_cur = name
                go_ahead = False
                name_cur_cnt = 1
            else:
                name_cur_cnt = 1
                name_cur = f'{name}{name_cur_cnt}'

            # set accumulative count
            acc_cnt = 0
            while acc_cnt <= 1:
                response = requests.get(f'https://openreview.net/profile?id={name_cur}')
                time.sleep(1)

                if not response.ok:
                    acc_cnt += 1
                else:
                    soup = BeautifulSoup(response.content.decode('utf-8'), 'html.parser')
                    resp_list.append(json.loads(soup.find_all('script', id="__NEXT_DATA__")[0].string))
                name_cur_cnt += 1
                name_cur = f'{name}{name_cur_cnt}'
                if not go_ahead:
                    break
        if self.print_true:
            if len(resp_list) != 1:
                print(f'[Info] Found {len(resp_list)} scholars using OpenReview REST API.')
            else:
                print(f'[Info] Found 1 scholar using OpenReview REST API.')
        return resp_list 
        # NOTE: the dict in this list is in a different format than the dict from OpenReview dataset.

    def select_final_cands(self, resp: List[dict], top_n: int, query_dict: dict = None, resp_gs_prop: dict = None, simple: bool = True) -> List[dict]:
        """Select final candidates according to the response from OpenReview and 78k data.
        
        Parameters
        ----------
        resp : response from 78k dataset.
        or_resp : prepare the necessary key-value pairs to help filtering.
        top_n : return at most <top_n> scholars if the result is not considered as determined.
        query_dict : default = None. Should be a dict only if using <get_scholar()> class method.
        resp_gs_prop : dict containing the response from Google Scholar webpage.
        simple : whether return simple information without paper list. This works only if the argument <field> is not specified.

        Returns
        -------
        resp : list of candidate scholars, empty if no candidates are found.
        
        """
        # get useful data from or_resp
        if query_dict is not None:
            or_keyword_list = generate_or_keyword_list(query_dict)

        # merge resp with resp_gs
        if resp_gs_prop is not None:
            resp_gs = resp_gs_prop['resp_gs']
            # if there are one candidate from google scholar pages, we throw out resp from 78k data.
            if len(resp_gs) == 1:
                resp = []
            # iterate over resp_gs
            for resp_gs_item in resp_gs:
                find_flag = False
                # gs_sid
                for resp_item in resp:
                    if resp_gs_item['gs_sid'] == resp_item['gs_sid']:
                        find_flag = True
                        break
                if find_flag:
                    continue
                # construct new prep
                # generate full dict
                self.search_gs.driver.get(resp_gs_item['url'])
                time.sleep(5)
                if query_dict is not None or (query_dict is None and len(resp) <= top_n):
                    resp_gs_full_item = self.search_gs._search_gsid_helper(self.search_gs.driver, resp_gs_item['url'], simple=simple)
                    if resp_gs_full_item is not None:
                        resp.append(resp_gs_full_item)
        
        if query_dict is None:
            return resp[:top_n]

        # calculate rankings
        rank = {}
        for idx_cand, cand in enumerate(resp):
            rank[idx_cand] = []
            gs_sid_flag = 0
            cnt_true = [0] * len(or_keyword_list) 
            cnt_all = 0
            cnt_true_rel = [0] * len(or_keyword_list) 
            cnt_all_rel = 0
            for idx_or_scholar, or_scholar in enumerate(or_keyword_list):
                # gs_sid
                if 'gs_sid' in cand:
                    if cand['gs_sid'] == or_scholar['gs_sid']: 
                        gs_sid_flag = 1

                # domain_labels
                if cand['domain_labels'] is not None:
                    for cand_domain_tag in cand['domain_labels']:
                        cnt_all += 1
                        for or_domain_tag in or_scholar['domain_labels']:
                            if get_str_similarity(cand_domain_tag, or_domain_tag) >= self.similarity_ratio:
                                cnt_true[idx_or_scholar] += 1
                
                
                # relations
                cnt_all_rel = 0
                # print(cand)
                if cand['coauthors'] is not None:
                    for cand_coauth in cand['coauthors']:
                        cnt_all_rel += 1
                        for or_coauth in or_scholar['coauthors']:
                            if get_str_similarity(or_coauth, cand_coauth[1]) >= self.similarity_ratio:
                                cnt_true_rel[idx_or_scholar] += 1
                
            # get the rank list
            # gs_sid
            if gs_sid_flag:
                rank[idx_cand].append(1)
            else:
                rank[idx_cand].append(0)
            
            # domain_labels
            for i in range(len(cnt_true)):
                if cnt_all == 0:
                    cnt_true[i] = 0
                else:
                    cnt_true[i] = cnt_true[i] / cnt_all
            rank[idx_cand].append(max(cnt_true))

            # relations
            for i in range(len(cnt_true_rel)):
                if cnt_all_rel == 0:
                    cnt_true_rel[i] = 0
                else:
                    cnt_true_rel[i] = cnt_true_rel[i] / cnt_all_rel
            rank[idx_cand].append(max(cnt_true_rel))
        
        # select final candidate
        final_idx = []
        for rank_idx in rank:
            if rank[rank_idx][0] == 1:
                final_idx.append(rank_idx)
        
        # TODO: or we can set weights to (relations, domain_tags) to rank the scholar candidates
        if len(final_idx) < top_n:
            domain_tag_rank = []
            relation_rank = []
            for rank_idx in sorted(rank.keys()):
                # print(rank_idx)
                domain_tag_rank.append(rank[rank_idx][1])
                relation_rank.append(rank[rank_idx][2])
            # print(domain_tag_rank, relation_rank)
            domain_tag_idxes = np.argsort(domain_tag_rank)[::-1]
            relation_idxes = np.argsort(relation_rank)[::-1]
            for idx in relation_idxes:
                if relation_rank[idx] == 0:
                    break
                if len(final_idx) < top_n:
                    if idx not in final_idx:
                        final_idx.append(idx)
                else:
                    break
            for idx in domain_tag_idxes:
                if domain_tag_rank[idx] == 0:
                    break
                if len(final_idx) < top_n:
                    if idx not in final_idx:
                        final_idx.append(idx)
                else:
                    break
            if len(final_idx) == 0 and len(rank.keys()) > 0:
                    for rank_idx in sorted(rank.keys()):
                        if len(final_idx) >= top_n:
                            break
                        else:
                            final_idx.append(rank_idx)
        # print(resp)
        # print(or_keyword_list)
        # print(rank)
        # print(final_idx)
        resp = [resp[i] for i in final_idx]
        return resp

    def search_dict(self, query_dict: dict, simple: bool = True, top_n: int = 3):
        """Search candidates given a dictionary.
        
        Parameters
        ----------
        query_dict : default = None. Should be a dict only if using <get_scholar()> class method.
        simple : whether return simple information without paper list. This works only if the argument <field> is not specified.
        top_n : return at most <top_n> scholars if the result is not considered as determined.

        Returns
        -------
        resp : list of candidate scholars, empty if no candidates are found.

        """
        self.search_78k.simple = simple
        # gs_sid
        if 'gscholar' in query_dict['profile']['content'] and 'user=' in query_dict['profile']['content']['gscholar']:
            tmp_gs_sid = query_dict['profile']['content']['gscholar'].split('user=', 1)[1]
            if len(tmp_gs_sid) >= 12:
                gs_sid = tmp_gs_sid[:12]
                name_df = self.search_78k.df.loc[self.search_78k.df['gs_sid'] == gs_sid].copy()
                if name_df.shape[0] != 0:
                    print(f'[Info] Found a scholar using 78k gs_sid')
                    return self.search_78k._deal_with_simple(name_df)
                else:
                    print(f'[Info] Found a scholar using query dict gs_sid')
                    resp = self.search_gs.search_gsid(gs_sid, simple=simple)
                    if len(resp) > 0:
                        return resp
                    
        
        # search_name
        return self.search_name(query_dict['profile']['id'], simple=simple, top_n=top_n, from_dict=True, query_dict=query_dict)

###TwitterSearch - original

In [None]:
import re
import time
from typing import Union
from collections import defaultdict
from selenium import webdriver
from selenium.webdriver.chromium.webdriver import ChromiumDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.errorhandler import WebDriverException
from bs4 import BeautifulSoup

url_search_dict = {
    'google': 'https://www.google.com/search?q={0}'
}


def get_search_result(**kwargs):
    return kwargs


class TwitterSearch(GoogleSearch):
    """Class that handling searching on Google search bar using REST API."""

    def __init__(self, driver_path):
        super().__init__(driver_path)
        self._urlsearch = url_search_dict['google']
        self.print_true = False #NOTE: should be set by users
        # initialize scholar_search object
        self.scholar_search = ScholarSearch()
        self.scholar_search.setup()

    def search_scholar(self, str_type: str, term: str):
        """
        NOTE: now the only allowed str_type is name
        """
        if str_type == 'name':
            scholar_search_result = self.scholar_search.get_scholar(query=term, simple=True, top_n=1, print_true=False)
            branch_type = None
            if len(scholar_search_result) == 0:
                # directly search
                url_fragment = self._urlsearch.format(f'{term} "twitter"')
                result = self._search_google_helper(url_fragment)
                twitter_ids = self.filter_result(result, term, web_source='google')
                branch_type = 'directly search'
            else:
                twitter_ids = None
                # try directly get twitter account through homepage
                if 'homepage_url' in scholar_search_result[0] and scholar_search_result[0]['homepage_url'] is not None:
                    twitter_ids = self._search_twitter_from_homepage(scholar_search_result[0]['homepage_url'], name=term, name_from_gs=scholar_search_result[0]["name"])
                if twitter_ids is not None:
                    branch_type = 'homepage'

                # then try (google_name email_suffix "twitter")
                if "email_info" in scholar_search_result[0] and scholar_search_result[0]["email_info"] != '' and twitter_ids is None:
                    url_fragment = self._urlsearch.format(f'{scholar_search_result[0]["name"]} {scholar_search_result[0]["email_info"]} "twitter"')
                    result = self._search_google_helper(url_fragment)
                    twitter_ids = self.filter_result(result, term, web_source='google')
                if twitter_ids is not None:
                    branch_type = 'name + email'

                # then try (google_name organization "twitter")
                if "organization" in scholar_search_result[0] and scholar_search_result[0]["organization"] != '' and twitter_ids is None:
                    url_fragment = self._urlsearch.format(f'{scholar_search_result[0]["name"]} {scholar_search_result[0]["organization"]} "twitter"')
                    result = self._search_google_helper(url_fragment)
                    twitter_ids = self.filter_result(result, term, web_source='google')
                if twitter_ids is not None:
                    branch_type = 'name + organization'

                # then try (google_name "twitter")
                if twitter_ids is None:
                    url_fragment = self._urlsearch.format(f'{scholar_search_result[0]["name"]} "twitter"')
                    result = self._search_google_helper(url_fragment)
                    twitter_ids = self.filter_result(result, term, web_source='google')
                if twitter_ids is not None:
                    branch_type = 'name'


            print(f'[INFO] branch_type: {branch_type}')
            print(f'[INFO] twitter_ids: {twitter_ids}')

            if twitter_ids is None or len(twitter_ids) == 0:
                return None, twitter_ids
            elif type(twitter_ids) == dict:
                twitter_ids_list = list(twitter_ids.keys())
                highest_occurrence = twitter_ids[twitter_ids_list[0]]
                candidate_list = []
                for item in twitter_ids.items():
                    if item[1] == highest_occurrence:
                        if self._rank_by_similarity(item[0], term, scholar_search_result[0]["name"]) >= 0.15:
                            candidate_list.append(item[0])
                if len(candidate_list) > 0:
                    return self._rank_by_similarity(candidate_list, term, scholar_search_result[0]['name'])[0], twitter_ids
                return None, twitter_ids
            else:
                return twitter_ids[0], twitter_ids

            # TODO: match profile images of google_scholar/homepage with twitter profile images
            # result = self._search_name_helper(term)
            # return self.filter_result(result, term, web_source='google')
        elif str_type == 'gs_url':
            raise NotImplementedError
        else:
            raise NotImplementedError

    def _rank_by_similarity(self, twitter_url_origin_list: Union[list, str], name: str=None, name_from_gs: str=None):
        # process name and name_from_gs
        if name is not None:
            name = re.sub('[0-9_\., ]', '', name.lower())
        if name_from_gs is not None:
            name_from_gs = re.sub('[0-9_\., ]', '', name_from_gs.lower())

        if type(twitter_url_origin_list) == list:
            
            # else
            twitter_url_list = [re.sub('[0-9_\., ]', '', item) for item in twitter_url_origin_list]
            twitter_url_map_dict = {re.sub('[0-9_\., ]', '', item): item for item in twitter_url_origin_list}
            # rank twitter_url_origin_list
            if name is not None and name_from_gs is not None:
                twitter_url_list = sorted(twitter_url_list, key=lambda x: max(get_str_similarity(x, name), get_str_similarity(x, name_from_gs)), reverse=True)
            elif name is not None:
                twitter_url_list = sorted(twitter_url_list, key=lambda x: get_str_similarity(x, name), reverse=True)
            elif name_from_gs is not None:
                twitter_url_list = sorted(twitter_url_list, key=lambda x: get_str_similarity(x, name_from_gs), reverse=True)
            else:
                # do not consider this branch at the moment
                pass
            twitter_url_origin_list = [twitter_url_map_dict[item] for item in twitter_url_list]
            return twitter_url_origin_list
        else:
            twitter_url_origin_str = twitter_url_origin_list
            twitter_url_str = re.sub('[0-9_\., ]', '', twitter_url_origin_str)
            rank = 0
            if name is not None and name_from_gs is not None:
                rank = max(get_str_similarity(twitter_url_str, name), get_str_similarity(twitter_url_origin_list, name_from_gs))
            elif name is not None:
                rank = get_str_similarity(twitter_url_str, name)
            elif name_from_gs is not None:
                rank = get_str_similarity(twitter_url_str, name_from_gs)
            
            return rank

    def _search_twitter_from_homepage(self, homepage_url: str, name: str=None, name_from_gs: str=None):
        # get content of scholar homepage using chromedriver
        try:
            self.driver.get(homepage_url)
        except WebDriverException as e:
            if self.print_true:
                print('[DEBUG] WebDriverException while getting homepage: %s' % homepage_url)
                print(e)
        time.sleep(3)

        page = self.driver.page_source
        soup = BeautifulSoup(page, "html.parser")
        twitter_url_origin_list = list(set([re.findall('twitter.com/([^\/?]+)', item['href'])[0]
            for item in soup.find_all(
                href=re.compile('twitter.com/([^\/?]+)'))]))
        print(soup.find_all(
                href=re.compile('twitter.com/([^\/?]+)')))
        # if there are no candidates for twitter account url, return None
        if len(twitter_url_origin_list) == 0:
            # return soup
            return None

        twitter_url_origin_list = self._rank_by_similarity(twitter_url_origin_list, name=name, name_from_gs=name_from_gs)

        if self.print_true:
            print(f'[DEBUG] Find a set of twitter ids on the provided homepage:\n{twitter_url_origin_list}')
        
        # only return the highest rank twitter account id
        return twitter_url_origin_list

    def _search_google_helper(self, google_url: str):
        self.driver.get(google_url)
        time.sleep(3)

        page = self.driver.page_source
        soup = BeautifulSoup(page, "html.parser")

        result_list = []
        result_block = soup.find_all('div', attrs={'class': 'g'})
        for result in result_block:
            # Find link, title, description
            link = result.find('a', href=True)
            title = result.find('h3')
            description_box = result.find(
                'div', {'style': '-webkit-line-clamp:2'})
            if link and title and description_box:
                result_list.append(get_search_result(
                    href=link['href'], title=title.text, description=description_box.text))
        if self.print_true:
            print(result_list)
        time.sleep(5)
        return result_list
        

    def filter_result(self, result_list, term, web_source):
        """
        web_source: google, twitter
        """
        # sort twitter ids by occurrence frequency
        if web_source == 'google':
            twitter_id_dict = defaultdict(int)
            for result in result_list:
                if 'twitter.com/' in result['href']:
                    twitter_id_dict[re.findall('twitter.com/([^\/?]+)', result['href'])[0]] += 1
        twitter_id_dict = dict(sorted(twitter_id_dict.items(), key=lambda item: item[1], reverse=True))
        # then, sort twitter ids by str similarity?
        # TODO
        # TODO: enter into twitter page to check profile information
        # Step 1: twitter profile vs google scholar profile
        # Step 2: twitter tweets: check whether google scholar domains are in twitter tweets
        # Step 3: twitter profile image (ask Yvonne about the performance)
        
        if len(twitter_id_dict) == 0:
            return None
        else:
            return twitter_id_dict
    
    def search_scholar_batch(self, name_list: list):
        self.result_list = []
        for name in name_list:
            self.result_list.append(self.search_scholar('name', name))
        return self.result_list

    def get_scholar_twitter(self, str_type: str, term: str, only_one=True):
        """
        Final function that search a scholar's twitter account
        # TODO
        """
        raise NotImplementedError
        result = self.search_scholar(str_type=str_type, term=term)

        # first, google web search: name "twitter", get a list of top results, and check whatever name matches exactly
        # if matches, then get the twitter account id, use tweepy API search of the id to get the user profile and do further check

        # if no matches, then search by name directly using Tweepy

        # if there are candidates, do type 1, 2, 3 check of the result
        #

        # '''
        #     The current code and the data is on the folder /cluster/project/sachan/zhiheng/twiteer at Euler server, because of the security reason, I save the twitter key as this structure, and use get_auth.py to load the key in the file. If you need more APIs, please contact me
        #     {
        #     "API_key" : "ilH6jnBJdh9HQdsufmygvUwMB",
        #     "API_secret_key" : "LqErCdWfdP6BWf3LH3Q0RrJAXHoFvmweBUNtI1WljJ2A8SMelW"
        #     }
        #     The current algorithm has the follow steps
        #     Step1: Find all twitter’s screen name by simply search the GS_name on twitter save_twitter_metainfo.py
        #     The problem now is that simply search the GS_name have a low recall rate, which is seen as the current bottleneck, about 52% of valid user loose in this step (see the below information)
        #     Step2: Use match_and_save.py to make a sketch match by the type 1, 2, 3 match and save those users tweets
        #     type 1: matched by personal website
        #     type 2: matched by keyword
        #     type3: matched by similar description with the information in GS
        #     Step3: Process the tweets (not important in current step)
        #     For the current 400 datapoint, there are 136 valid twitter accounts. I can match 20 of them by personal website, 36 of them by using type 1,2,3 match(with FN=20), and only 66 of them appeared in our search by users name (for example, if I simply search "Mohammad Moradi", I can not find the correspondent user moradideli by https://twitter.com/search?q=Mohammad%20Moradi&src=typed_query&f=user).
        #     8:45
        #     Here is some useful info about how a person annotator find the ground truth twitter user:
        #     8:46
        #     I followed the instruction in this doc by searching the name + Twitter in the Google first, and click top results to see if there is any match. If none, I will go search the name in Twitter and also browse through the top results. Sometimes I will also search their LinkedIn page to get their most up-to-date information. (the current institute in Google Scholar is not as accurate as their LinkedIn, and LinkedIn has a full history of where they worked. Moreover, they tend to put their photos in LinkedIn)
        # '''


# literature:

# https://direct.mit.edu/qss/article/1/2/771/96149/Large-scale-identification-and-characterization-of

# Test

In [None]:
ts = TwitterSearch(driver_path)

  options.headless = True
  self.driver = webdriver.Chrome(driver_path, options=options)


In [None]:
with open("/content/drive/My Drive/tweets-dataset/gs_scholars_matched_with_twitter_accounts_500.tsv", "r") as fr, open('/content/drive/My Drive/tweets-dataset/gs_scholars_candidate_twitter_accounts_v0_original_algo.tsv','a') as fw:
  reader = csv.reader(fr, delimiter="\t")
  fw.write('\n')
  num_times = 0
  for i, line in tqdm(enumerate(reader)):
    index = line[0]
    if i==0 or int(index)<=477:
      continue
    name = line[1]
    org = line[2]
    gs_url = line[3]
    twitter_url = line[4]
    if i>0:
      # get twitter IDs
      twitter_id, candidate_ids = ts.search_scholar('name', name)
      num_times += 1
      if twitter_id is not None:
        # save list of twitter IDs in the CSV
        line.append(twitter_id)
      else:
        line.append("N/A")
        time.sleep(random.randint(2, 4))
        print(i)
      if candidate_ids is not None:
        if type(candidate_ids) == dict:
          line.extend(list(candidate_ids.keys()))
        else:
          line.extend(candidate_ids)
    fw.write("\t".join(line))
    fw.write('\n')
    if num_times >= 10:
      num_times = 0
      del ts
      ts = TwitterSearch(driver_path)
      time.sleep(random.randint(4, 8))

# Error Analysis

In [1]:
from pandas.core.internals.managers import ensure_block_shape
gs_link_prefix = "https://scholar.google.com/citations?user=" 
twitter_link_prefix = "https://twitter.com/"

twitter_id_in_top_10 = 0
twitter_id_in_top_1 = 0
twitter_id_with_no_candidates = 0
twitter_id_not_in_candidates = 0
null_id_with_no_candidates = 0
null_id_with_candidates = 0

indices_not_found = []

with open("/content/drive/My Drive/tweets-dataset/gs_scholars_candidate_twitter_accounts_v0_original_algo.tsv", "r") as f:
  reader = csv.reader(f, delimiter="\t")
  for i, line in enumerate(reader):
    if i==0:
      continue
    index = line[0]
    name = line[1]
    org = line[2]
    gs_url = line[3]
    twitter_url = line[5]
    top_1 = line[6].lower() if line[6] != "N/A" and len(line[6]) > 0 else None
    top_10 = [id.lower() for id in line[7:] if len(id)>0]
    
    if twitter_url=="N/A":
      if len(top_10) == 0:
        null_id_with_no_candidates += 1
      else:
        null_id_with_candidates += 1
    else:
      twitter_id = twitter_url.split("twitter.com/")[-1]
      if len(top_10) == 0:
        twitter_id_with_no_candidates += 1
      else:
        if twitter_id.lower() in top_10:
          twitter_id_in_top_10 += 1
          if twitter_id.lower() == top_1:
            twitter_id_in_top_1 += 1
        else:
          twitter_id_not_in_candidates += 1

FileNotFoundError: ignored

In [None]:
print(f'twitter_id_in_top_10 :          {twitter_id_in_top_10}')
print(f'twitter_id_in_top_1 :           {twitter_id_in_top_1}')
print(f'twitter_id_with_no_candidates : {twitter_id_with_no_candidates}')
print(f'twitter_id_not_in_candidates :  {twitter_id_not_in_candidates}')
print(f'null_id_with_no_candidates :    {null_id_with_no_candidates}')
print(f'null_id_with_candidates :       {null_id_with_candidates}')

In [None]:
precision_at_10 = twitter_id_in_top_10 / (twitter_id_in_top_10 + twitter_id_not_in_candidates + null_id_with_candidates)
precision_at_10

In [None]:
recall_at_10 = twitter_id_in_top_10 / (twitter_id_in_top_10 + twitter_id_with_no_candidates + twitter_id_not_in_candidates)
recall_at_10

In [None]:
f_at_10 = (2*precision_at_10*recall_at_10) / (precision_at_10 + recall_at_10)
f_at_10

In [None]:
precision_at_1 = twitter_id_in_top_1 / (twitter_id_in_top_10 + twitter_id_not_in_candidates + null_id_with_candidates)
precision_at_1

In [None]:
recall_at_1 = twitter_id_in_top_1 / (twitter_id_in_top_10 + twitter_id_with_no_candidates + twitter_id_not_in_candidates)
recall_at_1

In [None]:
f_at_1 = (2*precision_at_1*recall_at_1) / (precision_at_1 + recall_at_1)
f_at_1