In [None]:
# Install dependencies
!pip install -q -U google-genai

In [None]:
# Set up libraries
import json
import os
import pymupdf
from PIL import Image
from pdf2image import convert_from_path
import base64
import pandas as pd
import numpy as np
import re
import shutil
from google.genai import types
import json

In [None]:
#preprocess of the pdfs
#combine them into one-pages and convert to image

def _save_pdf_with_preview(pdf, file_name, target_dir):
    """Write `pdf` to <target_dir>/<file_name>.pdf and its first page to <file_name>.jpg.

    The JPEG is rendered by pdf2image from the PDF file that was just written.
    """
    output_path = os.path.join(target_dir, f"{file_name}.pdf")
    image_path = os.path.join(target_dir, f"{file_name}.jpg")
    pdf.save(output_path)

    images = convert_from_path(output_path)
    images[0].save(image_path)


def combine_two_page(input_path, out_dir=None):
    """Turn one patent-card PDF into a single-page PDF plus a JPEG of that page.

    A two-page PDF is merged into one page with page 1 on the left and page 2
    on the right (both drawn at the size of page 1). Any other PDF is copied
    unchanged and only its first page is rendered to JPEG.

    Args:
        input_path: Path of the source PDF.
        out_dir: Directory receiving <name>.pdf and <name>.jpg. When omitted,
            the notebook-level ``output_dir`` variable is used, so existing
            one-argument calls keep working.
    """
    target_dir = output_dir if out_dir is None else out_dir
    file_name = os.path.splitext(os.path.basename(input_path))[0]

    input_pdf = pymupdf.open(input_path)
    try:
        if len(input_pdf) != 2:
            print (f"Transfering {input_path}, not two-page")
            _save_pdf_with_preview(input_pdf, file_name, target_dir)
            return

        first_rect = input_pdf[0].rect
        width, height = first_rect.width, first_rect.height

        # Create the new double-width PDF and place both pages side by side.
        output_pdf = pymupdf.open()
        try:
            new_page = output_pdf.new_page(width = 2*width, height = height)
            new_page.show_pdf_page(pymupdf.Rect(0, 0, width, height), input_pdf, 0)
            new_page.show_pdf_page(pymupdf.Rect(width, 0, 2*width, height), input_pdf, 1)
            _save_pdf_with_preview(output_pdf, file_name, target_dir)
        finally:
            output_pdf.close()
    finally:
        # Always release the source document, including on the early return above.
        input_pdf.close()

    print(f"{file_name} combined and saved")

