In [None]:
%%capture
%pip install keybert
%pip install transformers

In [None]:
import requests
import pandas as pd
from bs4 import BeautifulSoup
import numpy as np
import random
from keybert import KeyBERT
from transformers import pipeline

In [None]:
# Set k (number of non-base patents per row)
# k counts cited-rejection patents (positive samples) and keyword-matched patents
# (negative samples) together: get_negative_samples pulls k minus the number of
# rejections already found for the row.
PATENT_COUNT_PER_ROW = 25

In [None]:
def get_base_patents(query_terms):
  """Query USPTO Bulk Data API for base patent (patent application) metadata based on keywords or phrases.

    ## Parameters:
    query_terms (list): List of keywords and phrases to query for.

    ## Returns:
    df (DataFrame): DataFrame with application number, publication number, abstract, and claims of retrieved base patents.
      Rows with null claims are dropped. The frame is empty (no columns) when no term returns any result.
  """

  # Imported locally so the block does not depend on a file-level import
  from urllib.parse import quote

  frames = []
  for query_term in query_terms:
    # Format query for URL: exact-phrase search (quoted), fully percent-encoded
    encoded_term = quote(f'"{query_term}"', safe='')

    start = 0
    # Total hit count is read from the first page instead of issuing a separate probe request
    result_count = None
    rows = []
    while result_count is None or start < result_count:
      # Query USPTO Bulk Data API for data
      bulk_data_api_url = f"https://developer.uspto.gov/ibd-api/v1/application/publications?searchText={encoded_term}&start={start}&largeTextSearchFlag=Y"
      bulk_search = requests.get(bulk_data_api_url).json()
      if result_count is None:
        result_count = bulk_search['recordTotalQuantity']

      # Extract relevant patent metadata from results
      for result in bulk_search.get('results') or []:
        row = {
          'App_Number': result['patentApplicationNumber'],
          'Base_Pub_Number': result['publicationDocumentIdentifier'],
          'Base_Abstract': result['abstractText'][0],
        }

        claims = result['claimText']
        try:
          row['Base_Claims'] = claims[0]
        except (TypeError, IndexError):
          # Null or empty claims: keep the raw value so null rows can be dropped below
          row['Base_Claims'] = claims

        rows.append(row)

      # Move to next page of query results
      start += 100

      # Cap base patent count at 10000 due to performance issues
      if start > 10000:
        break

    # A term without any hit has no 'Base_Claims' column to filter on
    if not rows:
      continue

    temp_df = pd.DataFrame(rows)
    # Drop rows with null claims
    frames.append(temp_df[temp_df['Base_Claims'].notnull()])

  if not frames:
    return pd.DataFrame()

  return pd.concat(frames, ignore_index=True)

In [None]:
### Helper functions and entities for publication number extraction ###

# Load T5 model for query number extraction
# No model name is passed, so the pipeline uses the library's default checkpoint for the
# "text2text-generation" task (presumably the T5 model mentioned here -- TODO confirm).
# get_positive_samples prompts it to pull the cited publication number out of rejection text.
text2text_generator = pipeline("text2text-generation")

# Define common country and kind codes for publication numbers
# TODO: Add more to reduce data loss?
COMMON_COUNTRY_CODES = ['US', 'JP', 'EP', 'WO', 'CN']
COMMON_KIND_CODES = ['A', 'A1', 'A2', 'B1', 'B2']

def _is_pub_num_chunk(chunk):
  """Check whether a whitespace-delimited chunk belongs to a publication number.

    ## Parameters:
    chunk (str): Single token of the extracted publication number string.

    ## Returns:
    bool (bool): True for a bare country code, a bare kind code, or a numeric body with an optional
      attached country-code prefix and/or kind-code suffix.
  """

  if chunk in COMMON_COUNTRY_CODES or chunk in COMMON_KIND_CODES:
    return True

  body = chunk
  # Strip an attached country-code prefix (e.g. 'US7654321' -> '7654321')
  for code in COMMON_COUNTRY_CODES:
    if body.startswith(code):
      body = body[len(code):]
      break
  # Strip an attached kind-code suffix, longest first so that 'A1' wins over 'A'
  for code in sorted(COMMON_KIND_CODES, key=len, reverse=True):
    if body.endswith(code):
      body = body[:-len(code)]
      break

  # What remains must be the (non-empty) numeric body
  return body.isnumeric()

