In [1]:
import math
import requests
import time
import backoff
import re
import os
import json
import configparser
import logging
from authlib.integrations.requests_client import OAuth2Session, OAuth2Auth

os.chdir('..')

# Configure logger
formatter = logging.Formatter(
    "%(asctime)s.%(msecs)03d %(levelname)s - %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)

# Read in configuration variables
parser = configparser.ConfigParser()
parser.read("pipeline.conf")

#
# Constants
#
COMPLETE_SEARCHES_PATH = "data/dispute_searches-complete.json"
INCOMPLETE_SEARCHES_PATH = "data/dispute_searches-incomplete.json"
BOOKMARKS_PATH = "data/bookmarks.json"
MAX_REQUEST_PER_DISPUTE = None
MAX_REQUEST_PER_DAY = 1000

#
# Nexis Uni API Utility Classes
#


class Config:
    """
    Class to store the Nexis Uni API authentication credentials and any other
    universal configuration parameters.
    """

    # Nexis Uni credentials
    CLIENT_ID = parser.get("nu_api_credentials", "CLIENT_ID")
    CLIENT_SECRET = parser.get("nu_api_credentials", "SECRET")
    SCOPE = "http://oauth.lexisnexis.com/all"


class Collections:
    """
    URLs for different API collections.

    Currently only news and batch news included. More can be added as needed
    """

    BaseURL = "https://services-api.lexisnexis.com/v1/"
    News = BaseURL + "News"
    BatchNews = BaseURL + "BatchNews"

    def __iter__(self):
        class_vars = {
            k: getattr(cls, k)
            for k in dir(cls)
            if not callable(getattr(cls, k)) and not k.startswith("__")
        }
        for item in class_vars.items():
            yield item

    def __str__(self):
        printout = ""
        for collection, url in cls:
            printout += f"{collection}: {url}" + "\n"
        return printout


class Search:
    def __init__(
        self,
        client,
        collection,
        search_str,
        params=None,
        verbose=False,
        checkpoint=None,
    ):
        # Instance variables
        self.client = client
        self.payload = {"$search": search_str}
        if params:
            self.payload = dict(self.payload, **params)
        self.collection = collection
        self.checkpoint = checkpoint

        # Fetch init results and update metadata
        self._get_init_results()

        # Check search
        if self._raw_results.status_code == requests.codes.ok:
            self._update_data()

        if verbose:
            self._print_params()

    def _get_init_results(self):
        """Fetch initial search results.

        returns
        -------
        r: an HTTP request response, same as requests library
        json_results: the response converted to json format
        """
        # Run get request, update metadata, _next_page, _prev_page
        try:
            if self.checkpoint:
                r = self.client.get(self.checkpoint)
            else:
                r = self.client.get(self.collection, params=self.payload)
            self._raw_results = r
            self._json_results = r.json()
            if "@odata.count" not in self._json_results.keys():
                self.n_pages = 0
            else:
                self.n_results = self._json_results["@odata.count"]
                self.n_pages = math.ceil(
                    self.n_results / len(self._json_results["value"])
                )
        except Exception as e:
            logger.error(f"Error encountered: {e}")

    def _update_data(self):
        """Store initial results."""
        self._next_link = (
            self._json_results["@odata.nextLink"]
            if "@odata.nextLink" in self._json_results.keys()
            else None
        )
        self._values = (
            self._json_results["value"]
            if "value" in self._json_results.keys()
            else None
        )

    def _print_params(self):
        """Print search params."""
        logger.info("Lexis Nexis API Search Parameters")
        logger.info(f"Collection: {self.collection}")
        for k, v in self.payload.items():
            logger.info(f"{k}: {v}")

    @property
    def results(self):
        return self._values

    def next_page(self):
        if not self._next_link:
            logger.info("Already on last page.")
            return
        self._raw_results = self.client.get(self._next_link)
        if self._raw_results.status_code == requests.codes.ok:
            self._json_results = self._raw_results.json()
            self._update_data()
        else:
            raise requests.HTTPError(self._raw_results)
        return self.results

    def __iter__(self):
        # Not final page of results,
        # retrieve results,
        # fetch next page,
        # yield results, repeat
        i = 1
        while self._next_link:
            results = self.results
            if i % 3 == 0:
                raise Exception("Hard coded error")
            self.next_page()
            i +=1
            yield (results, self._next_link)
        # Last page already retrieved, yield results
        yield (self.results, None)


class OAuth2SessionBackoff(OAuth2Session):
    def __init__(
        self,
        client_id=None,
        client_secret=None,
        token_endpoint_auth_method=None,
        revocation_endpoint_auth_method=None,
        scope=None,
        redirect_uri=None,
        token=None,
        token_placement="header",
        update_token=None,
        **kwargs,
    ):
        OAuth2Session.__init__(
            self,
            client_id=client_id,
            client_secret=client_secret,
            token_endpoint_auth_method=token_endpoint_auth_method,
            revocation_endpoint_auth_method=revocation_endpoint_auth_method,
            scope=scope,
            redirect_uri=redirect_uri,
            token=token,
            token_placement=token_placement,
            update_token=update_token,
            **kwargs,
        )

    @backoff.on_exception(
        backoff.expo, requests.exceptions.RequestException, max_tries=8
    )
    def backoff_get(self, *args, **kwargs):
        return self.get(*args, **kwargs)


class API:
    """
    OAuth2 Client Session for connecting to Lexis Nexis API.

    params        # Replace client get with backoff get
        self.client.get = backoff_get
    ------
    path_to_env, str: absolute path to an .env file containing
        the OAuth Client ID and Client Secret
    """

    def __init__(self, config):
        self.client = self.create_client(config=config)
        self._fetch_token()
        self.collections = Collections()

    def create_client(self, config):
        return OAuth2SessionBackoff(
            config.CLIENT_ID, config.CLIENT_SECRET, scope=config.SCOPE
        )

    def _fetch_token(self):
        logger.info("Fetching token...")
        token_endpoint = "https://auth-api.lexisnexis.com/oauth/v2/token"
        self._token = self.client.fetch_token(
            token_endpoint, grant_type="client_credentials"
        )
        logger.info("Success")

    @property
    def token(self):
        """Returns LexisNexis API Client Access Token."""
        if self.token_status == "Expired":
            self._fetch_token()
        return self._token["access_token"]

    @property
    def token_status(self):
        """Returns token Status."""
        self._token_status = "Valid"
        if self._token["expires_at"] < time.time():
            self._token_status = "Expired"
        return self._token_status

    @property
    def token_metadata(self):
        """Returns token metadata."""
        self._token_metadata = {
            k: v for k, v in self._token.items() if k != "access_token"
        }
        return self._token_metadata


#
# API Call Helper Functions
#


def search_dict_to_filename(search_dict):
    assert set(["name", "search_str", "date", "n_requests"]) == set(
        search_dict.keys()
    ), "search dict missing required arguments"
    date_str = re.sub("\s", "_", search_dict["date"])
    dispute_name = search_dict["name"]
    return f"data/nu-api-data-raw/{dispute_name}?{date_str}.json"


def load_json_list(path):
    logger.info(f"Loading json list at {path}")
    with open(path, "rb") as infile:
        json_list = json.load(infile)
    return json_list


def save_json(path, data):
    logger.info(f"Saving json to {path}")
    with open(path, "w", encoding="utf-8") as outfile:
        json.dump(data, outfile, ensure_ascii=False)


def search_and_concatenate(
    api_client,
    collection,
    search_str,
    top=50,
    filter=None,
    checkpoint=None,
    page_limit=None,
):
    """
    Utility class for performing an API search on the specified collection with the
    given parameters. Iterates through all pages of the search and concatenates into
    a pandas dataframe.

    Params
    ------
    api_client: class method, API client method from the API class
    collection: class variable, A NexisUni collection endpoint from the Collections class
    search_str: str, the api search query
    top: int, number of results to return per page. (Default = 50, max = 50)
    filter: str, optional, additional query filters. Must be in NexisUni approved format.
    checkpoint: str, optional, a url request to hot start a search in the event of prior failure.
    page_limit: int, optional, a limit on the number of pages to iterate through before stopping.

    Return
    ------
    all_results: list, dicts/json objects
    fetch_incomplete: boolean, whether the function successfully retrieved all pages of results
    """
    all_results = []
    n_requests = 0

    # Search
    payload = {"$search": search_str, "$top": top, "$expand": "Document"}
    if filter:
        payload["$filter"] = filter
    s = Search(
        client=api_client,
        collection=collection,
        search_str=search_str,
        params=payload,
        verbose=True,
        checkpoint=checkpoint,
    )

    # Find starting page number on hot start
    start_page = (
        int(re.findall(r"\$skip=(\d+)?(?=&)", checkpoint)[0]) // 50 if checkpoint else 1
    )

    # Exit early for no results
    if s.n_pages < 1:
        return all_results, None, n_requests

    # Iterate through all results and combine in a single list
    logger.info("Concatenating results...")
    next_link = s._raw_results.request.url  # In case initial call fails
    try:
        for i, payload in enumerate(s):
            n_requests = i + 1
            logger.info(f"Page: {i + start_page} / {s.n_pages}")
            page, next_link = payload
            all_results.extend(page)

            # Exit early if limit reached
            if page_limit and n_requests == page_limit:
                logger.info(
                    f"Request limit of {page_limit} reached. Stopping iteration."
                )
                break

            time.sleep(18)  # Sleep request so as not to hit hourly rate limit
    except Exception as e:
        logger.error(f"Encountered exception when fetching data: {e}")
        # store file name as job bookmark
        return all_results, next_link, n_requests
    else:
        # Check all results captured on success
        if len(all_results) != s.n_results:
            logger.warning("Did not successfully concatenate all results.")
            logger.warning(
                f"\tOnly {len(all_results)} / {s.n_results} successfully fetched."
            )
        return all_results, next_link, n_requests

In [2]:
# Load search args
incomplete = load_json_list('data/dispute_searches-incomplete.json')
params = incomplete[-1]
filename = search_dict_to_filename(search_dict=params)


2022-07-16 12:31:36,788.788 INFO - __main__: Loading json list at data/dispute_searches-incomplete.json


In [3]:
# Instantiate API
api = API(config=Config)

2022-07-16 12:31:38,309.309 INFO - __main__: Fetching token...
2022-07-16 12:31:38,701.701 INFO - __main__: Success


In [4]:
all_results = []
n_requests = 0
page_limit = 1000
checkpoint = None

# Search
payload = {"$search": params["search_str"], "$top": 50, "$expand": "Document"}
if filter:
    payload["$filter"] = params["date"]
s = Search(
    client=api.client,
    collection=Collections.BatchNews,
    search_str=params["search_str"],
    params=payload,
    verbose=True,
    checkpoint=None,
)

start_page = (
        int(re.findall(r"\$skip=(\d+)?(?=&)", checkpoint)[0]) // 50 if checkpoint else 1
    )

logger.info("Concatenating results...")
next_link = s._raw_results.request.url  # In case initial call fails
try:
    for i, payload in enumerate(s):
        n_requests = i + 1
        print(f"Page: {i + start_page} / {s.n_pages}")
        page, next_link = payload
        print(f"Next link: {next_link}")
        all_results.extend(page)

        # Exit early if limit reached
        if page_limit and n_requests == page_limit:
            logger.info(
                f"Request limit of {page_limit} reached. Stopping iteration."
            )
            break

        time.sleep(18)  # Sleep request so as not to hit hourly rate limit
except Exception as e:
    logger.error(f"Encountered exception when fetching data: {e}")
    # store file name as job bookmark
    logger.info(all_results)
    logger.info(f"next_link: {next_link}")
    logger.info(f"n_requests: {n_requests}")
else:
    # Check all results captured on success
    if len(all_results) != s.n_results:
        logger.warning("Did not successfully concatenate all results.")
        logger.warning(
            f"\tOnly {len(all_results)} / {s.n_results} successfully fetched."
        )
    logger.info(all_results)
    logger.info(f"next_link: {next_link}")
    logger.info(f"n_requests: {n_requests}")

2022-07-16 12:31:48,494.494 INFO - __main__: Lexis Nexis API Search Parameters
2022-07-16 12:31:48,495.495 INFO - __main__: Collection: https://services-api.lexisnexis.com/v1/BatchNews
2022-07-16 12:31:48,495.495 INFO - __main__: $search: (United Kingdom) w/seg [Scot*] w/seg [mediation OR talks OR meeting OR visit OR envoy OR “shuttle diplomacy” OR “good offices” OR ceasefire OR settlement OR “peace agreement” OR “special representative” OR representative OR delegat* OR diploma* OR ambassador OR ”peace deal” OR dialogue OR fact-finding OR “fact finding” OR bilateral OR multilateral OR multiparty OR resolution OR “round table” OR round-table OR workshop]
2022-07-16 12:31:48,496.496 INFO - __main__: $top: 50
2022-07-16 12:31:48,497.497 INFO - __main__: $expand: Document
2022-07-16 12:31:48,497.497 INFO - __main__: $filter: Date gt 1991-01-01 and Date lt 2015-12-31
2022-07-16 12:31:48,498.498 INFO - __main__: Concatenating results...


Page: 1 / 1036
Next link: https://services-api.lexisnexis.com/v1/BatchNews?$top=50&$expand=Document,Source&$filter=Date%20gt%201991-01-01%20and%20Date%20lt%202015-12-31&$select=ResultId&$skip=100&$search=(United+Kingdom)+w%2fseg+%5bScot*%5d+w%2fseg+%5bmediation+OR+talks+OR+meeting+OR+visit+OR+envoy+OR+%E2%80%9Cshuttle+diplomacy%E2%80%9D+OR+%E2%80%9Cgood+offices%E2%80%9D+OR+ceasefire+OR+settlement+OR+%E2%80%9Cpeace+agreement%E2%80%9D+OR+%E2%80%9Cspecial+representative%E2%80%9D+OR+representative+OR+delegat*+OR+diploma*+OR+ambassador+OR+%E2%80%9Dpeace+deal%E2%80%9D+OR+dialogue+OR+fact-finding+OR+%E2%80%9Cfact+finding%E2%80%9D+OR+bilateral+OR+multilateral+OR+multiparty+OR+resolution+OR+%E2%80%9Cround+table%E2%80%9D+OR+round-table+OR+workshop%5d
Page: 2 / 1036
Next link: https://services-api.lexisnexis.com/v1/BatchNews?$top=50&$expand=Document,Source&$filter=Date%20gt%201991-01-01%20and%20Date%20lt%202015-12-31&$select=ResultId&$skip=150&$search=(United+Kingdom)+w%2fseg+%5bScot*%5d+w%2fseg+

2022-07-16 12:32:28,297.297 ERROR - __main__: Encountered exception when fetching data: Hard coded error
2022-07-16 12:32:28,317.317 INFO - __main__: next_link: https://services-api.lexisnexis.com/v1/BatchNews?$top=50&$expand=Document,Source&$filter=Date%20gt%201991-01-01%20and%20Date%20lt%202015-12-31&$select=ResultId&$skip=150&$search=(United+Kingdom)+w%2fseg+%5bScot*%5d+w%2fseg+%5bmediation+OR+talks+OR+meeting+OR+visit+OR+envoy+OR+%E2%80%9Cshuttle+diplomacy%E2%80%9D+OR+%E2%80%9Cgood+offices%E2%80%9D+OR+ceasefire+OR+settlement+OR+%E2%80%9Cpeace+agreement%E2%80%9D+OR+%E2%80%9Cspecial+representative%E2%80%9D+OR+representative+OR+delegat*+OR+diploma*+OR+ambassador+OR+%E2%80%9Dpeace+deal%E2%80%9D+OR+dialogue+OR+fact-finding+OR+%E2%80%9Cfact+finding%E2%80%9D+OR+bilateral+OR+multilateral+OR+multiparty+OR+resolution+OR+%E2%80%9Cround+table%E2%80%9D+OR+round-table+OR+workshop%5d
2022-07-16 12:32:28,318.318 INFO - __main__: n_requests: 2


In [10]:
s._next_link

'https://services-api.lexisnexis.com/v1/BatchNews?$top=50&$expand=Document,Source&$filter=Date%20gt%201991-01-01%20and%20Date%20lt%202015-12-31&$select=ResultId&$skip=100&$search=(United+Kingdom)+w%2fseg+%5bScot*%5d+w%2fseg+%5bmediation+OR+talks+OR+meeting+OR+visit+OR+envoy+OR+%E2%80%9Cshuttle+diplomacy%E2%80%9D+OR+%E2%80%9Cgood+offices%E2%80%9D+OR+ceasefire+OR+settlement+OR+%E2%80%9Cpeace+agreement%E2%80%9D+OR+%E2%80%9Cspecial+representative%E2%80%9D+OR+representative+OR+delegat*+OR+diploma*+OR+ambassador+OR+%E2%80%9Dpeace+deal%E2%80%9D+OR+dialogue+OR+fact-finding+OR+%E2%80%9Cfact+finding%E2%80%9D+OR+bilateral+OR+multilateral+OR+multiparty+OR+resolution+OR+%E2%80%9Cround+table%E2%80%9D+OR+round-table+OR+workshop%5d'

In [15]:
requests.HTTPError?

[0;31mInit signature:[0m [0mrequests[0m[0;34m.[0m[0mHTTPError[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m      An HTTP error occurred.
[0;31mInit docstring:[0m Initialize RequestException with `request` and `response` objects.
[0;31mFile:[0m           ~/Documents/projects/srdp/SRDP/API/venv/lib/python3.7/site-packages/requests/exceptions.py
[0;31mType:[0m           type
[0;31mSubclasses:[0m     
