# Patent source for the InnoClass Pipeline in Dagster

The purpose of this notebook is to select the patent data that will be sent to the pipeline on the main server.
Using the EPAB library, and in particular querying BigQuery directly, gives more flexibility to select suitable distributions of data depending on their date, length, etc.

In [1]:
from epo.tipdata.epab import EPABClient
epab = EPABClient(env="PROD")
import re
import pandas as pd
import ipywidgets as widgets
from ipywidgets import (
    HTML, IntText, Text, Dropdown, SelectMultiple, Checkbox, Button, VBox, HBox, Layout
)
from IPython.display import display
import pickle
import json
import gzip
import subprocess
import os
from dotenv import load_dotenv

Patents data querying

First, we test different methods for loading representative sets of patents for further analysis.

In [2]:
# Helper functions to load and save lists of dictionaries to a gzipped jsonl FILE
#-----------------------------
def save_list(data, metadata, filename):
    """
    Writes a metadata header and a list of dictionaries to a gzipped JSON Lines file.

    The first line of the file is the JSON-encoded metadata; every following
    line is one JSON-encoded record from ``data``.

    Parameters:
        data (list): List of dictionaries to be saved, one per line.
        metadata (dict): Dictionary stored as the first line of the file.
        filename (str): The filename (including .gz if desired) where the data will be saved.
    """
    with gzip.open(filename, mode='wt', encoding='utf-8') as handle:
        # Header line first, so a reader can split it off from the records.
        header = json.dumps(metadata)
        handle.write(header + "\n")
        # Records are serialised lazily, one JSON document per line.
        handle.writelines(json.dumps(record) + "\n" for record in data)

def load_list(filename):
    """
    Reads a gzipped JSON Lines (jsonl) file written with a metadata header.

    The first line is decoded as the metadata; each remaining line is decoded
    as one data record. Lines that are not valid JSON are reported with a
    printed warning and left out of the result.

    Parameters:
        filename (str): The filename of the gzipped jsonl file.

    Returns:
        tuple: ``(metadata, records)`` where
            - metadata is the decoded first line (``{}`` when the file is
              empty or the first line cannot be decoded),
            - records is the list of decoded remaining lines.
    """
    header = {}
    records = []

    with gzip.open(filename, 'rt', encoding='utf-8') as stream:
        # The header is whatever sits on the first line (None at end of file).
        first_line = next(stream, None)
        if first_line:
            try:
                header = json.loads(first_line)
            except json.JSONDecodeError as e:
                print(f"Warning: Could not decode first line as metadata: {e}")

        # Everything after the header is treated as one record per line.
        for line in stream:
            try:
                parsed = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Warning: Could not decode line as JSON, skipping: {line.strip()} - {e}")
            else:
                records.append(parsed)

    return header, records


## REQUESTING PATENTS FROM EPO DATABASE

In [3]:
def create_sql_request_random(params):
    """
    Builds the SQL statement that draws a uniformly random sample of patents.

    Only rows whose description language is "EN" are considered; rows are
    shuffled with ORDER BY RAND() and cut off with LIMIT.

    Parameters:
        params (dict): Must provide
            - "nbr": number of rows to return (used as LIMIT),
            - "nbrextract": number of leading description characters to keep.

    Returns:
        str: The SQL statement, targeting the table named by ``epab.full_table_name``.
    """
    num_files, nbrextract = params["nbr"], params["nbrextract"]
    return f"""
          SELECT epab_doc_id, LEFT(description.text, {nbrextract}) as description, publication.date as pubdate, publication.number as pubnbr, title.en as title
        FROM `{epab.full_table_name}`
        WHERE description.language="EN"
        ORDER BY RAND()
        LIMIT {num_files};"""

