In [2]:
import calendar
import csv
import math
import os
import random
import time
import xml.etree.ElementTree as ET

import pandas as pd
import requests
from bs4 import BeautifulSoup
from requests.auth import HTTPBasicAuth

In [3]:
# EPO OPS API credentials are read from the environment so that secrets are
# never stored in (or committed with) the notebook. Set EPO_OPS_CONSUMER_KEY
# and EPO_OPS_CONSUMER_SECRET before starting the kernel.
# NOTE(review): the key pair that used to be hard-coded here has been exposed
# in the notebook history and should be revoked and regenerated.
consumer_key = os.environ.get("EPO_OPS_CONSUMER_KEY", "")
consumer_secret_key = os.environ.get("EPO_OPS_CONSUMER_SECRET", "")

if not (consumer_key and consumer_secret_key):
    # Fail loudly but do not raise: cells that only define functions should
    # still run; get_token() will report the authentication failure.
    print(
        "Warning: EPO_OPS_CONSUMER_KEY / EPO_OPS_CONSUMER_SECRET are not set; "
        "get_token() will not be able to authenticate."
    )

# EPO OAuth2 token URL
token_url = "https://ops.epo.org/3.2/auth/accesstoken"

def get_token():
    """Request an OAuth2 access token from the EPO token endpoint.

    Posts a ``client_credentials`` grant to ``token_url`` using HTTP Basic
    authentication built from ``consumer_key`` and ``consumer_secret_key``.

    Returns:
        The ``access_token`` value of the JSON response when the server
        answers with status 200. ``None`` when the request cannot be completed
        or any other status is returned; a diagnostic message is printed in
        both failure cases.
    """
    try:
        response = requests.post(
            token_url,
            auth=HTTPBasicAuth(consumer_key, consumer_secret_key),
            data={"grant_type": "client_credentials"},
            # Without a timeout a stalled connection would block the notebook forever.
            timeout=30,
        )
    except requests.RequestException as exc:
        print(f"Failed to retrieve access token. {exc}")
        return None

    if response.status_code != 200:
        # Include the status and body so the cause (bad credentials, quota, ...) is visible.
        print(f"Failed to retrieve access token. Status: {response.status_code}, {response.text}")
        return None

    return response.json()["access_token"]