def clean_pub_num(n, debug=False):
  """Clean extracted publication number to match typical format.

    ## Parameters:
    n (str): Extracted publication number string.
    debug (bool): Flag denoting whether or not to print logical errors.

    ## Returns:
    temp_n (str): Cleaned publication number string, or np.nan when the input is not a string or the
      cleaned result is shorter than 7 or longer than 15 characters.
  """

  if not isinstance(n, str):
    return np.nan

  # Remove redundant characters
  for char in ('/', ',', '-'):
    n = n.replace(char, '')

  # Split on whitespace and keep only chunks that are part of the publication number
  temp_n = ''.join(chunk for chunk in n.split() if _is_pub_num_chunk(chunk))

  # Identify valid publication numbers based on common lengths
  if len(temp_n) < 7 or len(temp_n) > 15:
    if debug:
      print(temp_n)
    return np.nan

  return temp_n

def clean_whitespace(s):
  """Strip line breaks and shrink double spaces in a string.

    ## Parameters:
    s (str): String to remove whitespace from.

    ## Returns:
    s (str): String with line breaks removed and each pair of spaces replaced by one space (single pass).
  """

  # Applied in order: line breaks preceded by two spaces, remaining line breaks, then double spaces
  substitutions = (('  \n', ''), ('\n', ''), ('  ', ' '))
  for target, replacement in substitutions:
    s = s.replace(target, replacement)
  return s

def is_valid_char(c):
  """Tell whether a character is a printable ASCII character (code points 32 through 126).

    ## Parameters:
    c (char): Character to check validity of.

    ## Returns:
    bool (bool): Validity status of character.
  """

  return 32 <= ord(c) <= 126

def scrape_google_patents(pub_num, debug=False):
  """Query Google Patents based on extracted publication number and web scrape corresponding claims.

    ## Parameters:
    pub_num (str): Cleaned publication number cited in USPTO office action rejection.
    debug (bool): Flag denoting whether or not to print logical errors.

    ## Returns:
    temp_num (str): Potentially modified publication number with added prefix (np.nan on failure).
    claims (str): Rejection claims scraped from Google Patents (np.nan on failure).
  """

  # Add no prefix first in case it is already part of extracted publication number
  for country_code in [''] + COMMON_COUNTRY_CODES:
    # Try publication number with added prefix
    temp_num = country_code + pub_num
    url = f"https://patents.google.com/patent/{temp_num}/en"
    try:
      # Bound the wait so one stalled connection cannot hang the whole run
      page = requests.get(url, timeout=30)
    except requests.RequestException as err:
      if debug:
        print(f"{temp_num}: {err}")
      continue

    if page.status_code == 404:
      continue

    parser = BeautifulSoup(page.content, "html.parser")
    # Find claims text
    claims_section = parser.find('section', itemprop='claims')
    if claims_section is None:
      # Page exists but carries no claims section: give up on this publication number
      if debug:
        print(pub_num)
      return np.nan, np.nan

    # Clean text
    claims = clean_whitespace(''.join(filter(is_valid_char, claims_section.text)))
    # Keep only the text after the first ')' (the whole text when there is none)
    return temp_num, claims[claims.find(')')+1:].strip()

  if debug:
    print(pub_num)
  return np.nan, np.nan

In [None]:
def get_positive_samples(df):
  """Query USPTO Office Action API with base patent application numbers. Extract cited publication numbers from 102 rejection
    text and scrape Google Patents for corresponding claims (positive samples).

    ## Parameters:
    df (DataFrame): DataFrame with base patent metadata.

    ## Returns:
    df (DataFrame): DataFrame with added rejection metadata (columns Rej_Pub_Number_<j> and Rej_Claims_<j>,
      numbered from 1 per row; modified in place and returned).
  """

  # Set up USPTO Office Action API query
  oa_api_url = "https://developer.uspto.gov/ds-api/oa_actions/v1/records"
  headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'}
  rej_text_key = 'sections.section102RejectionText'

  for i, r in df.iterrows():
    # Format query for API call
    # Drops the first two characters of the application number (presumably the 'US' prefix -- TODO confirm)
    app_num = '"' + r['App_Number'][2:] + '"'
    # Query USPTO Office Action API for data
    data = {'criteria': f'patentApplicationNumber:{app_num}', 'start': '0', 'rows': '100'}
    oa_search = requests.post(oa_api_url, headers=headers, data=data).json()
    oa_response = oa_search['response']

    if oa_response['numFound'] <= 0:
      continue

    # Next free column suffix and cited numbers already stored for this row
    j = 1
    rej_nums = []
    for oa in oa_response['docs']:
      # Access 102 rejection text, if applicable (skips missing, null, and empty values)
      rej_text = oa.get(rej_text_key)
      if not rej_text:
        continue

      # Only the first 250 characters of the first text entry are passed to the model
      rej_text = rej_text[0][:250]
      # Pass rejection text to T5 model for cited publication number extraction
      extract_num = text2text_generator(f"question: What is the anticipated patent's publication number with country and kind code? context: {rej_text}")[0]['generated_text']
      # Format extracted publication number
      rej_num = clean_pub_num(extract_num, debug=False)
      if pd.isna(rej_num) or rej_num in rej_nums:
        continue

      # Web scrape Google Patents for claims
      scraped_num, scraped_claims = scrape_google_patents(rej_num, debug=True)
      if pd.isna(scraped_num):
        continue

      df.loc[i, f'Rej_Pub_Number_{j}'] = scraped_num
      df.loc[i, f'Rej_Claims_{j}'] = scraped_claims
      j += 1
      rej_nums.append(rej_num)

  return df