def create_sql_request_ipc(params):
    """
    Builds the SQL statement that samples patents stratified by IPC class.

    The query works in three steps:
      1. ``flattened`` expands the ``ipc`` array so that each (document, IPC
         symbol) pair becomes one row, keeping only documents whose
         description language is "EN" and carrying along every column that
         the final SELECT needs.
      2. ``stratified_sample`` ranks the documents of each IPC class in
         random order.
      3. The final SELECT keeps the first-ranked document of every class and
         returns a random subset of at most ``nbr`` of them.

    NOTE(review): a document listed under several IPC symbols can be picked
    for more than one class, so ``epab_doc_id`` values are not guaranteed to
    be unique in the result - confirm whether de-duplication is needed.

    Parameters:
        params (dict): Must provide
            - "nbr": maximum number of rows to return,
            - "nbrextract": number of leading description characters to keep.

    Returns:
        str: The SQL statement, targeting the table named by ``epab.full_table_name``.
    """
    num_files = params["nbr"]
    nbrextract = params["nbrextract"]
    # All document columns are selected in `flattened`, because later CTEs can
    # only see the columns that the CTE they read from exposes.
    query = f"""WITH flattened AS (
      SELECT
        ipc_struct.symbol AS ipc_class,
        epab_doc_id,
        LEFT(description.text, {nbrextract}) AS description,
        publication.date AS pubdate,
        publication.number AS pubnbr,
        title.en AS title
      FROM `{epab.full_table_name}`,
           UNNEST(ipc) AS ipc_struct
      WHERE description.language="EN"
    ),
    stratified_sample AS (
      SELECT
        ipc_class,
        epab_doc_id,
        description,
        pubdate, pubnbr, title,
        ROW_NUMBER() OVER(PARTITION BY ipc_class ORDER BY RAND()) AS rn
      FROM flattened
    )
    SELECT
      ipc_class,
      epab_doc_id,
      pubdate, pubnbr, description, title
    FROM stratified_sample
    WHERE rn = 1
    ORDER BY RAND()
    LIMIT {num_files};"""
    return query

def create_sql_request_date(params):
    """
    Builds the SQL script that samples patents evenly across publication dates.

    The script:
      1. ``DateRange``: finds the earliest publication date and takes the
         current time as the upper bound.
      2. ``DateBuckets``: splits that range into 10 consecutive buckets of
         equal length (in days).
      3. ``GroupedData``: assigns every English-language document to the
         bucket containing its publication date.
      4. ``SampledData``: within each bucket, keeps the documents whose random
         rank is at most ``num_files / 10``.

    ``publication.date`` is parsed with the ``%Y%m%d`` format.

    NOTE(review): the statement starts with DECLARE, i.e. it is a multi-statement
    script - confirm that ``epab.sql_query`` accepts scripts.

    Parameters:
        params (dict): Must provide
            - "nbr": total number of documents requested (split over 10 buckets),
            - "nbrextract": number of leading description characters to keep.

    Returns:
        str: The SQL script, targeting the table named by ``epab.full_table_name``.
    """
    nbrextract = params["nbrextract"]
    num_files = params["nbr"]
    # DATE_DIFF is given DATE values explicitly (the range bounds are stored as
    # timestamps). `WHERE TRUE` is a no-op filter placed before QUALIFY;
    # NOTE(review): some BigQuery versions reject QUALIFY without an
    # accompanying WHERE/GROUP BY/HAVING - confirm whether it is still needed.
    statement = f"""
   DECLARE num_files INT64 DEFAULT {num_files};

WITH DateRange AS (
  SELECT
    MIN(TIMESTAMP(PARSE_DATE('%Y%m%d', publication.date))) AS min_timestamp,
    CURRENT_TIMESTAMP() AS max_timestamp
  FROM
    `{epab.full_table_name}`
  WHERE publication.date IS NOT NULL -- Handle nulls
),
DateBuckets AS (
  SELECT
    TIMESTAMP(DATE_ADD(DATE(DateRange.min_timestamp), INTERVAL CAST(offset * (DATE_DIFF(DATE(DateRange.max_timestamp), DATE(DateRange.min_timestamp), DAY) / 10) AS INT64) DAY)) AS bucket_start,
    TIMESTAMP(DATE_ADD(DATE(DateRange.min_timestamp), INTERVAL CAST((offset + 1) * (DATE_DIFF(DATE(DateRange.max_timestamp), DATE(DateRange.min_timestamp), DAY) / 10) AS INT64) DAY)) AS bucket_end
  FROM
    DateRange,
    UNNEST(GENERATE_ARRAY(0, 9)) AS offset
),
GroupedData AS (
  SELECT
    epab_doc_id,
    LEFT(description.text, {nbrextract}) as description, publication.number as pubnbr,
    publication.date as pubdate, title.en as title,
    TIMESTAMP(PARSE_DATE('%Y%m%d', publication.date)) as publication_timestamp,
    bucket_start,
    bucket_end
  FROM
    `{epab.full_table_name}`
  JOIN
    DateBuckets
  ON
    TIMESTAMP(PARSE_DATE('%Y%m%d', publication.date)) >= bucket_start AND TIMESTAMP(PARSE_DATE('%Y%m%d', publication.date)) < bucket_end
  WHERE description.language="EN" AND publication.date IS NOT NULL
),
SampledData AS (
  SELECT
    epab_doc_id,
    description,
    pubdate, pubnbr, title,
    bucket_start
  FROM
    GroupedData
  WHERE TRUE
  QUALIFY ROW_NUMBER() OVER (PARTITION BY bucket_start ORDER BY RAND()) <= (num_files / 10)
)
SELECT
  epab_doc_id,
  description,
  pubdate, pubnbr, title
FROM
  SampledData;"""
    return statement