In [4]:
#patent search without countries for faster collection (used up until 1866)
def patent_search(date_range, token, start, end, timeout=30):
    """Run one page of a publication-date search and return the hits.

    Sends a GET request to the OPS published-data search endpoint with the
    query ``pd=<date_range>`` and the result window ``<start>-<end>``.

    Args:
        date_range: Value placed after ``pd=`` in the query string.
        token: Bearer token sent in the ``Authorization`` header.
        start: First result position of the requested window.
        end: Last result position of the requested window.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        A list of ``{"country": ..., "doc_number": ...}`` dicts, one per
        ``document-id`` element that has both child elements. An empty list
        is returned (after printing a message) when the request fails, the
        status is not 200, or the body is not parseable XML.
    """
    search_url = "https://ops.epo.org/rest-services/published-data/search"

    # Headers with authentication
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/xml",
        "User-Agent": "ProjectAcademicBot/1.0 (for educational purposes only, contact: collen.ellis@sorbonne-universite.fr)",
        "Connection": "keep-alive"
    }

    params = {
        "q": f'pd={date_range}',
        "range": f"{start}-{end}",
    }

    try:
        # A timeout keeps one stalled connection from freezing the whole run.
        response = requests.get(search_url, headers=headers, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Same convention as an HTTP error: report it and return no results.
        print(f"Error: request failed, {exc}")
        return []

    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return []

    # Namespace dictionary (important for parsing)
    ns = {"epo": "http://www.epo.org/exchange"}

    try:
        root = ET.fromstring(response.text)
    except ET.ParseError as exc:
        print(f"Error: could not parse search response, {exc}")
        return []

    patents_list = []
    for patent in root.findall(".//epo:document-id", namespaces=ns):
        country = patent.find("epo:country", namespaces=ns)
        doc_number = patent.find("epo:doc-number", namespaces=ns)

        # Skip entries that lack either identifying field.
        if country is not None and doc_number is not None:
            patents_list.append({
                "country": country.text,
                "doc_number": doc_number.text
            })

    return patents_list

In [7]:
# Load the per-year sample amounts, then drop every row whose year lies
# between 1966 and 2024 (inclusive); rows outside that span are kept.
amount_per_year = (
    pd.read_csv('uncollected_years_amount.csv')
    .loc[lambda frame: ~frame["year"].between(1966, 2024)]
)
display(amount_per_year)

Unnamed: 0,year,sample_amount
0,1929,881
1,1930,894
2,1931,907
3,1932,920
4,1933,933
5,1934,947
6,1935,960
7,1936,973
8,1937,987
9,1938,1000


In [8]:
# Monthly date suffixes: month key -> (first-day "MMDD", last-day "MMDD").
# February is fixed at 28 days in this table (no leap-year entry).
months = {
    f"{number:02d}": (f"{number:02d}01", f"{number:02d}{last_day}")
    for number, last_day in enumerate(
        (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31), start=1
    )
}

# Number of results requested per search call.
range_size = 100
# EPO API cap
max_total = 2000
# Filled by the collection loop with one list of patents per processed year.
all_patents = []

# Loop over each year/amount in your DataFrame.
# For every row, the yearly sample amount is split evenly across the twelve
# months; whatever is still missing when the last month is reached (rounding
# loss or months that returned too few hits) is requested from that month.
for _, row in amount_per_year.iterrows():
    year = int(row["year"])
    total_amount = int(row["sample_amount"])

    year_results = []

    # One token is requested per year.
    token = get_token()
    if token is None:
        # Every search would be rejected, so skip the throttled requests.
        # An (empty) entry is still appended so that all_patents keeps one
        # list per DataFrame row, in row order.
        print(f"Skipping {year}: no access token available")
        all_patents.append(year_results)
        continue

    monthly_quota = math.floor(total_amount / 12)
    remaining = total_amount

    for i, (month, (start_suffix, _table_end_suffix)) in enumerate(months.items()):
        if remaining <= 0:
            break

        # Ensure the last month makes up for any rounding loss
        if i == len(months) - 1:
            target_this_month = remaining
        else:
            target_this_month = min(monthly_quota, remaining)

        # Compute the real last day of the month so that 29 February is
        # included in leap years (the static table always ends on the 28th).
        last_day = calendar.monthrange(year, int(month))[1]
        date_range = f"{year}{start_suffix}-{year}{month}{last_day:02d}"

        collected_this_month = []
        start = 1

        # Stop paging once the target is met or the API result cap is reached.
        while len(collected_this_month) < target_this_month and start <= max_total:
            end = min(start + range_size - 1, max_total)

            # Pause before every request to throttle the call rate.
            time.sleep(random.uniform(4, 4.5))
            results = patent_search(date_range, token, start, end)

            if not results:
                print(f"Failed for {year}-{month}, start={start}")
                break

            collected_this_month.extend(results)

            # A short page means there are no further results for this month.
            if len(results) < end - start + 1:
                break

            start = end + 1

        collected_this_month = collected_this_month[:target_this_month]  # trim extras
        year_results.extend(collected_this_month)
        remaining -= len(collected_this_month)

        print(f"{year}-{month}: Collected {len(collected_this_month)} patents")

    print(f"✔️ {year}: Total collected = {len(year_results)} patents")
    all_patents.append(year_results)

1929-01: Collected 73 patents
1929-02: Collected 73 patents
1929-03: Collected 73 patents
1929-04: Collected 73 patents
1929-05: Collected 73 patents
1929-06: Collected 73 patents
1929-07: Collected 73 patents
1929-08: Collected 73 patents
1929-09: Collected 73 patents
1929-10: Collected 73 patents
1929-11: Collected 73 patents
1929-12: Collected 78 patents
✔️ 1929: Total collected = 881 patents
1930-01: Collected 74 patents
1930-02: Collected 74 patents
1930-03: Collected 74 patents
1930-04: Collected 74 patents
1930-05: Collected 74 patents
1930-06: Collected 74 patents
1930-07: Collected 74 patents
1930-08: Collected 74 patents
1930-09: Collected 74 patents
1930-10: Collected 74 patents
1930-11: Collected 74 patents
1930-12: Collected 80 patents
✔️ 1930: Total collected = 894 patents
1931-01: Collected 75 patents
1931-02: Collected 75 patents
1931-03: Collected 75 patents
1931-04: Collected 75 patents
1931-05: Collected 75 patents
1931-06: Collected 75 patents
1931-07: Collected 75 

In [None]:
csv_filename = "patents_foster_1929-1965.csv"

# all_patents holds one list per row of amount_per_year, in row order, so the
# year label is taken from those rows instead of being derived from a fixed
# start year (which mislabels rows when the years are not contiguous or do not
# start at 1929). zip() stops at the shorter sequence, so a partially completed
# collection run is written with the years that were actually processed.
collected_years = [int(value) for value in amount_per_year["year"]]

with open(csv_filename, mode="w", newline="", encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile)

    # Write header
    writer.writerow(["year", "country", "doc_number"])

    # Flatten the data while writing: one CSV row per collected patent.
    for year, patent_list in zip(collected_years, all_patents):
        writer.writerows(
            [year, patent["country"], patent["doc_number"]]
            for patent in patent_list
        )

print(f"CSV file '{csv_filename}' has been created successfully.")

CSV file 'patents_foster_1929-1965.csv' has been created successfully.