In [None]:
### Helper functions for negative sample acquisition ###

# Initialize keyword extraction model
# Built with KeyBERT's default arguments; get_negative_samples calls its extract_keywords
# method on each base patent abstract.
kw_model = KeyBERT()

def get_tot_result_count(term):
  """Determine number of patents with specified keyword or phrase.

    ## Parameters:
    term (str): String to check result count for.

    ## Returns:
    tot_result_count (int): Result count for string.
  """

  # Imported locally so the block does not depend on a file-level import
  from urllib.parse import quote

  # Format query for URL: lower-cased exact-phrase search (quoted), fully percent-encoded
  query_term = quote(f'"{term.lower()}"', safe='')

  # rows=1 keeps the response small; only the total count is read
  bulk_data_api_url = f"https://developer.uspto.gov/ibd-api/v1/application/publications?searchText={query_term}&rows=1&largeTextSearchFlag=Y"
  keyword_search = requests.get(bulk_data_api_url).json()

  return keyword_search['recordTotalQuantity']

def allocate_term_results(top_terms, tgt_patent_count, debug=False):
  """Allocate number of results to pull for each keyword or phrase based on availability. Shift per term result count
     dynamically in order to guarantee k results per row.

    ## Parameters:
    top_terms (list): List of keywords to allocate result counts to.
    tgt_patent_count (int): Number of patents per row minus number of rejection patents already obtained for row.
    debug (bool): Flag denoting whether or not to print logical errors.

    ## Returns:
    term_result_alloc_dict (dict): Dictionary with terms as keys and [total available results, number of results to pull]
      as values. Empty when there are no terms or when no term can support the total allocation.
  """

  # Nothing to allocate to (also avoids dividing by zero below)
  if not top_terms:
    return dict()

  # Allocate number of results needed evenly per term to start
  even_num_results = tgt_patent_count // len(top_terms)
  # Get total available result count per term and format as dict
  term_result_alloc_dict = {term: [get_tot_result_count(term), even_num_results] for term in top_terms}
  # Add remainder of even distribution to first term
  term_result_alloc_dict[top_terms[0]][1] += tgt_patent_count % len(top_terms)

  # Determine term to default to when allocations exceeds availability for any given term
  master_key = 0
  # Store terms that lack enough available results
  invalid_keys = []
  # The values are mutable lists, so allocations shifted to a later term are seen when that term is reached
  for i, (tot_result_count, num_results) in enumerate(term_result_alloc_dict.values()):
    if debug:
      print(tot_result_count, num_results)
      print(term_result_alloc_dict)

    # Check if availability can support allocation
    # Build in 2x buffer for allocation count in case of occasional null claims
    if tot_result_count <= 2*num_results:
      # Test if master key can support allocation of additional terms
      temp_num_results = term_result_alloc_dict[top_terms[master_key]][1] + num_results
      temp_tot_result = term_result_alloc_dict[top_terms[master_key]][0]
      while temp_tot_result <= 2*temp_num_results:
        # Change master key if new allocation exceeds availability
        master_key += 1
        if master_key == len(top_terms):
          # Return empty dict when all terms fail to support total allocation
          return dict()
        temp_num_results = term_result_alloc_dict[top_terms[master_key]][1] + num_results
        temp_tot_result = term_result_alloc_dict[top_terms[master_key]][0]

      # Shift allocation of results away from invalid key to master key
      term_result_alloc_dict[top_terms[master_key]][1] = temp_num_results
      # Mark key as invalid if it cannot support allocation
      invalid_keys.append(i)

  # Remove invalid keys
  for key in invalid_keys:
    del term_result_alloc_dict[top_terms[key]]

  return term_result_alloc_dict

