In [1]:
# Install Whoosh 2.7.4 from the local wheel file stored under /kaggle/input.
!pip install /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl

Processing /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl
Installing collected packages: Whoosh
Successfully installed Whoosh-2.7.4


Based on https://www.kaggle.com/code/tubotubo/uspto-simulated-annealing-baseline

# Create Test Index

In [2]:
%%python
from pathlib import Path

import polars as pl
from tqdm import tqdm

import whoosh_utils

comp_data_dir = Path("/kaggle/input/uspto-explainable-ai")

# Load the patent metadata, derive the publication year and month (they name the
# monthly patent_data files read below) and expose the CPC codes as "cpc".
# No date filter is applied (the original 1975 cut-off is disabled).
meta = (
    pl.scan_parquet(comp_data_dir / "patent_metadata.parquet")
    .with_columns(
        pl.col("publication_date").dt.year().alias("year"),
        pl.col("publication_date").dt.month().alias("month"),
    )
    .rename({"cpc_codes": "cpc"})
    .collect()
)
test_nn = pl.scan_csv(comp_data_dir / "test.csv")
# test_nn = pl.read_csv(comp_data_dir / "nearest_neighbors.csv").sample(2500).lazy()

# Keep only the metadata of patents that appear somewhere in the test file.
all_pub = test_nn.melt().collect().get_column("value").unique()
meta = meta.filter(pl.col("publication_number").is_in(all_pub))

# Lazily scan one patent_data file per (year, month) present in the filtered
# metadata, leaving out the two large text columns.
n_unique = meta.select(["year", "month"]).n_unique()
monthly_groups = tqdm(meta.group_by(["year", "month"]), total=n_unique)
patents = [
    pl.scan_parquet(comp_data_dir / f"patent_data/{year}_{month}.parquet").select(
        pl.exclude(["claims", "description"])
    )
    for (year, month), _ in monthly_groups
]

# Re-add "claims" and "description" as empty strings so the columns still exist.
patent: pl.LazyFrame = pl.concat(patents).with_columns(
    pl.lit("").alias("claims"),
    pl.lit("").alias("description"),
)

# Attach the patent columns to the metadata and persist the result for later cells.
meta_with_text = (
    meta.lazy().join(patent, on="publication_number", how="left").collect(streaming=True)
)
meta_with_text.write_parquet("meta_with_text.parquet")

# create index
index_dir = Path("test_index")
index_dir.mkdir(parents=True, exist_ok=True)
documents = meta_with_text.to_dicts()
whoosh_utils.create_index("test_index", documents)

Processing /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl
Whoosh is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


100%|██████████| 365/365 [00:03<00:00, 93.22it/s]


# Annealing

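Optimize the search query for each test patent with simulated annealing. (Note: in the final loop of this notebook the annealing call is commented out and a greedy term selection is used instead; the annealing classes are kept for reference.)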

For simplicity, we will create a query using only “OR”.
This means that we will create a query “word1 OR word2 OR word3 OR ...”.

We will use an annealing method to determine which words to use.
The specific steps are as follows:

1. Select the top-k words with the highest TF-IDF values as candidates.
2. Use the annealing method to select the subset of candidate words that maximizes AP@50.
3. Combine the selected words with “OR” to form the query.

In [3]:
# https://github.com/perrygeo/simanneal/blob/master/simanneal/anneal.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import abc
import copy
import datetime
import math
import pickle
import random
import signal
import sys
import time


def round_figures(x, n):
    """Returns x rounded to n significant figures.

    Zero has no order of magnitude (log10(0) is undefined and would raise
    ValueError), so a zero input is returned unchanged.
    """
    if x == 0:
        return x
    # ceil(log10(|x|)) is the number of digits before the decimal point, so
    # n minus that is the number of decimal places to keep (may be negative).
    return round(x, int(n - math.ceil(math.log10(abs(x)))))


def time_string(seconds):
    """Format a duration given in seconds as HHHH:MM:SS.

    The value is rounded to the nearest whole second first; the hour field is
    right-aligned in four characters.
    """
    total = int(round(seconds))
    hours, leftover = total // 3600, total % 3600
    minutes, secs = leftover // 60, leftover % 60
    return "%4i:%02i:%02i" % (hours, minutes, secs)


class Annealer(metaclass=abc.ABCMeta):
    """Minimizes an energy function by simulated annealing.

    Subclasses implement `move` (change `self.state`) and `energy` (score
    `self.state`); `anneal` then runs the accept/reject loop with an
    exponential cooling schedule from `Tmax` down to `Tmin`.  The schedule is
    read from the attributes below, which may be assigned directly or through
    `set_schedule`.

    The class uses the Python 3 `metaclass=` keyword so that the abstract
    methods are actually enforced (a `__metaclass__` attribute is ignored by
    Python 3).
    """

    # defaults
    Tmax = 25000.0  # starting temperature
    Tmin = 2.5  # final temperature; must be > 0 for exponential cooling
    steps = 50000  # maximum number of moves attempted by anneal()
    max_time = 8  # seconds; anneal() stops once this much wall time has elapsed
    updates = 100  # number of progress reports; values <= 0 disable reporting
    copy_strategy = "deepcopy"  # how copy_state() duplicates a state
    user_exit = False  # set to True by set_user_exit() to stop the loop
    save_state_on_exit = False  # pickle the final state when anneal() returns

    # placeholders
    best_state = None
    best_energy = None
    start = None

    def __init__(self, initial_state=None, load_state=None):
        """Initialise the state from `initial_state` or from a pickle file.

        Parameters
        initial_state : state object; a copy of it becomes `self.state`
        load_state : path of a pickle written by `save_state`, used only when
            `initial_state` is None

        Raises
        ValueError : if neither argument provides a state
        """
        if initial_state is not None:
            self.state = self.copy_state(initial_state)
        elif load_state:
            self.load_state(load_state)
        else:
            raise ValueError(
                "No valid values supplied for either initial_state or load_state"
            )

        try:
            signal.signal(signal.SIGINT, self.set_user_exit)
        except ValueError:
            # signal.signal() can only be called from the main thread.  Off the
            # main thread the Ctrl-C hook is skipped on purpose; the annealer
            # still works, it just cannot be interrupted through SIGINT.
            pass

    def save_state(self, fname=None):
        """Saves state to pickle.

        When `fname` is empty, a file name is built from the current timestamp
        and the current energy.
        """
        if not fname:
            date = datetime.datetime.now().strftime("%Y-%m-%dT%Hh%Mm%Ss")
            fname = date + "_energy_" + str(self.energy()) + ".state"
        with open(fname, "wb") as fh:
            pickle.dump(self.state, fh)

    def load_state(self, fname=None):
        """Loads state from pickle.

        NOTE: unpickling executes code embedded in the file; only load state
        files that come from a trusted source.
        """
        with open(fname, "rb") as fh:
            self.state = pickle.load(fh)

    @abc.abstractmethod
    def move(self):
        """Create a state change.

        May return the resulting energy difference; when it returns None,
        `anneal` recomputes the energy with `energy()`.
        """
        pass

    @abc.abstractmethod
    def energy(self):
        """Calculate state's energy"""
        pass

    def set_user_exit(self, signum, frame):
        """Raises the user_exit flag, further iterations are stopped"""
        self.user_exit = True

    def set_schedule(self, schedule):
        """Sets Tmax, Tmin, steps and updates from a schedule mapping.

        `schedule` must provide the keys "tmax", "tmin", "steps" and
        "updates"; the last two are converted to int.
        """
        self.Tmax = schedule["tmax"]
        self.Tmin = schedule["tmin"]
        self.steps = int(schedule["steps"])
        self.updates = int(schedule["updates"])

    def copy_state(self, state):
        """Returns an exact copy of the provided state
        Implemented according to self.copy_strategy, one of

        * deepcopy: use copy.deepcopy (slow but reliable)
        * slice: use list slices (faster but only works if state is list-like)
        * method: use the state's copy() method

        Raises RuntimeError for any other strategy name.
        """
        if self.copy_strategy == "deepcopy":
            return copy.deepcopy(state)
        elif self.copy_strategy == "slice":
            return state[:]
        elif self.copy_strategy == "method":
            return state.copy()
        else:
            raise RuntimeError(
                "No implementation found for " + 'the self.copy_strategy "%s"' % self.copy_strategy
            )

    def update(self, *args, **kwargs):
        """Wrapper for internal update.

        If you override the self.update method,
        you can chose to call the self.default_update method
        from your own Annealer.
        """
        self.default_update(*args, **kwargs)

    def default_update(self, step, T, E, acceptance, improvement):
        """Default update, outputs to stderr.

        Prints the current temperature, energy, acceptance rate,
        improvement rate, elapsed time, and remaining time.

        The acceptance rate indicates the percentage of moves since the last
        update that were accepted by the Metropolis algorithm.  It includes
        moves that decreased the energy, moves that left the energy
        unchanged, and moves that increased the energy yet were reached by
        thermal excitation.

        The improvement rate indicates the percentage of moves since the
        last update that strictly decreased the energy.  At high
        temperatures it will include both moves that improved the overall
        state and moves that simply undid previously accepted moves that
        increased the energy by thermal excitation.  At low temperatures
        it will tend toward zero as the moves that can decrease the energy
        are exhausted and moves that would increase the energy are no longer
        thermally accessible."""

        elapsed = time.time() - self.start
        if step == 0:
            # First report: print the header row, then a line without rates
            # (no moves have been attempted yet).
            print(
                "\n Temperature        Energy    Accept   Improve     Elapsed   Remaining",
                file=sys.stderr,
            )
            print(
                "\r{Temp:12.5f}  {Energy:12.2f}                      {Elapsed:s}            ".format(
                    Temp=T, Energy=E, Elapsed=time_string(elapsed)
                ),
                file=sys.stderr,
                end="",
            )
            sys.stderr.flush()
        else:
            # Remaining time is extrapolated from the average time per step so far.
            remain = (self.steps - step) * (elapsed / step)
            print(
                "\r{Temp:12.5f}  {Energy:12.2f}   {Accept:7.2%}   {Improve:7.2%}  {Elapsed:s}  {Remaining:s}".format(
                    Temp=T,
                    Energy=E,
                    Accept=acceptance,
                    Improve=improvement,
                    Elapsed=time_string(elapsed),
                    Remaining=time_string(remain),
                ),
                file=sys.stderr,
                end="",
            )
            sys.stderr.flush()

    def anneal(self):
        """Minimizes the energy of a system by simulated annealing.

        Starts from `self.state` and stops after `self.steps` moves, when
        `self.max_time` seconds have elapsed, or when `user_exit` is set,
        whichever comes first.  On return `self.state` is a copy of the best
        state found.

        Returns
        (state, energy): the best state and energy found.

        Raises
        Exception : if `Tmin` is not greater than zero
        """
        step = 0
        self.start = time.time()

        # Precompute factor for exponential cooling from Tmax to Tmin
        if self.Tmin <= 0.0:
            raise Exception(
                "Exponential cooling requires a minimum "
                "temperature greater than zero."
            )
        Tfactor = -math.log(self.Tmax / self.Tmin)

        # Note initial state
        T = self.Tmax
        E = self.energy()
        prevState = self.copy_state(self.state)
        prevEnergy = E
        self.best_state = self.copy_state(self.state)
        self.best_energy = E
        trials = accepts = improves = 0
        if self.updates > 0:
            updateWavelength = self.steps / self.updates
            self.update(step, T, E, None, None)

        # Attempt moves to new states
        while (
            (step < self.steps)
            and (not self.user_exit)
            and ((time.time() - self.start) <= self.max_time)
        ):
            step += 1
            T = self.Tmax * math.exp(Tfactor * step / self.steps)
            dE = self.move()
            if dE is None:
                # move() did not report the energy change: recompute from scratch.
                E = self.energy()
                dE = E - prevEnergy
            else:
                E += dE
            trials += 1
            # Metropolis criterion: a worsening move is kept only with
            # probability exp(-dE / T).
            if dE > 0.0 and math.exp(-dE / T) < random.random():
                # Restore previous state
                self.state = self.copy_state(prevState)
                E = prevEnergy
            else:
                # Accept new state and compare to best state
                accepts += 1
                if dE < 0.0:
                    improves += 1
                prevState = self.copy_state(self.state)
                prevEnergy = E
                if E < self.best_energy:
                    self.best_state = self.copy_state(self.state)
                    self.best_energy = E
            if self.updates > 1:
                # Report whenever the step counter crosses a multiple of the
                # update wavelength, then restart the rate counters.
                if (step // updateWavelength) > ((step - 1) // updateWavelength):
                    self.update(step, T, E, accepts / trials, improves / trials)
                    trials = accepts = improves = 0

        self.state = self.copy_state(self.best_state)
        if self.save_state_on_exit:
            self.save_state()

        # Return best state and energy
        return self.best_state, self.best_energy

In [4]:
import pickle
from dataclasses import dataclass
from pathlib import Path

import numpy as np
from numpy.typing import NDArray
import polars as pl
from tqdm import tqdm
from typing import Any
import whoosh_utils

Processing /kaggle/input/whoosh-wheel-2-7-4/Whoosh-2.7.4-py2.py3-none-any.whl
Whoosh is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [5]:
def select_top_k_columns(X: Any, k: int) -> tuple[Any, NDArray]:
    """Keep the k columns of X whose column totals are largest.

    X must support `.sum(axis=0)` returning an object with an `.A1` attribute
    (a flattened 1-D view) and column indexing with an index array.

    Returns the reduced matrix and the selected column indices, ordered from
    the largest column total to the smallest.
    """
    # Total of every column (summing over the rows).
    column_totals = X.sum(axis=0).A1

    # Sorting the negated totals ascending ranks the columns by descending total.
    ranking = np.argsort(-column_totals)
    top_k_indices = ranking[:k]

    return X[:, top_k_indices], top_k_indices


# https://www.kaggle.com/competitions/uspto-explainable-ai/discussion/499981#2791642
def ap50(preds: list[str], labels: list[str]) -> float:
    """Score a ranked prediction list against the expected labels.

    For every rank the running precision (hits so far / rank) is recorded,
    and the sum of those precisions is divided by 50.

    Parameters
    preds : predictions in ranked order
    labels : the expected items; order is irrelevant

    Returns
    The score as a float; 0.0 for an empty `preds`.
    """
    # Build the lookup once: membership in a set is constant-time, whereas
    # testing against the list rescans it for every prediction.
    label_set = set(labels)

    precisions = []
    n_found = 0
    for rank, pred in enumerate(preds, start=1):
        if pred in label_set:
            n_found += 1
        # The precision is recorded at every rank, not only at hits.
        # This is the line that is probably incorrect for the competition.
        precisions.append(n_found / rank)
    return sum(precisions) / 50


@dataclass
class Word:
    """One search term: a field prefix (`category`) and the term text (`content`)."""

    category: str
    content: str

    def to_str(self):
        """Return the term in "category:content" form."""
        return "{}:{}".format(self.category, self.content)


@dataclass
class State:
    """A candidate query: a fixed word list plus a 0/1 mask (`use`) of active words."""

    words: list[Word]

    def __post_init__(self):
        # Start from a random subset: each word is switched on with probability 0.5.
        n_words = len(self.words)
        self.use = np.random.binomial(1, 0.5, n_words)

    def to_query(self):
        """Join the string form of every active word with " OR "."""
        active = []
        for word, flag in zip(self.words, self.use):
            if flag:
                active.append(word.to_str())
        return " OR ".join(active)

    def move_1(self):
        """Toggle the mask entry of one randomly chosen word and return self."""
        idx = np.random.choice(len(self.words))
        toggled = 1 - self.use[idx]
        self.use[idx] = toggled
        return self


class USPTOProblem(Annealer):
    """Annealing problem whose energy is the negated AP@50 of the state's query.

    The state is a `State`; a move toggles one of its words, and the energy
    is computed by running the state's query through `whoosh_utils` and
    scoring the candidates against `target` with `ap50`.
    """

    def __init__(
        self,
        qp: Any,
        searcher: Any,
        target: list[str],
        init_state: State,
        tmax: int = 30,
        tmin: int = 10,
        steps: int = 100,
        max_time: int = 8,
        copy_strategy: str = "deepcopy",
    ):
        """Store the search handles and the annealing schedule.

        Parameters
        qp, searcher : passed unchanged to `whoosh_utils.execute_query`
        target : the items the query should retrieve
        init_state : starting state (copied by the base class)
        tmax, tmin : start and end temperature of the cooling schedule
        steps : maximum number of moves
        max_time : time limit in seconds
        copy_strategy : strategy name understood by `Annealer.copy_state`
        """
        # Set the copy strategy first: the base constructor copies the initial
        # state through copy_state(), which reads self.copy_strategy.  Setting
        # it afterwards would silently ignore the argument for that first copy.
        self.copy_strategy = copy_strategy
        super(USPTOProblem, self).__init__(init_state)
        self.qp = qp
        self.searcher = searcher
        self.target = target
        self.Tmax = tmax
        self.Tmin = tmin
        self.steps = steps
        self.max_time = max_time

    def move(self):
        """Toggle one word of the state.

        Returns None, so the annealer recomputes the energy after each move.
        """
        self.state.move_1()

    def energy(self):
        """Return the negated AP@50 of the current query (lower is better)."""
        query = self.state.to_query()
        cand = whoosh_utils.execute_query(query, self.qp, self.searcher)
        ap50_score = ap50(cand, self.target)

        return -ap50_score

In [6]:
comp_data_dir = Path("/kaggle/input/uspto-explainable-ai")
tfidf_dir = Path("/kaggle/input/uspto-ti-cpc-tfidf")

# Test rows and the metadata table written by the index-building cell.
test = pl.read_csv(comp_data_dir / "test.csv")
test_meta = pl.read_parquet("meta_with_text.parquet")

# Open the test index and prepare the objects needed to run queries against it.
test_idx = whoosh_utils.load_index("./test_index")
searcher = whoosh_utils.get_searcher(test_idx)
qp = whoosh_utils.get_query_parser()


# for tfidf pickle
# NOTE(review): presumably the pickled vectorizers refer to this function by
# name, so it has to exist before they are loaded — confirm.
def identity(x: Any) -> Any:
    """Return the argument unchanged."""
    return x


def _read_pickle(path):
    """Unpickle and return the object stored at `path`.

    NOTE: pickle.load can execute code embedded in the file; only use it on
    trusted dataset files.
    """
    with open(path, "rb") as f:
        return pickle.load(f)


ti_tfidf = _read_pickle(tfidf_dir / "tfidf.pkl")
cpc_cv_tfidf = _read_pickle(tfidf_dir / "cpc_cv_tfidf.pkl")

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [7]:
# Query submitted when no usable search term is found for a row.
DUMMY_QUERY = "ti:device"
# Number of top TF-IDF title words, and of top CPC codes, tried as candidate terms.
TOP_K_TERMS = 80
# A candidate query is rejected once count_query_tokens() reports this many tokens or more.
MAX_QUERY_TOKENS = 50


def _score_terms(terms, target):
    """Return the AP@50 of the query "term1 OR term2 OR ..." against `target`."""
    cand = whoosh_utils.execute_query(" OR ".join(terms), qp, searcher)
    return ap50(cand, target)


def _greedy_select(ranked_terms, target):
    """Forward selection over `ranked_terms` (best single-term score first).

    A term is kept when adding it does not lower the score of the combined
    query.  Selection stops at the first term that would push the query to
    MAX_QUERY_TOKENS tokens or more.  Returns (selected terms, their score).
    """
    selected = []
    best_score = 0
    for term in ranked_terms:
        trial = selected + [term]
        # Check the token budget first so an over-long query is never executed.
        if whoosh_utils.count_query_tokens(" OR ".join(trial)) >= MAX_QUERY_TOKENS:
            break
        trial_score = _score_terms(trial, target)
        if trial_score >= best_score:
            selected, best_score = trial, trial_score
    return selected, best_score


def _prune_terms(terms, best_score, target):
    """Backward elimination: drop every term whose removal does not lower the score.

    Walks the list once from the front.  The last remaining term is never
    removed, so the result is empty only when `terms` was empty.  Returns
    (kept terms, their score).
    """
    kept = list(terms)
    pos = 0
    for _ in range(len(terms)):
        if len(kept) <= 1:
            # An empty query can be neither searched nor submitted.
            break
        trial = kept[:pos] + kept[pos + 1 :]
        trial_score = _score_terms(trial, target)
        if trial_score >= best_score:
            # The term at `pos` was not needed; the next term slides into `pos`.
            kept, best_score = trial, trial_score
        else:
            pos += 1
    return kept, best_score


scores = []
results = []

for i in tqdm(range(len(test))):
    # All patents of the row; every column after the first is a target to retrieve.
    # (Presumably the first column is the row's own publication_number — confirm.)
    row = test[i].to_numpy().flatten().tolist()
    target = row[1:]
    meta_i = test_meta.filter(pl.col("publication_number").is_in(row))

    if len(meta_i) == 0:
        # No metadata for any patent of this row: submit the fallback query.
        results.append({"publication_number": test[i, "publication_number"], "query": DUMMY_QUERY})
        print("\t Append Dummy", i)
        continue

    # TF-IDF matrices of the titles and CPC codes of the row's patents
    ti_mat = ti_tfidf.transform(meta_i.get_column("title").fill_null(""))
    cpc_mat = cpc_cv_tfidf.transform(meta_i.get_column("cpc"))

    # Indices of the most important title words / CPC codes
    idx = select_top_k_columns(ti_mat, k=TOP_K_TERMS)[1]
    cpc_idx = select_top_k_columns(cpc_mat, k=TOP_K_TERMS)[1]

    # Candidate terms in query syntax.  The annealing search (State / USPTOProblem)
    # is not used here; a greedy search over these terms is run instead.
    topk_words = [f"ti:{x}" for x in ti_tfidf.get_feature_names_out()[idx].tolist()]
    topk_cpc = [f"cpc:{x}" for x in cpc_cv_tfidf.get_feature_names_out()[cpc_idx]]

    # Score every candidate on its own and keep those that retrieve at least one target.
    query_score_pairs = []
    for term in topk_words + topk_cpc:
        term_score = _score_terms([term], target)
        if term_score > 0:
            query_score_pairs.append((term, term_score))

    # Best single-term score first (the sort is stable, so ties keep candidate order).
    query_score_pairs.sort(key=lambda pair: pair[1], reverse=True)
    sorted_queries = [pair[0] for pair in query_score_pairs]

    word, score = _greedy_select(sorted_queries, target)
    word, score = _prune_terms(word, score, target)

    print(f"\t Problem Number {i} Score:", score)
    scores.append(score)

    # save publication number and query; never submit an empty query string
    results.append(
        {
            "publication_number": test[i, "publication_number"],
            "query": " OR ".join(word) if word else DUMMY_QUERY,
        }
    )


if scores:
    print("Average Score:", sum(scores) / len(scores))
else:
    # Every row fell back to the dummy query (or the test file is empty).
    print("Average Score: n/a (no row was scored)")

 10%|█         | 1/10 [00:02<00:21,  2.38s/it]

	 Problem Number 0 Score: 0.9987918367346938


 20%|██        | 2/10 [00:03<00:12,  1.62s/it]

	 Problem Number 1 Score: 0.9812534726942382


 30%|███       | 3/10 [00:04<00:09,  1.37s/it]

	 Problem Number 2 Score: 0.9983751700680271


 40%|████      | 4/10 [00:05<00:07,  1.30s/it]

	 Problem Number 3 Score: 0.9796223024574935


 50%|█████     | 5/10 [00:06<00:06,  1.22s/it]

	 Problem Number 4 Score: 0.9813074936000219


 60%|██████    | 6/10 [00:07<00:04,  1.16s/it]

	 Problem Number 5 Score: 0.9925409403020394


 70%|███████   | 7/10 [00:08<00:03,  1.13s/it]

	 Problem Number 6 Score: 0.9987918367346938


 80%|████████  | 8/10 [00:09<00:02,  1.10s/it]

	 Problem Number 7 Score: 0.965436808364758


 90%|█████████ | 9/10 [00:11<00:01,  1.08s/it]

	 Problem Number 8 Score: 0.9908473661457406


100%|██████████| 10/10 [00:12<00:00,  1.20s/it]

	 Problem Number 9 Score: 0.9923800930423198
Average Score: 0.9879347320144026





In [8]:
# Remove unwanted files and directories that may cause submission errors.
# This deletes everything under /kaggle/working and runs before submission.csv is
# written by the next cell, which only needs the in-memory `results` list.
# NOTE(review): test_index/ and meta_with_text.parquet were written with relative
# paths — presumably they live in /kaggle/working and are removed here; confirm.
!rm -rf /kaggle/working/*

In [9]:
# Collect the per-patent queries (one dict per test row, keys "publication_number"
# and "query") into a table and write it as the submission file.
submission = pl.DataFrame(results)
submission.write_csv("submission.csv")

# Last expression of the cell, so the notebook displays the table.
submission

publication_number,query
str,str
"""US-2017082634-…","""cpc:G01N33/684…"
"""US-2017180470-…","""ti:method OR c…"
"""US-2018029544-…","""ti:solar OR ti…"
"""US-2022408153-…","""ti:apparatus O…"
"""US-2268569-A""","""ti:machine OR …"
"""US-3371854-A""","""ti:pump OR cpc…"
"""US-3589189-A""","""ti:meter OR ti…"
"""US-3881203-A""","""ti:holder OR t…"
"""US-4845770-A""","""ti:reader OR t…"
"""US-695233-A""","""cpc:C10H15/06 …"