def create_sql_request_sdg(params):
    """
    Builds the SQL statement selecting patents that mention the SDGs explicitly.

    A row is selected when its lower-cased description text contains the
    phrase 'sustainable development goal'. The full description text is
    returned (no truncation) and no row limit is applied.

    Parameters:
        params (dict): Accepted for signature parity with the other request
            builders; none of its entries are read here.

    Returns:
        str: The SQL statement, targeting the table named by ``epab.full_table_name``.
    """
    return f"""
          SELECT epab_doc_id,  description.text as description, publication.date as pubdate, publication.number as pubnbr,title.en as title
        FROM `{epab.full_table_name}`
        WHERE LOWER(description.text) LIKE '%sustainable development goal%'"""

In [4]:
import re

def find_sdg_numbers(text):
    """
    Finds all unique Sustainable Development Goal (SDG) numbers in a given text.

    A number is picked up when it directly follows the word "goal" or the
    acronym "SDG" (case-insensitive, optional whitespace in between), which
    covers both "sustainable development goal XX" and "SDG XX". Only the
    numbers 1 to 17 are kept, since those are the only existing goals.

    Args:
        text (str): The input document text. ``None`` or an empty string
            yields an empty set.

    Returns:
        set: A set of unique SDG numbers (integers from 1 to 17) found in the text.
    """
    if not text:
        return set()

    # Group 1 is the keyword ("goal" or "SDG"), group 2 the digits after it.
    pattern = re.compile(r"(goal|SDG)\s*(\d+)", re.IGNORECASE)
    candidates = (int(match.group(2)) for match in pattern.finditer(text))

    # Discard numbers that cannot be an SDG (e.g. "goal 2030", "SDG 99").
    return {number for number in candidates if 1 <= number <= 17}

import re