In [None]:
def get_negative_samples(df, verbose=True):
  """Extract keywords from base patent abstracts and use to query USPTO Bulk Data API for claims of patents relevant
     to base patents (negative samples).

    ## Parameters:
    df (DataFrame): DataFrame with base patent and rejection metadata.
    verbose (bool): Flag denoting whether or not to print progress markers and rows without k patents.

    ## Returns:
    df (DataFrame): DataFrame with added non-rejection, relevant patent metadata (columns Pub_Number_<n> and
      Claims_<n>). A row is filled only when its full target count of negative samples was retrieved.
  """

  # Imported locally so the block does not depend on a file-level import
  from urllib.parse import quote

  # Derive the column names here so the rejection columns present in df are always the ones counted
  rej_cols = [col for col in df.columns if str(col).startswith('Rej_Claims_')]
  non_rej_cols = []
  for n in range(1, PATENT_COUNT_PER_ROW + 1):
    non_rej_cols.extend([f'Pub_Number_{n}', f'Claims_{n}'])

  for i, r in df.iterrows():
    # Determine number of negative patents needed to hit k patents per row
    rej_count = int(r[rej_cols].notna().sum())
    tgt_patent_count = PATENT_COUNT_PER_ROW - rej_count

    # Row already holds k (or more) rejection patents: nothing to retrieve
    if tgt_patent_count <= 0:
      if verbose:
        print(i)
      continue

    # Extract keywords and phrases
    top_terms = [term for term, score in kw_model.extract_keywords(r['Base_Abstract'], keyphrase_ngram_range=(1, 2))]

    # Allocate result counts per term
    term_result_alloc_dict = allocate_term_results(top_terms, tgt_patent_count)

    # Flat list of [pub number, claims, pub number, claims, ...] matching the order of non_rej_cols
    samples = []
    seen_pub_nums = set()
    for term, (tot_result_count, num_results) in term_result_alloc_dict.items():
      # A term with nothing allocated must not contribute any results
      if num_results <= 0:
        continue

      # Format query for URL: lower-cased exact-phrase search (quoted), fully percent-encoded
      encoded_term = quote(f'"{term.lower()}"', safe='')

      # Choose random starting point to avoid repeat data
      start = random.randint(0, (tot_result_count - 2*num_results - 1))
      # Query USPTO Bulk Data API for data
      bulk_data_api_url = f"https://developer.uspto.gov/ibd-api/v1/application/publications?searchText={encoded_term}&start={start}&largeTextSearchFlag=Y"
      keyword_search = requests.get(bulk_data_api_url).json()

      j = 0
      for result in keyword_search.get('results') or []:
        try:
          pub_num = result['publicationDocumentIdentifier']
          # Obtain claims text
          claims = result['claimText'][0]
        except (KeyError, IndexError, TypeError):
          # Missing identifier or null/empty claims: skip this result
          continue

        # Ignore result if duplicate result for row or if claims are empty
        if not claims or pub_num == r['Base_Pub_Number'] or pub_num in seen_pub_nums:
          continue

        seen_pub_nums.add(pub_num)
        samples.extend([pub_num, claims])
        j += 1
        if j == num_results:
          break

      # Alert upon failure to acquire enough negative samples
      if verbose and j != num_results:
        print(f"Retrieval error at row {i}")

    # Store the row only when every allocation was met
    if len(samples) == 2 * tgt_patent_count:
      # Set appropriate columns as null to ensure exactly k patents per row
      samples.extend([np.nan] * (2 * PATENT_COUNT_PER_ROW - len(samples)))
      df.loc[i, non_rej_cols] = samples

    # Show completion progress
    if verbose:
      print(i)

  return df

In [None]:
# Retrieve base patents, then their cited (positive) samples
df = get_base_patents(['artificial intelligence'])
df = get_positive_samples(df)

# Store positive and negative sample data column names
# The rejection columns are added by get_positive_samples, so they can only be collected after it has run;
# a prefix match also covers column suffixes with more than one digit
rej_cols = [col for col in list(df.columns) if col.startswith('Rej_Claims_')]
non_rej_cols = []
for i in range(1, PATENT_COUNT_PER_ROW+1):
  non_rej_cols.extend([f'Pub_Number_{i}', f'Claims_{i}'])

# Fill each row up to k patents with keyword-matched (negative) samples
df = get_negative_samples(df)

In [None]:
# Write the assembled dataset (base patents plus positive and negative samples) to a CSV file
df.to_csv('dataset.csv')