# Patent data extraction analysis

The purpose of this notebook is to test and investigate different methods for extracting SDG-relevant data from patents.

In [1]:
!pip install autocorrect



In [2]:
from epo.tipdata.epab import EPABClient
# EPABClient instance configured with env="TEST". Later cells read
# epab.full_table_name and call epab.sql_query(...) on it.
epab = EPABClient(env="TEST")
import re
import pandas as pd
import ipywidgets as widgets
from ipywidgets import (
    HTML, IntText, Text, Dropdown, SelectMultiple, Checkbox, Button, VBox, HBox, Layout
)
from IPython.display import display
import pickle
import json
from autocorrect import Speller #TODO check if we do the spelling stuff here
import gzip

Patents data loading

First, we test different methods for loading representative sets of patents for further analysis.

In [3]:
# Log area of the data-loading form: an editable text box that log() appends to.
_log_layout = widgets.Layout(width='100%', height='200px')  # Adjust size as needed
log_output = widgets.Textarea(
    description='Logs:',
    placeholder='Logs will appear here.',
    value='',
    disabled=False,
    layout=_log_layout,
)

# Wrap the text box in a container and render it below this cell.
log_widget = widgets.VBox([log_output])
display(log_widget)

def log(text):
    """
    Appends one line to the log display widget (log_output).

    Parameters:
        text: The message to show. Any object is accepted; it is converted
            with str() so that numbers, dicts or exceptions can be logged
            directly instead of raising a TypeError.
    """
    log_output.value += str(text) + '\n'  # One message per line