# Long-form statement of each of the 17 Sustainable Development Goals,
# keyed by goal number.
# NOTE(review): no function visible in this notebook reads this table - confirm
# whether it is used elsewhere before removing it.
OFFICIAL_SDG_DESCRIPTIONS = {
    1: "End poverty in all its forms everywhere.",
    2: "End hunger, achieve food security and improved nutrition and promote sustainable agriculture.",
    3: "Ensure healthy lives and promote well-being for all at all ages.",
    4: "Ensure inclusive and equitable quality education and promote lifelong learning opportunities for all.",
    5: "Achieve gender equality and empower all women and girls.",
    6: "Ensure availability and sustainable management of water and sanitation for all.",
    7: "Ensure access to affordable, reliable, sustainable and modern energy for all.",
    8: "Promote sustained, inclusive and sustainable economic growth, full and productive employment and decent work for all.",
    9: "Build resilient infrastructure, promote inclusive and sustainable industrialization and foster innovation.",
    10: "Reduce inequality within and among countries.",
    11: "Make cities and human settlements inclusive, safe, resilient and sustainable.",
    12: "Ensure sustainable consumption and production patterns.",
    13: "Take urgent action to combat climate change and its impacts.",
    14: "Conserve and sustainably use the oceans, seas and marine resources for sustainable development.",
    15: "Protect, restore and promote sustainable use of terrestrial ecosystems, sustainably manage forests, combat desertification, and halt and reverse land degradation and halt biodiversity loss.",
    16: "Promote peaceful and inclusive societies for sustainable development, provide access to justice for all and build effective, accountable and inclusive institutions at all levels.",
    17: "Strengthen the means of implementation and revitalize the Global Partnership for Sustainable Development."
    }
# Short titles of the 17 Sustainable Development Goals, keyed by goal number.
# find_sdg_numbers_from_description searches document text for these titles.
SDG_DESCRIPTIONS = {
    1: "No Poverty",
    2: "Zero Hunger",
    3: "Good Health and Well-being",
    4: "Quality Education",
    5: "Gender Equality",
    6: "Clean Water and Sanitation",
    7: "Affordable and Clean Energy",
    8: "Decent Work and Economic Growth",
    9: "Industry, Innovation and Infrastructure",
    10: "Reduced Inequalities",
    11: "Sustainable Cities and Communities",
    12: "Responsible Consumption and Production",
    13: "Climate Action",
    14: "Life Below Water",
    15: "Life on Land",
    16: "Peace, Justice and Strong Institutions",
    17: "Partnerships for the Goals"
}

def find_sdg_numbers_direct(text):
    """
    Finds all unique Sustainable Development Goal (SDG) numbers mentioned by number.

    A number is picked up when it directly follows the word "goal" or the
    acronym "SDG" (case-insensitive, optional whitespace in between), which
    covers both "sustainable development goal XX" and "SDG XX". Only the
    numbers 1 to 17 are kept, since those are the only existing goals.

    Args:
        text (str): The input document text. ``None`` or an empty string
            yields an empty set.

    Returns:
        set: A set of unique SDG numbers (integers from 1 to 17) directly
            mentioned in the text.
    """
    if not text:
        return set()

    # Group 1 is the keyword ("goal" or "SDG"), group 2 the digits after it.
    pattern = re.compile(r"(goal|SDG)\s*(\d+)", re.IGNORECASE)
    candidates = (int(match.group(2)) for match in pattern.finditer(text))

    # Discard numbers that cannot be an SDG (e.g. "goal 2030", "SDG 99").
    return {number for number in candidates if 1 <= number <= 17}

def find_sdg_numbers_from_description(full_description_text):
    """
    Finds SDG numbers by looking for the SDG short titles inside a text.

    Every title in SDG_DESCRIPTIONS is searched for as a literal,
    case-insensitive phrase (regex special characters are escaped; no word
    boundaries are enforced). The goal number is reported once per occurrence.

    Args:
        full_description_text (str): The full description text of a document.

    Returns:
        list: SDG numbers in the iteration order of SDG_DESCRIPTIONS, each
            repeated as many times as its title occurs in the text.
    """
    hits = []
    for goal_number, goal_title in SDG_DESCRIPTIONS.items():
        # Literal phrase match: escaping keeps characters such as "," from
        # being interpreted as regex syntax.
        title_regex = re.compile(re.escape(goal_title), re.IGNORECASE)
        occurrences = sum(1 for _ in title_regex.finditer(full_description_text))
        hits.extend([goal_number] * occurrences)
    return hits