def  process_files_in_folder(folder_path, output_dir):
    """Run `combine_two_page` on every ``.pdf`` file directly inside `folder_path`.

    Args:
        folder_path: Folder containing the source PDFs.
        output_dir: Folder receiving the converted PDFs and JPEGs; created if
            it does not exist yet.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Sorted so the processing order is the same on every run.
    for file in sorted(os.listdir(folder_path)):
        if file.endswith(".pdf"):
            input_path = os.path.join(folder_path, file)
            # Forward the requested directory instead of relying on a global.
            combine_two_page(input_path, output_dir)

In [None]:
# Folder of the patent cards (source PDFs). Must be filled in before running.
folder_path = ""  # TODO: set this, e.g. "data/patent_cards"
output_dir = 'cards_in_image'# Temporary directory

# Fail early with a readable message instead of an obscure error further down.
if not folder_path:
    raise ValueError("Set folder_path to the folder of the patent cards before running this cell.")

process_files_in_folder(folder_path, output_dir)

In [None]:
# Define the function to encode base64 image
def encode_image(image_path):
    """Read the file at `image_path` and return its bytes as a base64 text string."""
    with open(image_path, 'rb') as fh:
        raw_bytes = fh.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

In [None]:
# Store every JPEG of the working directory as a base64 text file (<name>.txt) for model input
for entry in os.listdir(output_dir):
    if not entry.endswith(".jpg"):
        continue
    jpg_path = os.path.join(output_dir, entry)
    encoded = encode_image(jpg_path)
    stem = os.path.splitext(os.path.basename(jpg_path))[0]
    txt_path = os.path.join(output_dir, f"{stem}.txt")
    with open(txt_path, "w") as txt_file:
        txt_file.write(encoded)

In [None]:
# Define the function for structural output.
# This declaration is handed to the model as a tool (see inference_with_api), so
# the extracted card fields come back as the arguments of an
# `extract_patent_data` function call. The property names are the output field
# names; the description strings are part of the declaration sent to the model.

function = types.FunctionDeclaration(
    name = 'extract_patent_data',
    description = 'Extract the patent data structurally. DO NOT HALLUCINATE. LEAVE EMPTY IF NOT AVAILABLE OR UNKNOWN',
    parameters = types.Schema(
        type ='OBJECT',
        properties = {
            # --- Identification and classification ---
            'patentnummer': types.Schema(
                type='STRING',
                description='Patent number, output format:a six-digit number'
            ),
            'klass': types.Schema(
                type='STRING',
                description='Patent klass (original in DPK). marked "klass".Output FORMAT example: "37 b:1/10" or "39 a6:21/00". DO NOT USE EXPONENT. Not available for patent applied before 1968'
            ),
            'IPC': types.Schema(
                type='STRING',
                description='Patent klass in IPC, marked "Int. Cl.". Output format example: "C 07 d 9/00". Not available for patent applied before 1968'
            ),
            # --- Current patent holder(s) ---
            'patenthavare_antal': types.Schema(
                type='STRING',
                description='The amount of patent holders. DO NOT INCLUDE CROSS-OUT ONES'
            ),
            'patenthavare1': types.Schema(
                type='STRING',
                description='first patent holder name. KEEP ONLY THE NAME, NO LOCATION.'
            ),
            'patenthavare1_stad': types.Schema(
                type='STRING',
                description='City of the patent holder, ususally separated by a comma from the name. KEEP ONLY THE CITY NAME'
            ),
            'patenthavare1_land': types.Schema(
                type='STRING',
                description='Country of the patent holder. KEEP THE HISTORICAL NAME. DO NOT CONVERT INTO MODERN NAMES'
            ),
            'patenthavare_typ': types.Schema(
                type='STRING',
                description='Type of the patent holder',
                enum=['individual', 'institution', 'company', 'mixed']
            ),
            'patenthavare_typ_av_skrivet': types.Schema(
                type='STRING',
                description='Type of the record of patent holder. Infer from your vision understanding and analyze carefully',
                enum=['typed', 'mixed', 'handwritten']
            ),
            'patenthavare2': types.Schema(
                type='STRING',
                description='If the patentee is more than 1, Name of second patent holder. KEEP ONLY THE NAME, NO LOCATION.'
            ),
            'patenthavare2_stad': types.Schema(
                type='STRING',
                description='City of the second patent holder, ususally separated by a comma from the name. KEEP ONLY THE CITY NAME'
            ),
            'patenthavare2_land': types.Schema(
                type='STRING',
                description='Country of the second patent holder. KEEP THE HISTORICAL NAME. DO NOT CONVERT INTO MODERN NAMES'
            ),
            # --- Patent agent ---
            'ombud': types.Schema(
                type='STRING',
                description='Patent agent. USE ONLY THE NOT CROSSED-OUT ONES. KEEP ONLY THE NAME, including "m fl" if available. NO LOCATION'
            ),
            'ombud_adress': types.Schema(
                type='STRING',
                description='Patent agent address, if available'
            ),
            # --- Dates and expiration (all dates requested as yyyy-mm-dd) ---
            'patenttid_fr': types.Schema(
                type='STRING',
                description='Application date. OUTPUT FORMAT:yyyy-mm-dd'
            ),
            'patenttid_till': types.Schema(
                type='STRING',
                description='Expected expiration date. OUTPUT FORMAT:yyyy-mm-dd'
            ),
            'beviljandedatum': types.Schema(
                type='STRING',
                description='Grant date, marked “Patent meddelat”. OUTPUT FORMAT:yyyy-mm-dd'
            ),
            'utgångsdatum': types.Schema(
                type='STRING',
                description='Actual expiration date, when the patent marked "Kung.förf. or Avförd". OUTPUT FORMAT:yyyy-mm-dd'
            ),
            'utgångsskäl': types.Schema(
                type='STRING',
                enum=['Lack of payment of fees', 'Expiration of patent time', 'Compulsory working clause', 'Litigation'],
                description='Plausible expiration reason, when the patent marked Kung.förf. or Avförd. Inference needed here.'
            ),
            'ansökingsnr': types.Schema(
                type='STRING',
                description='Application number. DO NOT OMIT HANDWRITTEN PART'
            ),
            # --- Fee records and "Aviserat" stamps ---
            'sistaerlagdapatentåravgifter_datum': types.Schema(
                type='STRING',
                description='Date of last patent fee record before expiration. YEAR MAYBE TYPED IN ANOTHER DIRECTION. OUTPUT FORMAT:yyyy-mm-dd'
            ),
            'sistaerlagdapatentåravgifter_belopp': types.Schema(
                type='STRING',
                description='Amount of last patent fee record before expiration (krona). OUTPUT FORMAT:number with two decimals'
            ),
            'sista_aviserat_datum': types.Schema(
                type='STRING',
                description='Date of the last "Aviserat" mark on the patent card, some patents may have no this stamp. OUTPUT FORMAT: yyyy-mm-dd'
            ),
            # --- Invention title and inventors ---
            'uppfinningensbenämning': types.Schema(
                type='STRING',
                description='Title of the invention'
            ),
            'uppfinningensbenämning_typ': types.Schema(
                type='STRING',
                description='Type of the invention title. Infer this from your database and visual understanding. Compare very carefully',
                enum=['typed', 'mixed', 'handwritten']
            ),
            'uppfinnare_antal': types.Schema(
                type='STRING',
                description='The amount of inventors'
            ),
            'uppfinnare1': types.Schema(
                type='STRING',
                description='First inventor, fill in the value of patenthavare name if the patent holder is the inventor. NO LOCATION'
            ),
            'uppfinnare2': types.Schema(
                type='STRING',
                description='Second inventor. NO LOCATION'
            ),
            'uppfinnare3': types.Schema(
                type='STRING',
                description='Third inventor. NO LOCATION'
            ),
            'uppfinnare4': types.Schema(
                type='STRING',
                description='Fourth inventor. NO LOCATION'
            ),
            # --- Priority ---
            'prioritet': types.Schema(
                type='BOOLEAN',
                description='Priority patent or not'
            ),
            'prioritetsdatum': types.Schema(
                type='STRING',
                description='Priority patent date. USE THE FIRST ONE, IF THERE ARE MORE THAN ONE. OUTPUT FORMAT: yyyy-mm-dd'
            ),
            'prioritetsland': types.Schema(
                type='STRING',
                description='Priority patent country. KEEP THE HISTORICAL NAME. DO NOT CONVERT INTO MODERN NAMES'
            ),
            # --- Transfer of the patent and previous holder ---
            'patentöverföring': types.Schema(
                type='BOOLEAN',
                description='Patent transfer or not'
            ),
            'överföringsdatum': types.Schema(
                type='STRING',
                description='Transfer date, output format:yyyy-mm-dd'
            ),
            'tidigare_patenthavare': types.Schema(
                type='STRING',
                description='Previous patent holder, usually crossed out on the card. ONLY THE NAME. NO LOCATION'
            ),
            'tidigare_patenthavare_stad': types.Schema(
                type='STRING',
                description='City of the previous patent holder, usually crossed out on the card.KEEP ONLY THE CITY NAME'
            ),
            'tidigare_patenthavare_land': types.Schema(
                type='STRING',
                description='Country of the previous patent holder'
            ),
            # --- Licensing and supplementary patent ---
            'licensupplåtelse': types.Schema(
                type='BOOLEAN',
                description='Patent licensing or not'
            ),
            'tilläggspatent': types.Schema(
                type='BOOLEAN',
                description='Supplimentary patent or not, only true if the tilläggspatentnummer can be found'
            ),
            'tilläggspatentnummer': types.Schema(
                type='STRING',
                description='Supplimentary patent number.'
            ),
            # --- Change of patent agent ---
            'ombudsbyte': types.Schema(
                type='BOOLEAN',
                description='Patent agent change or not'
            ),
            'tidigare_ombud': types.Schema(
                type='STRING',
                description='Former patent agent, usually crossed out on the card. ONLY THE NAME. NO LOCATION'
            ),
            'ombudsbytesdatum': types.Schema(
                type='STRING',
                description='Patent agent change date'
            ),
        },
        # Properties listed here are declared as required; every property not
        # listed (second holder, inventors 2-4, priority/transfer details, ...)
        # is optional.
        required=['patentnummer', 'klass', 'IPC',
                  'patenthavare_antal', 'patenthavare1','patenthavare1_stad', 'patenthavare1_land','patenthavare_typ','patenthavare_typ_av_skrivet', 
                  'ombud', 'ombud_adress',
                  'patenttid_fr', 'patenttid_till', 'beviljandedatum',
                  'utgångsdatum', 'utgångsskäl',
                  'ansökingsnr',
                  'sistaerlagdapatentåravgifter_datum','sistaerlagdapatentåravgifter_belopp', 'sista_aviserat_datum',
                  'uppfinningensbenämning','uppfinningensbenämning_typ', 
                  'uppfinnare_antal', 'uppfinnare1',
                  'prioritet',
                  'patentöverföring',
                  'licensupplåtelse',
                  'tilläggspatent',
                  'ombudsbyte']
    )
)

In [None]:
# Load the API client

from google import genai

# Read the key from the environment rather than pasting it into the notebook,
# so it is never saved or shared together with the file.
api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not api_key:
    raise RuntimeError(
        "No API key found: set the GEMINI_API_KEY (or GOOGLE_API_KEY) environment variable."
    )
client = genai.Client(api_key = api_key)

In [None]:
# Define the function to inference with API

# Collects one result dictionary per successfully processed card.
response_string_list = []


def _first_function_call_args(response):
    """Return the args of the first function-call part of `response`, or None if there is none."""
    if response is None or not response.candidates:
        return None
    content = response.candidates[0].content
    if content is None or not content.parts:
        return None
    # The function call is not guaranteed to be the first part, so scan all of them.
    for part in content.parts:
        call = getattr(part, "function_call", None)
        if call is not None and call.args is not None:
            return call.args
    return None


def inference_with_api(image_path, model="gemini-2.5-pro-preview-03-25"):
    """Send one base64-encoded card image to the model and record the extracted fields.

    Args:
        image_path: Path of a text file holding the base64 encoding of a JPEG.
            The file name must contain the six-digit card ID.
        model: Name of the model to call.

    Returns:
        The result dictionary (extracted fields plus an ``"ID"`` key), which is
        also appended to ``response_string_list``; ``None`` when the file is
        missing, has no ID in its name, or the call/response is unusable.
    """
    # Resolve the ID from the file name (not the directories) before spending an API call.
    match = re.search(r"(\d{6})", os.path.basename(image_path))
    if match is None:
        print(f"⚠️ No six-digit ID found in the file name of {image_path}")
        return None
    id_num = match.group(1)

    try:
        with open(image_path,'r') as image:
            base64_image = image.read()
        # Part.from_bytes expects the raw image bytes, not their base64 text.
        image_bytes = base64.b64decode(base64_image)

        contents = types.Content(parts=[
            types.Part.from_text(text = "Extract all data in the patent card. Output in required format.LEAVE EMPTY IF NOT AVAILABLE OR UNKNOWN"),
            types.Part.from_bytes(data = image_bytes, mime_type='image/jpeg')
        ])
        tool = types.Tool(function_declarations=[function])

        generate_content_config = types.GenerateContentConfig(
            temperature = 0,
            tools = [tool]
        )
        response = client.models.generate_content(
            model = model,
            contents = contents,
            config = generate_content_config
        )

        args = _first_function_call_args(response)
        if args is None:
            print(f"⚠️ Invalid response format from API for {image_path}")
            return None

        # Copy before adding the ID so the response object itself is left untouched.
        response_string = dict(args)
        response_string["ID"] = id_num

        response_string_list.append(response_string)
        return response_string

    except FileNotFoundError:
        print(f"Error: Image file not found at {image_path}")
        return None
    except Exception as e:
        # Best effort: report and continue with the next card.
        print(f"Error during API call: {e}")
        return None

In [None]:
# Run inference on every base64 text file in the working directory,
# save the collected results as JSONL, then remove the working directory.

image_dir = output_dir

# Sorted so cards are processed (and written) in the same order on every run.
for file in sorted(os.listdir(image_dir)):
    if file.endswith(".txt"):
        image_path = os.path.join(image_dir, file)
        print(image_path)
        inference_with_api(image_path)

# Write the results BEFORE cleaning up, so a failing cleanup cannot cost the API output.
jsonl_name = 'response_string_list.jsonl'
with open(jsonl_name, 'w', encoding='utf-8') as jsonl_file:
    for response_string in response_string_list:
        jsonl_file.write(json.dumps(response_string,ensure_ascii=False) + '\n')

shutil.rmtree(output_dir)

In [None]:
# Pre-check: load the collected results into a dataframe

parsed_data = list(response_string_list)
rec_result = pd.json_normalize(parsed_data)

# The ID taken from the document name replaces the patent number read by the model;
# the helper 'ID' column is then dropped.
rec_result = rec_result.assign(patentnummer=rec_result['ID']).drop(columns=['ID'])
rec_result.head()

In [None]:
# Basic post processing

def _city_from_holder(holder):
    """Return the text after the first comma of a holder name, or None if there is none."""
    if isinstance(holder, str) and ',' in holder:
        return holder.split(',')[1].strip()
    return None


def _year_series(df, year_col, date_col):
    """Return nullable-integer years from `year_col` if present, else parsed from `date_col`."""
    if year_col in df.columns:
        raw = df[year_col]
    elif date_col in df.columns:
        # Dates are requested from the model as yyyy-mm-dd; keep the leading four digits.
        raw = df[date_col].astype(str).str.extract(r'^\s*(\d{4})', expand=False)
    else:
        raw = pd.Series(float('nan'), index=df.index)
    return pd.to_numeric(raw, errors='coerce').astype('Int64')


def output_clean(df):
    """Apply basic post-processing to the extraction table.

    * Fills an empty ``patenthavare1_stad`` from the part of ``patenthavare1``
      after the first comma, then strips the location from ``patenthavare1``.
    * Adds ``utgångsår`` and ``sista_aviserat_år`` (nullable integers). Existing
      columns of these names are used as-is; otherwise the years are derived
      from ``utgångsdatum`` and ``sista_aviserat_datum``.
    * Adds ``potential_wrong_expiration_date``: 1 when the expiration year is
      more than one year after the last "Aviserat" year and the expiration
      reason is 'Lack of payment of fees', else 0.

    Args:
        df: DataFrame with at least ``patenthavare1``, ``patenthavare1_stad``
            and ``utgångsskäl``. It is modified in place.

    Returns:
        The same DataFrame object, for convenience.
    """
    # Try to fill in the city manually if it is not recognized.
    city = df['patenthavare1_stad']
    inferred_city = df['patenthavare1'].map(_city_from_holder)
    needs_city = (city.isna() | city.eq('')) & inferred_city.notna()
    df['patenthavare1_stad'] = city.mask(needs_city, inferred_city)
    df['patenthavare1'] = df['patenthavare1'].str.split(',').str[0] #separate the location information

    # To flag the potential wrong expiration date.
    df['utgångsår'] = _year_series(df, 'utgångsår', 'utgångsdatum')
    df['sista_aviserat_år'] = _year_series(df, 'sista_aviserat_år', 'sista_aviserat_datum')

    # A missing year on either side gives a missing gap, which counts as "not flagged".
    year_gap = df['utgångsår'] - df['sista_aviserat_år']
    unpaid = df['utgångsskäl'].eq('Lack of payment of fees').fillna(False)
    flagged = (year_gap > 1).fillna(False) & unpaid
    df['potential_wrong_expiration_date'] = flagged.astype(int)
    return df

In [None]:
# Write the extraction table to a CSV file, leaving out the row index

rec_result.to_csv(path_or_buf="gemini2.5pro_extraction.csv", index=False)