VBox(children=(Textarea(value='', description='Logs:', layout=Layout(height='200px', width='100%'), placeholde…

In [77]:
def save_list(data, filename):
    """
    Writes a list of dictionaries to a gzip-compressed JSON Lines file.

    Each dictionary becomes one JSON document on its own line; the file is
    written as UTF-8 text.

    Parameters:
        data (list): Dictionaries to serialise, in output order.
        filename (str): Path of the file to create (overwritten if it exists).
    """
    with gzip.open(filename, 'wt', encoding='utf-8') as gz_file:
        gz_file.writelines(json.dumps(record) + "\n" for record in data)

def load_list(filename):
    """
    Loads a gzipped JSON Lines (jsonl) file and returns its records as a list.

    Blank or whitespace-only lines (for example a stray empty line at the end
    of a hand-edited file) are skipped instead of raising a JSON decoding
    error.

    Parameters:
        filename (str): The filename of the gzipped jsonl file.

    Returns:
        list: One decoded JSON value per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with gzip.open(filename, 'rt', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


In [44]:
def save_data(params):
    """
    Stores a parameter dictionary as an indented JSON file and logs the path.

    Parameters:
        params (dict): JSON-serialisable settings. The value under "filename"
            is used as the base name; ".conf" is appended to it.
    """
    target_path = "{}.conf".format(params['filename'])
    with open(target_path, "w") as conf_file:
        json.dump(params, conf_file, indent=4)
    log("File saved to {}".format(target_path))

In [16]:
def create_sql_request(params):
    """
    Builds the SQL statement that samples English-language patent descriptions.

    Parameters:
        params (dict): Request settings. Only "nbr" (the number of patents to
            fetch) is used. "extract_method" may be present but is currently
            ignored: every request draws a random sample (ORDER BY RAND()).

    Returns:
        str: The SQL statement, querying the table named by
            epab.full_table_name.

    Raises:
        ValueError: If "nbr" cannot be converted to an integer or is negative.
        KeyError: If "nbr" is missing from params.
    """
    # Coerce to int before interpolating so that only a plain number can ever
    # end up in the LIMIT clause.
    num_files = int(params["nbr"])
    if num_files < 0:
        raise ValueError(f"Number of patents must not be negative, got {num_files}")
    statement = f"""
        SELECT epab_doc_id, description
        FROM `{epab.full_table_name}`
        WHERE description.language="EN"
        ORDER BY RAND()
        LIMIT {num_files};"""
    return statement


In [43]:
# Shared style: a wider label column so the longer descriptions are not cut off.
style = dict(description_width='150px')  # Increase as needed

# Input fields of the data-loading form.
file_name_widget = widgets.Text(
    description='File Name:',
    value='',
    placeholder='Enter file name',
    style=style,
)
number_widget = widgets.IntText(description='Number of patents:', value=10, style=style)
extraction_widget = widgets.Dropdown(
    description='Extraction method:',
    options=["Random", "CPC", "Length", "Asian"],
    style=style,
)

# Button that launches the download.
start_button = widgets.Button(button_style='success', description='Start')

def on_start_clicked(b):
    """
    Handles a click on the Start button: stores the request settings, then
    downloads the patents.

    The settings are read from the form widgets, written with save_data()
    and passed to load_raw(). Nothing is written or queried when the file
    name is empty: without a name the settings would be saved as ".conf" and
    the download would run only to fail when its results are saved.

    Parameters:
        b: The clicked button (unused).
    """
    params = {
        "filename": file_name_widget.value,
        "nbr": number_widget.value,
        "extract_method": extraction_widget.value
    }
    if not params["filename"].strip():
        log("Please enter a file name before starting.")
        return
    save_data(params)
    load_raw(params)
    log("finished loading patents")

# Run on_start_clicked whenever the Start button is pressed.
start_button.on_click(on_start_clicked)

# Stack the three inputs above the button and render the form.
form_items = widgets.VBox([file_name_widget, number_widget, extraction_widget, start_button])
display(form_items)

VBox(children=(Text(value='', description='File Name:', placeholder='Enter file name', style=TextStyle(descrip…

In [34]:
# Keywords looked for in section headings. They are matched as
# case-insensitive substrings of the heading text.
keywords1 = [
    "background",
    "prior art",
    "state of the art",
    "field of the invention",
    "technical field",
    "summary",
]
# Phrases looked for inside paragraphs, matched the same way.
keywords2 = [
    "herein described subject matter",
    "technology described herein",
    "subject of the invention",
    "belongs to the field",
    "invention is",
    "invention relates to",
    "present invention refers to",
]

In [11]:
def load_raw(params):
    """
    Runs the patent query and stores the raw descriptions on disk.

    The statement comes from create_sql_request(params) and is executed with
    epab.sql_query(). For every returned row the document id and the text of
    its description are kept and the resulting list is written with
    save_list() to the path given by params["filename"].

    Parameters:
        params (dict): Request settings; "filename" is the output path, the
            remaining keys are consumed by create_sql_request().
    """
    rows = epab.sql_query(create_sql_request(params))
    #TODO add CPC and other interesting metrics parameters
    docs = []
    for row in rows:
        docs.append({
            'epab_doc_id': row['epab_doc_id'],
            'original_text': row['description']['text'],
        })
    save_list(docs, params["filename"])

In [55]:
def process_texts(params):
    """
    Extracts the keyword-matching parts of every stored patent description.

    For each document loaded from params["raw_source_file"] two kinds of text
    are collected, in this order:

    1. For every <heading id="h..."> element whose tag-stripped text contains
       one of keywords1 (case-insensitive): the raw text between that heading
       and the next heading (or the end of the document).
    2. The content of every <p id="..."> element that contains one of
       keywords2 (case-insensitive).

    Empty pieces are dropped and the rest are joined with newlines.

    Parameters:
        params (dict): Processing settings; only "raw_source_file" is read here.

    Returns:
        list: One dict per document with the keys "id", "extracted_text" and
            "original_text".
    """
    heading_re = re.compile(r'<heading id="h\d+">.*?<\/heading>')
    paragraph_re = re.compile(r'<p id=".*?">(.*?)</p>', re.DOTALL)

    extracted_list = []
    for doc in load_list(params["raw_source_file"]):
        epab_doc_id = doc['epab_doc_id']
        original_text = doc['original_text']
        pieces = []

        # Sections that follow a heading mentioning one of keywords1.
        headings = list(heading_re.finditer(original_text))
        for idx, heading in enumerate(headings):
            label = re.sub(r'<[^>]+>', '', heading.group(0)).strip().lower()
            if not any(keyword.lower() in label for keyword in keywords1):
                continue
            if idx + 1 < len(headings):
                section_end = headings[idx + 1].start()
            else:
                section_end = len(original_text)
            pieces.append(original_text[heading.end():section_end].strip())

        # Paragraphs that contain one of keywords2.
        for paragraph in paragraph_re.finditer(original_text):
            body = paragraph.group(1).strip()
            lowered_body = body.lower()
            if any(keyword.lower() in lowered_body for keyword in keywords2):
                pieces.append(body)

        extracted_list.append({
            'id': epab_doc_id,
            'extracted_text': "\n".join(piece for piece in pieces if piece),
            'original_text': original_text,
        })

    log2("text extraction finished")
    return extracted_list

In [56]:
def analyze_text_lengths(data, min_length=50, max_length=800):
    """
    Adds the extracted-text length to every record and counts the outliers.

    Args:
        data: A list of dictionaries, each containing an "extracted_text"
              key holding a string.
        min_length: Texts shorter than this are counted as too short
              (default 50).
        max_length: Texts longer than this are counted as too long
              (default 800).

    Returns:
        A tuple containing:
        - A new list with a shallow copy of each dictionary, extended with
          an "extracted_text_length" key. The input dictionaries are not
          modified.
        - A dictionary with the two counters "less_than_<min_length>" and
          "greater_than_<max_length>" ("less_than_50" and "greater_than_800"
          with the defaults).
    """
    short_key = f"less_than_{min_length}"
    long_key = f"greater_than_{max_length}"
    metrics = {short_key: 0, long_key: 0}
    updated_data = []

    for item in data:
        extracted_text_length = len(item["extracted_text"])
        # Copy so that the caller's records stay untouched.
        updated_item = item.copy()
        updated_item["extracted_text_length"] = extracted_text_length
        updated_data.append(updated_item)

        # A record is counted at most once; "too short" takes precedence.
        if extracted_text_length < min_length:
            metrics[short_key] += 1
        elif extracted_text_length > max_length:
            metrics[long_key] += 1

    return updated_data, metrics

In [61]:
import os
import json
import ipywidgets as widgets
from IPython.display import display

# Left column: a button that refreshes the list, and the list of parameter files.
update_button = widgets.Button(description="Update")
file_list = widgets.Select(description="Files:", options=[], rows=10)

# Right column: the editable processing parameters and the button that starts the run.
filename_input = widgets.Text(description="Filename:")
raw_source_input = widgets.Text(description="Raw source file:")
_comment_layout = widgets.Layout(width='100%', height='80px')
comment_input = widgets.Textarea(description="Comment:", layout=_comment_layout)
cutoff_length_input = widgets.IntText(description="Cut-off length:", value=1000)
select_button = widgets.Button(description="Select")

def update_file_list(button):
    """
    Refreshes the file list with the '.ext' files of the current directory.

    Args:
        button: The clicked button (unused).
    """
    matching = []
    for name in os.listdir('.'):
        if name.endswith('.ext'):
            matching.append(name)
    file_list.options = matching

def load_file_content(change):
    """
    Fills the parameter inputs from the file selected in the file list.

    A file containing JSON supplies "filename", "raw_source_file", "comment"
    and "cutoff_length" (missing keys fall back to empty strings / 1000). A
    file that is not JSON is placed verbatim in the raw-source field and the
    filename field receives the selected path without its extension. On any
    other error the message is printed and all inputs are reset.

    Args:
        change: The change object passed by file_list.observe; its ``new``
            attribute holds the selected file (nothing happens if it is empty).
    """
    selected_file = change.new
    if not selected_file:
        return

    try:
        with open(selected_file, 'r') as handle:
            file_content = handle.read()
        try:
            data = json.loads(file_content)
        except json.JSONDecodeError:
            # Not JSON: treat the whole file as the raw-source value.
            raw_source_input.value = file_content
            filename_input.value = os.path.splitext(selected_file)[0]
            comment_input.value = ""
            cutoff_length_input.value = 1000
        else:
            filename_input.value = data.get("filename", "")
            raw_source_input.value = data.get("raw_source_file", "")
            comment_input.value = data.get("comment", "")
            cutoff_length_input.value = data.get("cutoff_length", 1000)
    except Exception as e:
        print(f"Error loading file: {e}")
        for field in (filename_input, raw_source_input, comment_input):
            field.value = ""
        cutoff_length_input.value = 1000

def on_select(button):
    """
    Handles a click on the Select button: stores the parameters, runs the
    extraction and saves its results and metrics.

    Steps:
    1. Collect the parameters from the input widgets.
    2. Stop with a log message if the filename or the raw source file is
       empty (otherwise a file named ".ext" would be written and the
       extraction would then fail when opening the source).
    3. Save the parameters as JSON to "<filename>.ext" (best effort: a
       failure is logged and processing continues).
    4. Run process_texts() and analyze_text_lengths(), write the records to
       "<filename>.dat.gz" and pass the metrics to save_data() with the base
       name "<filename>.met".

    Args:
        button: The clicked button (unused).
    """
    params = {
        "filename": filename_input.value,
        "raw_source_file": raw_source_input.value,
        "comment": comment_input.value,
        "cutoff_length": cutoff_length_input.value,
        "selected_file": file_list.value
    }

    if not params["filename"].strip():
        log2("Please enter a filename before pressing Select.")
        return
    if not params["raw_source_file"].strip():
        log2("Please enter a raw source file before pressing Select.")
        return

    save_filename = params["filename"]
    if not save_filename.endswith(".ext"):
        save_filename = save_filename + ".ext"

    try:
        with open(save_filename, "w") as f:
            json.dump(params, f, indent=4)
    except Exception as e:
        # Saving the parameters is best effort; report it where the user looks.
        log2(f"Error saving file: {e}")

    extracted = process_texts(params)
    extracted, metrics = analyze_text_lengths(extracted)
    save_list(extracted, params["filename"] + ".dat.gz")
    metrics["filename"] = params["filename"] + ".met"
    save_data(metrics)
    log2(str(metrics))
    log2("finished")

# Connect the widgets to their callbacks.
select_button.on_click(on_select)
update_button.on_click(update_file_list)
file_list.observe(load_file_content, names='value')  # Fires when the selection changes

# Two columns: file picker on the left, parameter inputs on the right.
left_box = widgets.VBox([update_button, file_list])
right_box = widgets.VBox(
    [filename_input, raw_source_input, comment_input, cutoff_length_input, select_button]
)
ui1 = widgets.HBox([left_box, right_box])

# Result log shown underneath the two columns; log2() appends to it.
_log2_layout = widgets.Layout(width='80%', height='100px')  # Adjust size as needed
log_output2 = widgets.Textarea(
    description='Logs:',
    placeholder='Results will appear here.',
    value='',
    disabled=False,
    layout=_log2_layout,
)
log_output2.value=""
ui=widgets.VBox([ui1,log_output2])

# Render the complete form in the notebook.
display(ui)

def log2(text):
    """
    Appends one line to the results log widget (log_output2).

    Args:
        text: The message to show. Any object is accepted; it is converted
            with str() so that numbers, dicts or exceptions can be logged
            directly instead of raising a TypeError.
    """
    log_output2.value += str(text) + '\n'  # One message per line
    
# Populate the file list once when the cell runs. update_file_list() does not
# use its button argument, so None is passed.
update_file_list(None)



VBox(children=(HBox(children=(VBox(children=(Button(description='Update', style=ButtonStyle()), Select(descrip…

In [66]:
def filter_fails(data):
    """
    Returns the records whose extracted text is shorter than 50 or longer
    than 800 characters. A missing "extracted_text" counts as empty.
    """
    failures = []
    for item in data:
        length = len(item.get('extracted_text', ''))
        if length < 50 or length > 800:
            failures.append(item)
    return failures

In [64]:
from bs4 import BeautifulSoup
def html_to_text(html):
    """
    Returns the text content of an HTML string, parsed with BeautifulSoup's
    "html.parser" backend; the individual text pieces are joined with newlines.
    """
    return BeautifulSoup(html, "html.parser").get_text(separator="\n")

In [80]:
import os
import ipywidgets as widgets
from IPython.display import display

def html_to_text(html):
    """
    Returns the text content of an HTML/XML fragment with all tags removed.

    The former placeholder only removed the literal strings "<p>" and "</p>",
    so tags carrying attributes (for example <p id="...">) and every other
    element were shown verbatim in the viewer. This version strips every tag
    with the standard-library HTML parser, decodes character references
    (e.g. "&amp;") and joins the text runs with newlines.

    Parameters:
        html (str): Markup to convert.

    Returns:
        str: The text runs found between the tags, separated by newlines.
    """
    # Imported locally: the parameter name shadows the "html" package here.
    from html.parser import HTMLParser

    class _TextCollector(HTMLParser):
        """Collects every text run encountered while parsing."""

        def __init__(self):
            super().__init__(convert_charrefs=True)
            self.chunks = []

        def handle_data(self, data):
            self.chunks.append(data)

    collector = _TextCollector()
    collector.feed(html)
    collector.close()  # Flush text that is still buffered by the parser
    return "\n".join(collector.chunks)

def filter_fails(data, min_length=50, max_length=800):
    """
    Returns the records regarded as difficult.

    A record is difficult when it carries a truthy 'difficult' flag or when
    its extracted text is shorter than min_length or longer than max_length.
    The length check is included because the records written by the
    extraction step shown in this notebook carry no 'difficult' key, so
    filtering on the flag alone would always give an empty list.

    Parameters:
        data (list): Dictionaries as returned by load_list().
        min_length (int): Shortest acceptable extracted text (default 50).
        max_length (int): Longest acceptable extracted text (default 800).

    Returns:
        list: The difficult records, in their original order.
    """
    def is_difficult(item):
        if item.get('difficult'):
            return True
        length = len(item.get('extracted_text', ''))
        return length < min_length or length > max_length

    return [item for item in data if is_difficult(item)]

# Result files in the current directory. The suffix check includes the leading
# dot so that only "*.dat.gz" files are listed, and the names are sorted so
# the list order is stable between runs.
files = sorted(f for f in os.listdir('.') if f.endswith('.dat.gz'))

# Widget for listing files
file_selector = widgets.Select(
    options=files,
    description='Files:',
    rows=10,
    layout=widgets.Layout(width='200px')
)

# Text area to display the contents of the corresponding ".met.conf" file
met_conf_text = widgets.Textarea(
    description='Config:',
    layout=widgets.Layout(width='500px', height='200px')
)

def on_file_select(change):
    """
    Shows the metrics file that belongs to the selected result file.

    The metrics file name is obtained by replacing the trailing ".dat.gz" of
    the selected name with ".met.conf" (only the suffix is replaced, not
    other occurrences inside the name). Its content is placed in the config
    text area, or a "File not found" message if it does not exist.

    Parameters:
        change (dict): Change notification from file_selector; change['new']
            is the selected file name (nothing happens if it is empty).
    """
    selected_file = change['new']
    if not selected_file:
        return

    suffix = '.dat.gz'
    base_name = selected_file[:-len(suffix)] if selected_file.endswith(suffix) else selected_file
    met_conf_filename = base_name + '.met.conf'

    if os.path.exists(met_conf_filename):
        with open(met_conf_filename, 'r') as f:
            content = f.read()
    else:
        content = f"File not found: {met_conf_filename}"
    met_conf_text.value = str(content)

# Show the matching metrics file whenever another result file is selected.
file_selector.observe(on_file_select, names='value')

# Viewer state shared by the callbacks below.
list_data = []      # Records of the currently loaded file (filled via load_list)
filtered_data = []
current_index = 0   # Position of the record currently shown

# Button that loads the selected file.
load_button = widgets.Button(description='Load')

# Checkbox that restricts the view to the difficult records.
difficult_only_checkbox = widgets.Checkbox(description='Display difficult files only', value=False)

# Side-by-side text boxes for the extracted_text and original_text fields.
_text_box_layout = dict(width='500px', height='200px')
extracted_text_area = widgets.Textarea(
    description='Extracted:',
    layout=widgets.Layout(**_text_box_layout),
)
original_text_area = widgets.Textarea(
    description='Original:',
    layout=widgets.Layout(**_text_box_layout),
)

def get_display_data():
    """
    Returns the records to browse: the result of filter_fails(list_data)
    when the difficult-only checkbox is ticked, otherwise list_data itself.
    """
    if difficult_only_checkbox.value:
        return filter_fails(list_data)
    return list_data

def update_text_areas():
    """
    Shows the record at current_index in the two text areas.

    The index is first clamped to the range of the records currently
    displayed. Without this, ticking the difficult-only checkbox while
    looking at a record beyond the end of the shorter filtered list left
    both areas blank until the user had clicked "<" often enough to get back
    into range. Both areas are cleared when there is nothing to display.
    """
    global current_index
    display_data = get_display_data()
    if not display_data:
        extracted_text_area.value = ''
        original_text_area.value = ''
        return

    current_index = min(max(current_index, 0), len(display_data) - 1)
    record = display_data[current_index]
    extracted_text_area.value = html_to_text(record.get('extracted_text', ''))
    original_text_area.value = html_to_text(record.get('original_text', ''))

def on_load_button_click(b):
    """
    Loads the selected result file and shows its first record.

    With no file selected, a notice is written to the extracted-text area
    and the original-text area is cleared; the loaded data stays unchanged.

    Parameters:
        b: The clicked button (unused).
    """
    global list_data, current_index
    if not file_selector.value:
        extracted_text_area.value = 'No file selected'
        original_text_area.value = ''
        return
    list_data = load_list(file_selector.value)
    current_index = 0
    update_text_areas()

def _refresh_on_filter_toggle(change):
    """Redraws the text areas when the difficult-only checkbox changes."""
    update_text_areas()

load_button.on_click(on_load_button_click)
difficult_only_checkbox.observe(_refresh_on_filter_toggle, names='value')

# Buttons for stepping backwards and forwards through the records.
left_arrow = widgets.Button(description='<')
right_arrow = widgets.Button(description='>')

def on_left_arrow_click(b):
    """
    Steps back to the previous record and redraws the text areas; does
    nothing when the first record is already shown.
    """
    global current_index
    if current_index <= 0:
        return
    current_index = current_index - 1
    update_text_areas()

def on_right_arrow_click(b):
    """
    Advances to the next record and redraws the text areas; does nothing
    when the last displayed record is already shown.
    """
    global current_index
    last_index = len(get_display_data()) - 1
    if current_index >= last_index:
        return
    current_index = current_index + 1
    update_text_areas()

# Connect the navigation buttons.
right_arrow.on_click(on_right_arrow_click)
left_arrow.on_click(on_left_arrow_click)

# Rows of the viewer, from top to bottom.
file_list_box = widgets.VBox([file_selector, load_button])
file_and_config = widgets.HBox([file_list_box, met_conf_text])
text_areas_box = widgets.HBox([extracted_text_area, original_text_area])
nav_buttons = widgets.HBox([left_arrow, right_arrow])

# Assemble the rows into one layout and render it.
ui = widgets.VBox([
    file_and_config,
    difficult_only_checkbox,
    text_areas_box,
    nav_buttons,
])
display(ui)


VBox(children=(HBox(children=(VBox(children=(Select(description='Files:', layout=Layout(width='200px'), option…