In [1]:
!pip install /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl

Processing /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl
Installing collected packages: Whoosh
Successfully installed Whoosh-2.7.4


# Create Test Index

In [2]:
# Import necessary libraries
from pathlib import Path
import polars as pl
from tqdm import tqdm
import whoosh_utils

Processing /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl
Whoosh is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [3]:
comp_data_dir = Path("/kaggle/input/uspto-explainable-ai")

# Lazily scan the patent metadata, keep patents published on or after
# 1975-01-01, derive `year` / `month` from the publication date, rename
# `cpc_codes` to `cpc`, and materialise the result as a DataFrame.
meta = (
    pl.scan_parquet(comp_data_dir / "patent_metadata.parquet")
    .filter(pl.col("publication_date") >= pl.date(1975, 1, 1))
    .with_columns(
        year=pl.col("publication_date").dt.year(),
        month=pl.col("publication_date").dt.month(),
    )
    .rename({"cpc_codes": "cpc"})
    .collect()
)
    

In [4]:
# Gather every publication number that appears anywhere in test.csv: melt()
# reshapes the wide frame into long "variable" / "value" columns, so the unique
# entries of "value" are the distinct cell values of the file.
test_nn = pl.scan_csv(comp_data_dir / "test.csv")
all_pub = (
    test_nn.melt()
    .collect()
    .get_column("value")
    .unique()
)

# Keep only the metadata rows whose publication number occurs in the test file.
meta = meta.filter(pl.col("publication_number").is_in(all_pub))

In [5]:
# Preview the first rows of the filtered metadata.
meta.head()

publication_number,publication_date,filing_date,family_id,cpc,year,month
str,datetime[μs],datetime[μs],f64,list[str],i32,i8
"""US-10015385-B2…",2018-07-03 00:00:00,2017-05-18 00:00:00,52469910.0,"[""H04N21/4223"", ""H04N21/4227"", … ""H04N7/183""]",2018,7
"""US-10075387-B1…",2018-09-11 00:00:00,2015-06-03 00:00:00,63406538.0,"[""H04L47/70"", ""H04L67/02"", … ""H04W4/80""]",2018,9
"""US-10083363-B2…",2018-09-25 00:00:00,2015-05-26 00:00:00,57397574.0,"[""G06K9/00892"", ""G06T15/20"", … ""G06V40/70""]",2018,9
"""US-10182126-B2…",2019-01-15 00:00:00,2016-05-02 00:00:00,60158656.0,"[""G06F9/00"", ""H04L67/04"", … ""H04L67/59""]",2019,1
"""US-10197576-B2…",2019-02-05 00:00:00,2011-11-08 00:00:00,45094438.0,"[""G01N2400/00"", ""G01N33/6848""]",2019,2


In [6]:
# Accumulator for the per-month patent LazyFrames.
patents = []
# Number of distinct (year, month) pairs present in `meta`.
n_unique = meta.n_unique(subset=["year", "month"])
n_unique

161

In [7]:
# Start from an empty list in this cell so that re-running it does not append a
# second copy of every LazyFrame (duplicates would multiply rows in the later
# concat + join).
patents = []
for (year, month), _ in tqdm(meta.group_by(["year", "month"]), total=n_unique):
    patent_path = comp_data_dir / f"patent_data/{year}_{month}.parquet"
    # Lazily scan this month's file, leaving out the claims and description columns.
    patent = pl.scan_parquet(patent_path).select(pl.exclude(["claims", "description"]))
    patents.append(patent)

100%|██████████| 161/161 [00:01<00:00, 116.36it/s]


In [8]:
# Stack the per-month LazyFrames into a single LazyFrame and add `claims` /
# `description` columns that hold empty strings.
patent: pl.LazyFrame = pl.concat(patents).with_columns(
    claims=pl.lit(""),
    description=pl.lit(""),
)

# Left-join the patent columns onto the metadata by publication number. The
# pipeline stays lazy until collect(), which is asked to use streaming.
meta_with_text = (
    meta.lazy()
    .join(patent, how="left", on="publication_number")
    .collect(streaming=True)
)

# Persist the joined table to a Parquet file.
meta_with_text.write_parquet("meta_with_text.parquet")


In [9]:
# Directory that receives the index; created (with parents) if it is missing.
index_dir = "test_index"
Path(index_dir).mkdir(parents=True, exist_ok=True)

# One dict per row of meta_with_text: these are the documents handed to the indexer.
documents = meta_with_text.to_dicts()

# Create the index in that directory from the documents.
whoosh_utils.create_index(index_dir, documents)

# Annealing

Optimize the query using the annealing method.

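For simplicity, the query is a flat chain of terms without parentheses: “word1 OP word2 OP word3 ...”.
Note that the code below is not restricted to “OR”: each candidate word is assigned a random operator (“OR”, “AND” or “NOT”) when the state is created, and that operator joins the word to the one before it.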

We will use an annealing method to determine which words to use.
The specific steps are as follows:

1. Select the top-k words with the highest TF-IDF values as candidates.
2. Use simulated annealing to choose the subset of candidate words that maximizes AP@50.
3. Combine the selected words into a single query string.

In [10]:
# https://github.com/perrygeo/simanneal/blob/master/simanneal/anneal.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import abc
import copy
import datetime
import math
import pickle
import random
import signal
import sys
import time
from typing import Any, List, Tuple



def round_figures(x, n):
    """Returns x rounded to n significant figures.

    Zero has no order of magnitude (log10(0) is undefined), so it is returned
    unchanged instead of raising ValueError.
    """
    if x == 0:
        return x
    # ceil(log10(|x|)) is the number of digits before the decimal point, so
    # n minus that is the number of decimals to keep (negative for large x).
    return round(x, int(n - math.ceil(math.log10(abs(x)))))


def time_string(seconds):
    """Format a duration given in seconds as an ``HHHH:MM:SS`` string.

    The value is rounded to a whole number of seconds first. The hours field
    is space-padded to four characters and is not wrapped at 24.
    """
    total = int(round(seconds))
    minutes, secs = divmod(total, 60)  # peel off the seconds
    hours, minutes = divmod(minutes, 60)  # split the rest into hours and minutes
    return "%4i:%02i:%02i" % (hours, minutes, secs)

# Simulated Annealing Base Class
class Annealer(object):
    """Base class for minimising an energy over ``self.state`` by simulated annealing.

    Subclasses provide ``move()`` (change ``self.state``) and ``energy()``
    (value to minimise); ``anneal()`` runs the optimisation loop and returns
    the best state and energy it saw.

    NOTE(review): ``__metaclass__`` is the Python 2 spelling for choosing a
    metaclass and has no effect on Python 3, so the ``@abc.abstractmethod``
    markers below are not enforced at instantiation time there.
    """

    __metaclass__ = abc.ABCMeta

    # Defaults; instances and subclasses may override them.
    Tmax = 25000.0  # temperature at step 0
    Tmin = 2.5  # temperature reached at step == steps; must be > 0 (see anneal)
    steps = 50000  # maximum number of loop iterations
    max_time = 8  # seconds; wall-clock budget checked at the top of every iteration
    updates = 100  # number of progress reports spread over `steps`
    copy_strategy = "deepcopy"  # "deepcopy", "slice" or "method" (see copy_state)
    user_exit = False  # set to True by the SIGINT handler to stop the loop
    save_state_on_exit = False  # when True, anneal() pickles the final state

    # Filled in by anneal().
    best_state = None
    best_energy = None
    start = None

    def __init__(self, initial_state=None, load_state=None):
        """Set the starting state and install a SIGINT handler.

        :param initial_state: state to start from; a copy made with
            ``copy_state`` is stored.
        :param load_state: file name of a pickled state; only consulted when
            ``initial_state`` is None.
        :raises ValueError: when ``initial_state`` is None and ``load_state``
            is falsy.
        """
        if initial_state is not None:
            self.state = self.copy_state(initial_state)
        elif load_state:
            self.load_state(load_state)
        else:
            raise ValueError("No valid values supplied for neither initial_state nor load_state")

        # Route SIGINT to set_user_exit so the anneal loop can stop at its next iteration.
        signal.signal(signal.SIGINT, self.set_user_exit)

    def save_state(self, fname=None):
        """Pickle ``self.state`` to ``fname``.

        When ``fname`` is falsy, a file name is built from the current
        timestamp and the current energy.
        """
        if not fname:
            date = datetime.datetime.now().strftime("%Y-%m-%dT%Hh%Mm%Ss")
            fname = date + "_energy_" + str(self.energy()) + ".state"
        with open(fname, "wb") as fh:
            pickle.dump(self.state, fh)

    def load_state(self, fname=None):
        """Replace ``self.state`` with the object unpickled from ``fname``."""
        with open(fname, "rb") as fh:
            self.state = pickle.load(fh)

    @abc.abstractmethod
    def move(self):
        """Change ``self.state``; to be implemented by subclasses.

        ``anneal()`` treats a ``None`` return as "recompute the energy with
        ``energy()``" and any other return value as the energy delta.
        """
        pass

    @abc.abstractmethod
    def energy(self):
        """Return the energy of ``self.state``; to be implemented by subclasses."""
        pass

    def set_user_exit(self, signum, frame):
        """Signal handler: flag that the anneal loop should stop."""
        self.user_exit = True

    def set_schedule(self, schedule):
        """Apply a schedule dict with keys "tmax", "tmin", "steps" and "updates"."""
        self.Tmax = schedule["tmax"]
        self.Tmin = schedule["tmin"]
        self.steps = int(schedule["steps"])
        self.updates = int(schedule["updates"])

    def copy_state(self, state):
        """Return a copy of ``state`` made according to ``self.copy_strategy``.

        "deepcopy" uses ``copy.deepcopy``, "slice" uses ``state[:]`` and
        "method" uses ``state.copy()``.

        :raises RuntimeError: for any other ``copy_strategy`` value.
        """
        if self.copy_strategy == "deepcopy":
            return copy.deepcopy(state)
        elif self.copy_strategy == "slice":
            return state[:]
        elif self.copy_strategy == "method":
            return state.copy()
        else:
            raise RuntimeError("No implementation found for the self.copy_strategy")

    def update(self, *args, **kwargs):
        """Progress hook called by ``anneal()``; forwards to ``default_update``."""
        self.default_update(*args, **kwargs)

    def default_update(self, step, T, E, acceptance, improvement):
        """Write a one-line progress report to stderr.

        At step 0 a header line is printed followed by temperature, energy and
        elapsed time. At later steps the line also shows the acceptance and
        improvement rates and an estimate of the remaining time.

        :param step: current step number.
        :param T: current temperature.
        :param E: current energy.
        :param acceptance: fraction of moves accepted since the last report.
        :param improvement: fraction of moves that lowered the energy since the
            last report.
        """
        elapsed = time.time() - self.start
        if step == 0:
            print("\n Temperature        Energy    Accept   Improve     Elapsed   Remaining", file=sys.stderr)
            print("\r{Temp:12.5f}  {Energy:12.2f}                      {Elapsed:s}            ".format(
                Temp=T, Energy=E, Elapsed=time_string(elapsed)), file=sys.stderr, end="")
            sys.stderr.flush()
        else:
            # Extrapolate the remaining time from the average time per step so far.
            remain = (self.steps - step) * (elapsed / step)
            print("\r{Temp:12.5f}  {Energy:12.2f}   {Accept:7.2%}   {Improve:7.2%}  {Elapsed:s}  {Remaining:s}".format(
                Temp=T, Energy=E, Accept=acceptance, Improve=improvement, Elapsed=time_string(elapsed),
                Remaining=time_string(remain)), file=sys.stderr, end="")
            sys.stderr.flush()

    def anneal(self):
        """Minimise the energy by simulated annealing with exponential cooling.

        The loop ends when ``steps`` iterations have run, ``user_exit`` is set,
        or more than ``max_time`` seconds have elapsed. On return
        ``self.state`` is a copy of the best state found.

        :returns: ``(best_state, best_energy)``.
        :raises Exception: if ``Tmin`` is not greater than zero.
        """
        step = 0
        self.start = time.time()

        if self.Tmin <= 0.0:
            raise Exception('Exponential cooling requires a minimum temperature greater than zero.')
        # Chosen so that T == Tmax at step 0 and T == Tmin at step == steps.
        Tfactor = -math.log(self.Tmax / self.Tmin)

        T = self.Tmax
        E = self.energy()
        prevState = self.copy_state(self.state)
        prevEnergy = E
        self.best_state = self.copy_state(self.state)
        self.best_energy = E
        trials = accepts = improves = 0

        if self.updates > 0:
            # Number of steps between two progress reports.
            updateWavelength = self.steps / self.updates
            self.update(step, T, E, None, None)

        while (step < self.steps) and (not self.user_exit) and ((time.time() - self.start) <= self.max_time):
            step += 1
            T = self.Tmax * math.exp(Tfactor * step / self.steps)
            dE = self.move()
            if dE is None:
                # move() did not report a delta: evaluate the new state from scratch.
                E = self.energy()
                dE = E - prevEnergy
            else:
                E += dE
            trials += 1
            # A worsening move (dE > 0) is kept only when exp(-dE / T) beats a
            # uniform random draw; otherwise the previous state is restored.
            if dE > 0.0 and math.exp(-dE / T) < random.random():
                self.state = self.copy_state(prevState)
                E = prevEnergy
            else:
                accepts += 1
                if dE < 0.0:
                    improves += 1
                prevState = self.copy_state(self.state)
                prevEnergy = E
                if E < self.best_energy:
                    self.best_state = self.copy_state(self.state)
                    self.best_energy = E
            if self.updates > 1:
                # Report whenever the step count crosses a multiple of updateWavelength,
                # then restart the acceptance / improvement counters.
                if (step // updateWavelength) > ((step - 1) // updateWavelength):
                    self.update(step, T, E, accepts / trials, improves / trials)
                    trials = accepts = improves = 0

        # Leave the annealer positioned on the best state seen.
        self.state = self.copy_state(self.best_state)
        if self.save_state_on_exit:
            self.save_state()

        return self.best_state, self.best_energy

In [11]:
import pickle
from dataclasses import dataclass
from pathlib import Path

import numpy as np

from numpy.typing import NDArray
import polars as pl
from tqdm import tqdm
from typing import Any
import whoosh_utils

In [12]:
def select_top_k_columns(X: Any, k: int) -> tuple[Any, NDArray]:
    """Select the k columns of a 2-D matrix whose column sums are largest.

    Args:
        X: 2-D matrix supporting ``X.sum(axis=0)`` and ``X[:, indices]``
            (for example a NumPy array or a SciPy sparse matrix).
        k: Number of columns to keep. If ``X`` has fewer columns, all of them
            are returned.

    Returns:
        ``(X_top, top_k_indices)``: the selected columns, ordered by
        decreasing column sum, and their column indices in ``X``.
    """
    # sum(axis=0) gives a 1-D ndarray for dense input and a 1 x n numpy.matrix
    # for SciPy sparse matrices. np.asarray(...).ravel() flattens both, whereas
    # the matrix-only `.A1` attribute fails on plain arrays.
    column_sums = np.asarray(X.sum(axis=0)).ravel()
    top_k_indices = np.argsort(-column_sums)[:k]
    X_top = X[:, top_k_indices]
    return X_top, top_k_indices



def ap50(preds: list[str], labels: list[str]) -> float:
    """Score a ranked prediction list against the relevant items (AP@50 variant).

    For each of the first 50 predictions, the precision at that rank
    (hits so far / rank) is accumulated; the total is divided by 50.

    Args:
        preds: Ranked predictions, best first. Only the first 50 are scored,
            so a longer list cannot push the score above 1.
        labels: Relevant items.

    Returns:
        The score as a float; 0.0 when ``preds`` is empty.
    """
    top_k = 50
    relevant = set(labels)  # constant-time membership tests
    n_found = 0
    precision_sum = 0.0
    for rank, pred in enumerate(preds[:top_k], start=1):
        if pred in relevant:
            n_found += 1
        precision_sum += n_found / rank
    return precision_sum / top_k

# One candidate query term: a category prefix plus the term text.
@dataclass
class Word:
    """Candidate query term rendered as ``<category>:<content>``."""

    category: str  # text placed before the colon, e.g. "ti" or "cpc"
    content: str  # text placed after the colon
    operator: str = "OR"  # operator label stored on the word; defaults to "OR"

    def to_str(self):
        """Return the term in ``category:content`` form."""
        return "{}:{}".format(self.category, self.content)

@dataclass
class State:
    """Annealing state: candidate words plus a use-flag and an operator per word.

    ``use`` and ``operators`` are ordinary instance attributes created in
    ``__post_init__``; they are not dataclass fields.
    """

    words: list[Word]

    def __post_init__(self):
        """Draw a random 0/1 use-flag and a random operator for every word."""
        n_words = len(self.words)
        # Each flag is an independent draw that is 1 with probability 0.5.
        self.use = np.random.binomial(1, 0.5, n_words)
        self.operators = [np.random.choice(["OR", "AND", "NOT"]) for _ in range(n_words)]

    def to_query(self):
        """Render the words flagged as used into one query string.

        Returns "" when no word is flagged. The first used word is emitted on
        its own (its operator is ignored); every later used word is prefixed
        with its own operator and a space.
        """
        picked = [
            (term, operator)
            for term, flag, operator in zip(self.words, self.use, self.operators)
            if flag
        ]
        if len(picked) == 0:
            return ""

        (first_term, _), rest = picked[0], picked[1:]
        pieces = [first_term.to_str()]
        pieces.extend(f"{operator} {term.to_str()}" for term, operator in rest)
        return " ".join(pieces)

    def move_1(self):
        """Flip the use-flag of one randomly chosen word and return self."""
        position = np.random.choice(len(self.words))
        current = self.use[position]
        self.use[position] = 1 - current
        return self

    def move_2(self):
        """Redraw the operator of one randomly chosen word and return self."""
        position = np.random.choice(len(self.operators))
        new_operator = np.random.choice(["OR", "AND", "NOT"])
        self.operators[position] = new_operator
        return self

# Annealing problem for the USPTO task: the state is a State, and the energy is
# the negated AP@50 of the query it renders.
class USPTOProblem(Annealer):
    """Search for the set of query words that maximises ``ap50`` against ``target``."""

    def __init__(
        self,
        qp: Any,
        searcher: Any,
        target: list[str],
        init_state: State,
        tmax: int = 30,
        tmin: int = 10,
        steps: int = 100,
        max_time: int = 8,
        copy_strategy: str = "deepcopy",
    ):
        """Store the search handles, the target list and the annealing schedule.

        Args:
            qp: Query parser passed to ``whoosh_utils.execute_query``.
            searcher: Searcher passed to ``whoosh_utils.execute_query``.
            target: Labels passed to ``ap50`` when scoring a query.
            init_state: Starting state; the base class stores a copy of it.
            tmax: Starting temperature.
            tmin: Final temperature.
            steps: Maximum number of annealing steps.
            max_time: Wall-clock budget in seconds.
            copy_strategy: Copy strategy understood by ``Annealer.copy_state``.
        """
        # Set before the base __init__, which copies init_state through
        # copy_state(); assigning it afterwards meant the requested strategy
        # was ignored for that first copy.
        self.copy_strategy = copy_strategy
        super(USPTOProblem, self).__init__(init_state)
        self.qp = qp
        self.searcher = searcher
        self.target = target
        self.Tmax = tmax
        self.Tmin = tmin
        self.steps = steps
        self.max_time = max_time

    def move(self):
        """Flip the use-flag of one random word.

        Nothing is returned, so ``anneal()`` recomputes the energy with
        ``energy()`` after every move.

        NOTE(review): ``State.move_2`` (operator change) is never called here,
        so each word keeps the operator drawn at state creation -- confirm
        this is intended.
        """
        self.state.move_1()

    def energy(self):
        """Return the negated AP@50 of the current state's query (lower is better)."""
        query = self.state.to_query()
        cand = whoosh_utils.execute_query(query, self.qp, self.searcher)
        ap50_score = ap50(cand, self.target)

        return -ap50_score

In [13]:
# Input locations: the competition data and the directory holding the TF-IDF pickles.
comp_data_dir = Path("/kaggle/input/uspto-explainable-ai")
tfidf_dir = Path("/kaggle/input/uspto-ti-cpc-tfidf")

# Load the index from ./test_index, then obtain a searcher for it and a query parser.
test_idx = whoosh_utils.load_index("./test_index")
searcher = whoosh_utils.get_searcher(test_idx)
qp = whoosh_utils.get_query_parser()

# Test table (nearest neighbours) and the joined metadata read back from Parquet.
test = pl.read_csv(comp_data_dir / "test.csv")
test_meta = pl.read_parquet("meta_with_text.parquet")


# Needed for the TF-IDF pickles (presumably they reference this function by
# name, so it must stay defined at module level -- TODO confirm).
def identity(x: Any) -> Any:
    """Return the argument unchanged."""
    return x

# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load these pickles from a dataset you trust.
with open(tfidf_dir / "tfidf.pkl", "rb") as f:
    ti_tfidf = pickle.load(f)  # used later via .transform() on patent titles
with open(tfidf_dir / "cpc_cv_tfidf.pkl", "rb") as f:
    cpc_cv_tfidf = pickle.load(f)  # used later via .transform() on the cpc column

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [14]:
scores = []
results = []

# Query submitted when nothing better is available: no metadata was found for
# the targets, or annealing ended on a state that renders an empty query.
fallback_query = "ti:device"

for i in tqdm(range(len(test))):
    publication_number = test[i, "publication_number"]
    # Every value after the first column of the row is a target publication.
    target = test[i].to_numpy().flatten()[1:].tolist()
    meta_i = test_meta.filter(pl.col("publication_number").is_in(target))

    if len(meta_i) == 0:
        results.append({"publication_number": publication_number, "query": fallback_query})
        print("\t Append Dummy", i)
        continue

    # TF-IDF matrices of the targets' titles and CPC codes
    ti_mat = ti_tfidf.transform(meta_i.get_column("title").fill_null(""))
    cpc_mat = cpc_cv_tfidf.transform(meta_i.get_column("cpc"))

    # Column indices of the top-k features (the sliced matrices are not needed)
    ti_idx = select_top_k_columns(ti_mat, k=10)[1]
    cpc_idx = select_top_k_columns(cpc_mat, k=10)[1]

    # Feature names of those columns, wrapped as candidate Words
    ti_terms = ti_tfidf.get_feature_names_out()[ti_idx].tolist()
    cpc_terms = cpc_cv_tfidf.get_feature_names_out()[cpc_idx].tolist()
    words = [Word(category="ti", content=term) for term in ti_terms]
    words += [Word(category="cpc", content=term) for term in cpc_terms]

    # State() assigns random use-flags and operators to the candidates
    state = State(words=words)

    # Determine which words to use using the annealing method
    problem = USPTOProblem(qp, searcher, target, state, steps=1000, max_time=5)
    solution, score = problem.anneal()
    print(f"\t Problem Number {i} Score:", -score)
    scores.append(-score)

    # save publication number and query (never an empty query string)
    results.append(
        {"publication_number": publication_number, "query": solution.to_query() or fallback_query}
    )

# Avoid ZeroDivisionError when no row was annealed (all rows used the fallback).
if scores:
    print("Average Score:", sum(scores) / len(scores))
else:
    print("Average Score: n/a (no row was annealed)")


  0%|          | 0/10 [00:00<?, ?it/s]
 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 10%|█         | 1/10 [00:06<00:56,  6.33s/it]

	 Problem Number 0 Score: 0.44



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 20%|██        | 2/10 [00:11<00:47,  5.92s/it]

	 Problem Number 1 Score: 0.8221393988251116



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 30%|███       | 3/10 [00:17<00:40,  5.77s/it]

	 Problem Number 2 Score: 0.8390360046457608



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 40%|████      | 4/10 [00:23<00:34,  5.71s/it]

	 Problem Number 3 Score: 0.7703222428738323
	 Append Dummy 4



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 60%|██████    | 6/10 [00:28<00:16,  4.17s/it]

	 Problem Number 5 Score: 0.08033333333333333



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 70%|███████   | 7/10 [00:33<00:13,  4.35s/it]

	 Problem Number 6 Score: 0.04



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
 80%|████████  | 8/10 [00:38<00:09,  4.64s/it]

	 Problem Number 7 Score: 0.18



 Temperature        Energy    Accept   Improve     Elapsed   Remaining
100%|██████████| 10/10 [00:44<00:00,  4.46s/it]

	 Problem Number 8 Score: 0.543721149281371
	 Append Dummy 9
Average Score: 0.46444401611992614





In [15]:
# Remove unwanted files and directories that may cause submission errors
# NOTE(review): if the notebook's working directory is /kaggle/working (not
# shown here), this also deletes meta_with_text.parquet and test_index/ written
# by earlier cells -- confirm nothing later needs them.
!rm -rf /kaggle/working/*

In [16]:
# Build the submission table from the per-row results and write it to CSV.
submission = pl.DataFrame(results)
submission.write_csv("submission.csv")




In [17]:
# Display the submission table.
submission

publication_number,query
str,str
"""US-2017082634-…","""ti:mass OR ti:…"
"""US-2017180470-…","""ti:method OR t…"
"""US-2018029544-…","""ti:module OR t…"
"""US-2022408153-…","""ti:user OR ti:…"
"""US-2268569-A""","""ti:device"""
"""US-3371854-A""","""ti:ion NOT ti:…"
"""US-3589189-A""","""ti:meter AND t…"
"""US-3881203-A""","""ti:holder OR t…"
"""US-4845770-A""","""ti:optical OR …"
"""US-695233-A""","""ti:device"""


In [18]:
# Show the full query string of the row at index 1 of the submission.
submission[1]['query'][0]

'ti:method OR ti:device OR ti:electronic OR cpc:H04L67/42 OR cpc:H04L67/104 OR cpc:H04L67/10'