def process_documents_for_sdgs(documents_data):
    """
    Labels each document with the SDGs mentioned in its description.

    Each document dictionary is modified in place:
      - "description" is moved to "original_text" (a missing or null
        description is stored as an empty string),
      - "sdg" receives one ``{"value": "SDG<n>", "score": 1}`` entry per goal
        found, sorted by goal number,
      - "reference" is set to True,
      - "validation" is False when no goal or exactly 17 goals were found,
        True otherwise,
      - "id" is copied from "epab_doc_id".

    Goals are first searched by number (find_sdg_numbers_direct); only when
    none is found are the goal titles searched instead
    (find_sdg_numbers_from_description).

    Args:
        documents_data (iterable): Document dictionaries providing at least
            "epab_doc_id" and, normally, "description".

    Returns:
        list: The same document dictionaries, in input order.

    Raises:
        KeyError: If a document has no "epab_doc_id" entry.
    """
    processed_results = []
    for doc in documents_data:
        # Rename "description" to "original_text"; fall back to "" so the
        # regex searches below never receive None.
        text = doc.pop("description", None) or ""
        doc["original_text"] = text

        # 1. Goals referenced by number; 2. otherwise goals referenced by title.
        sdg_numbers = set(find_sdg_numbers_direct(text))
        if not sdg_numbers:
            sdg_numbers.update(find_sdg_numbers_from_description(text))

        labels = [
            {"value": "SDG" + str(number), "score": 1}
            for number in sorted(sdg_numbers)
        ]
        doc["sdg"] = labels
        doc["reference"] = True
        # A document matching nothing, or all 17 goals, is not a usable reference.
        doc["validation"] = len(labels) not in (0, 17)
        doc["id"] = doc["epab_doc_id"]
        processed_results.append(doc)

    return processed_results

           


Enter the filename under which the raw patent descriptions returned by the database will be saved. Saving them avoids having to query the database again each time the data is needed.

In [5]:
# main function to create and send the request to the database
def query_raw(params):
    """
    Builds the SQL request for the chosen extraction method, runs it and saves the result.

    The rows returned by ``epab.sql_query`` are converted to pipeline
    documents and written with ``save_list`` (``params`` itself is stored as
    the file's metadata line).

    Parameters:
        params (dict): Must provide
            - "extract_method": one of "Random", "IPC", "Date" or "SDG",
            - "filename": path of the gzipped jsonl file to write,
            - "nbr" / "nbrextract": forwarded to the request builders.

    Returns:
        None

    Raises:
        ValueError: If "extract_method" is not one of the supported methods.
    """
    builders = {
        "Random": create_sql_request_random,
        "IPC": create_sql_request_ipc,
        "Date": create_sql_request_date,
        "SDG": create_sql_request_sdg,
    }
    data_selec = params["extract_method"]
    if data_selec not in builders:
        raise ValueError(
            f"Unknown extraction method {data_selec!r}; expected one of {sorted(builders)}"
        )

    statement = builders[data_selec](params)
    if data_selec == "Random":
        print(statement)
    results = epab.sql_query(statement)
    #TODO add CPC and other interesting metrics parameters
    if data_selec != "SDG":
        # Unlabelled documents: placeholder sdg/target entries, not a reference.
        docs = [
            {
                'id': item['epab_doc_id'],
                'original_text': item['description'],
                'pubdate': item['pubdate'],
                'pubnbr': item['pubnbr'],
                'title': item['title'],
                'cleaned_text': '',
                'sdg': [{'value': '', 'score': 0.}],
                'target': [{'value': '', 'score': 0.}],
                'thumbsup': 0,
                'thumbsdown': 0,
                'reference': False,
                'validation': False,
            }
            for item in results
        ]
    else:
        # SDG extraction: documents are labelled from their own text.
        docs = process_documents_for_sdgs(results)
        for item in docs:
            print(item["sdg"])
    save_list(docs, params, params["filename"])

In [6]:
def send_data_to_server(filename):
    """
    Copies a file to the pipeline server with ``scp``.

    The target host is ``<username>@<server>``, both read from the
    environment after loading the ``.env`` file. Progress and errors are
    reported with ``print``; nothing is raised for a failed transfer.

    Parameters:
        filename (str): Path of the local file to copy.

    Returns:
        None
    """
    # Load environment variables from .env file
    load_dotenv()
    server = os.getenv("server")
    username = os.getenv("username")
    if not server or not username:
        # Without both values no valid scp target can be built.
        print("Error: 'server' and 'username' must be defined in the environment or the .env file.")
        return

    remote_host = username + "@" + server
    remote_path = "/home/codechallenge/InnoClass/backend/classEngine/data/"
    command = ["scp", filename, f"{remote_host}:{remote_path}"]
    try:
        print("command 1 : ")
        # stdin is closed for the child so scp cannot block waiting for input.
        completed = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        print("Error: scp command not found.")
        return
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return

    if completed.returncode == 0:
        print("SCP command executed successfully.")
        print("Output:", completed.stdout)
    else:
        print(f"SCP command failed with error (code {completed.returncode}):")
        print("Error!:", completed.stderr)


In [7]:
# Define a style with a wider description width
# (shared by all input widgets below so that their labels line up)
style = {'description_width': '150px'}  # Increase as needed

# Create form widgets with the updated style
# Name of the gzipped file to write; read as params["filename"] on Start.
file_name_widget = widgets.Text(
    value='',
    description='File Name:',
    placeholder='Enter file name',
    style=style
)

# Number of patents requested; read as params["nbr"] on Start.
number_widget = widgets.IntText(
    value=10,
    description='Number of patents:',
    style=style
)

# Number of leading description characters to keep; read as params["nbrextract"].
numberextract_widget = widgets.IntText(
    value=5000,
    description='Number of characters:',
    style=style
)

# Extraction method; read as params["extract_method"] and dispatched in query_raw.
extraction_widget = widgets.Dropdown(
    options=["Random", "IPC", "Date","SDG"],
    description='Extraction method:',
    style=style
)

# Output area placed at the bottom of the form.
data_widget=widgets.Output()

# Button that launches the query and the transfer (see on_start_clicked).
start_button = widgets.Button(
    description='Start',
    button_style='success'
)

# Define the function to be executed when the button is clicked
def on_start_clicked(b):
    """
    Runs the extraction and the transfer with the values currently in the form.

    Everything printed while the callback runs is captured by ``data_widget``,
    the output area shown at the bottom of the form.

    Parameters:
        b: The button that was clicked (not used).

    Returns:
        None
    """
    params = {
        "filename": file_name_widget.value,
        "nbr": number_widget.value,
        "nbrextract": numberextract_widget.value,
        "extract_method": extraction_widget.value
    }

    # Route prints (and any traceback) to the form's output area.
    with data_widget:
        if not params["filename"]:
            # An empty path cannot be opened for writing; stop before querying.
            print("Please enter a file name before starting.")
            return

        print("querying")
        query_raw(params)
        print("sending data")
        send_data_to_server(params["filename"])

# Run on_start_clicked whenever the Start button is pressed.
start_button.on_click(on_start_clicked)

# Assemble the form (it is displayed further down with display(form_items))

# Widgets are stacked vertically in the order listed.
form_items = widgets.VBox([
    file_name_widget,
    number_widget,
    numberextract_widget,
    extraction_widget,
    start_button,
    data_widget
])

def update_filename(change):
    """
    Fills the file name field with the default name for the selected method.

    "Random", "IPC" and "Date" share one default name, "SDG" has its own, and
    any other value clears the field.

    Parameters:
        change: Change notification whose ``new`` attribute holds the newly
            selected extraction method.

    Returns:
        None
    """
    method = change.new
    if method in ("Random", "IPC", "Date"):
        suggested_name = "raw_data.tar.gz"
    elif method == "SDG":
        suggested_name = "sdg_data.dat.gz"
    else:
        # Default or empty if no match
        suggested_name = ""
    file_name_widget.value = suggested_name
# Keep the file name in sync with the selected extraction method.
# NOTE(review): this presumably only fires when the selection changes, so the
# initial "Random" choice would leave the file name empty - confirm.
extraction_widget.observe(update_filename, names='value')
# Render the assembled form in the notebook.
display(form_items)

VBox(children=(Text(value='', description='File Name:', placeholder='Enter file name', style=TextStyle(descrip…