# Pydantic -> {Guidance}/LLM -> SQLite

### COMMENTS:

* possible improvements:
    * bridge Pydantic and SQLite with for example pydantic_sqlite module https://pypi.org/project/pydantic_sqlite/#description
    * try/except; error handling
---

## POC extraction of metadata and insertion into a SQLite table for 1 document

create database and table

In [None]:
import sqlite3

# Connect to the SQLite database file in the current working directory;
# `conn` and `cur` are reused by the cells below.
conn = sqlite3.connect('patent_metadata.db')
cur = conn.cursor()

In [None]:
# One row per patent: the id, the extraction timestamp, and one TEXT column
# for every field of the PatentMetadata model.
create_table_sql = """CREATE TABLE IF NOT EXISTS patent_metadata(
   patentid TEXT PRIMARY KEY,
   timestamp REAL,
   summary TEXT,
   key_technological_field TEXT,
   novelty_level TEXT,
   novelty_level_reason TEXT,
   medical_device_category TEXT,
   invasive TEXT,
   active TEXT,
   software TEXT,
   measuring_function TEXT,
   trained_professional_user TEXT);
"""
# IF NOT EXISTS makes this cell safe to re-run against an existing database.
cur.execute(create_table_sql)
conn.commit()

read in document and load model

In [None]:
from pypdf import PdfReader 
  
# Read the whole patent and concatenate the extracted text of every page
# into the single string that is later embedded in the prompt.
reader = PdfReader('/home/dorota/LLM-diploma-project/00_concept_tests/data/patents/WO2014076653A1.pdf')
# Join once instead of growing the string with += on every page.
TEXT = "".join(page.extract_text() for page in reader.pages)

In [None]:
import guidance
from guidance import models, gen, select
import json
from llama_cpp import Llama

# Load the local GGUF weights through llama.cpp.
# NOTE(review): n_ctx=0 presumably takes the context length from the model
# file -- confirm against the llama-cpp-python documentation.
model = Llama(
    "/home/dorota/models/mistral-7b-instruct-v0.2.Q6_K.gguf",
    n_gpu_layers=10,
    n_ctx=0,
    echo=False,
    verbose=False,
)
# Alternative model:
# model = Llama("/home/dorota/models/mixtral-8x7b-instruct-v0.1.Q4_K_M.gguf", n_gpu_layers=5, n_ctx=0, echo=False, verbose=True)

# Wrap the loaded model so guidance programs can be appended to it with `+`.
lm = guidance.models.LlamaCpp(model=model, echo=False, silent=False)

define Pydantic class

In [None]:
from pydantic import BaseModel, Field
from typing import List

class PatentMetadata(BaseModel):
    """Metadata extracted from one patent document.

    The field declarations double as the prompt specification used by
    ``get_metadata``: each ``description`` is shown to the model as the JSON
    key, and when ``examples`` is given the answer is restricted to exactly
    one of those options; otherwise the model generates free text.

    The declaration order matters: the values are written to the
    ``patent_metadata`` table in this order, after ``patentid`` and
    ``timestamp``.
    """

    summary: str = Field(..., description="generate summary of the invention in 1 sentence")
    key_technological_field: str = Field(..., description="list 5 key technological concepts in 1-3 words described in patent")
    novelty_level: str = Field(...,
                               description="select one value from provided examples to define level of novelty of invention",
                               examples=['LOW', 'MEDIUM', 'HIGH'])
    novelty_level_reason: str = Field(..., description="describe reason for chosen novelty_level in 1 sentence")
    medical_device_category: str = Field(...,
                                         description="choose device category from provided examples",
                                         examples=['Clinical chemistry and clinical toxicology devices',
                                                   'Hematology and pathology devices',
                                                   'Immunology and microbiology devices',
                                                   'Anesthesiology devices',
                                                   'Cardiovascular devices',
                                                   'Dental devices',
                                                   'Gastroenterology-urology devices',
                                                   'General and plastic surgery devices',
                                                   'General hospital and personal use devices',
                                                   'Neurological devices',
                                                   'Obstetrical and gynecological devices',
                                                   'Ophthalmic devices',
                                                   'Orthopedic devices',
                                                   'Physical medicine devices',
                                                   'Radiology devices'])
    # The remaining fields are yes/no questions about the device.
    invasive: str = Field(...,
                          description="Is the device 'invasive', where 'invasive' means that any part of device penetrates inside the body?",
                          examples=['yes', 'no'])
    active: str = Field(...,
                        description="Is the device 'active' meaning device contains software or device operation depends on a source of energy other than that generated by the human body for that purpose, or by gravity, and which acts by changing the density of or converting that energy?",
                        examples=['yes', 'no'])
    software: str = Field(...,
                          description="Does the device contain software or is connected to a device with software?",
                          examples=['yes', 'no'])
    measuring_function: str = Field(...,
                                    description="Does the device have a 'measuring function' meaning it is intended to quantify parameters and the result of the measurement is displayed?",
                                    examples=['yes', 'no'])
    trained_professional_user: str = Field(...,
                                           description="Does the device require trained professional medical personnel to be used?",
                                           examples=['yes', 'no'])

build {Guidance} prompt and run the model on the text document

In [None]:
@guidance
def get_metadata(lm, text, pydantic_class):
    '''Append a JSON-shaped extraction prompt for ``text`` to ``lm`` and let the model fill in one value per field.

    Args:
        lm: guidance model state the prompt is appended to.
        text: full document text, embedded between the (start)/(stop) markers.
        pydantic_class: pydantic model whose ``model_fields`` drive the prompt.
            Each field's description (or its name, if it has none) is used as
            the JSON key; a field with ``examples`` is answered by selecting
            one of them, a ``str`` field by free generation up to the closing
            quote, and an ``int`` field by digits only.

    Returns:
        The model state with every answer captured under its field name.

    Raises:
        TypeError: if a field has no examples and is annotated with something
            other than ``str`` or ``int``.
    '''
    lm += f'''\
    Patent is delimited by (start) and (stop):
    (start)
    {text}
    (stop)

    JSON output: {{'''

    for key, value in pydantic_class.model_fields.items():
        # Fall back to the field name so a field declared without a
        # description cannot break the string concatenation below.
        label = value.description or key
        if value.examples:
            # Closed question: show the options in the key and constrain the answer to them.
            lm += '"' + label + " " + ', '.join(value.examples) + '": "' + select(options=value.examples, name=key) + '",'
        elif value.annotation is str:
            # Open question: generate until the closing quote of the JSON string.
            lm += '"' + label + '": "' + gen(name=key, stop='"') + '",'
        elif value.annotation is int:
            lm += '"' + label + '": "' + gen(name=key, regex="[0-9]+") + '",'
        else:
            # Skipping the field silently would leave no captured value for it
            # and only fail later, when the output is read back by field name.
            raise TypeError(f"field {key!r} has unsupported annotation {value.annotation!r}; "
                            "provide examples or annotate it as str or int")
    lm += '}'
    return lm

def output_to_pydantic(pydantic_class, output):
    '''Build a ``pydantic_class`` instance from the values captured in ``output``.

    Every field name declared on ``pydantic_class`` is looked up in ``output``
    (the lm/guidance result, indexable by capture name) and the collected
    values are passed to the model constructor as keyword arguments.
    '''
    captured = {name: output[name] for name in pydantic_class.model_fields}
    return pydantic_class(**captured)


# Append the extraction program for this document to the model; the result
# holds each generated answer under its field name.
output = lm + get_metadata(TEXT, PatentMetadata)
# Collect those answers back into a PatentMetadata instance.
output_pydantic = output_to_pydantic(PatentMetadata, output)

create tuple with output fields

In [None]:
from datetime import datetime

# The patent id is the file name up to its first dot.
filename = 'WO2014076653A1.pdf'
patentid = filename.partition('.')[0]

# Time of this extraction as a POSIX timestamp (float seconds).
timestamp = datetime.now().timestamp()

# Row layout: id, timestamp, then the model fields in declaration order,
# which is the column order of the patent_metadata table.
output_vals = tuple([patentid, timestamp] + list(output_pydantic.model_dump().values()))
output_vals

insert tuple into table (note: existing fields are updated only if the timestamp of the incoming information is newer than the existing timestamp) - corresponds to UPSERT

In [None]:
# Insert the row, or, if the patent id is already stored, overwrite it only
# when the incoming timestamp is newer than the stored one.
# The columns are named explicitly so the values land in the right columns
# even if the table in an existing database file has a different column order.
cur.execute("""INSERT INTO patent_metadata(
                patentid,
                timestamp,
                summary,
                key_technological_field,
                novelty_level,
                novelty_level_reason,
                medical_device_category,
                invasive,
                active,
                software,
                measuring_function,
                trained_professional_user)
            VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(patentid) DO UPDATE SET
                summary=excluded.summary,
                timestamp=excluded.timestamp,
                key_technological_field=excluded.key_technological_field,
                novelty_level=excluded.novelty_level,
                novelty_level_reason=excluded.novelty_level_reason,
                medical_device_category=excluded.medical_device_category,
                invasive=excluded.invasive,
                active=excluded.active,
                software=excluded.software,
                measuring_function=excluded.measuring_function,
                trained_professional_user=excluded.trained_professional_user
            WHERE excluded.timestamp>patent_metadata.timestamp;""", output_vals)
conn.commit()

# https://www.sqlite.org/lang_upsert.html

display table using pandas

In [None]:
import pandas as pd

# Load the whole table into a DataFrame; the bare `df` on the last line
# makes the notebook render it as the cell output.
df = pd.read_sql_query("SELECT * from patent_metadata", con=conn)
df

In [None]:
# cur.execute("DROP TABLE patent_metadata")
# conn.commit()

---

## Full code for extraction of metadata from files in a specified folder

In [None]:
# run all patents in specified folder
#
# Setup for the batch run: imports, database connection and table, model
# loading and the input folder.
import json
import os
import sqlite3
from datetime import datetime
from typing import List

import guidance
from guidance import models, gen, select
from llama_cpp import Llama
from pydantic import BaseModel, Field
from pypdf import PdfReader

# Folder whose files are processed by the loop at the end of this cell.
directory = '/home/dorota/LLM-diploma-project/00_concept_tests/data/patents_2'

# Connect to the SQLite database file in the current working directory.
conn = sqlite3.connect('patent_metadata.db')
cur = conn.cursor()

# One row per patent: the id, the extraction timestamp, and one TEXT column
# for every field of the PatentMetadata model.
create_table_sql = """CREATE TABLE IF NOT EXISTS patent_metadata(
   patentid TEXT PRIMARY KEY,
   timestamp REAL,
   summary TEXT,
   key_technological_field TEXT,
   novelty_level TEXT,
   novelty_level_reason TEXT,
   medical_device_category TEXT,
   invasive TEXT,
   active TEXT,
   software TEXT,
   measuring_function TEXT,
   trained_professional_user TEXT);
"""
cur.execute(create_table_sql)
conn.commit()

# Load the local GGUF weights through llama.cpp.
# NOTE(review): n_ctx=0 presumably takes the context length from the model
# file -- confirm against the llama-cpp-python documentation.
model = Llama(
    "/home/dorota/models/mistral-7b-instruct-v0.2.Q6_K.gguf",
    n_gpu_layers=10,
    n_ctx=0,
    echo=False,
    verbose=False,
)
# Alternative model:
# model = Llama("/home/dorota/models/mixtral-8x7b-instruct-v0.1.Q4_K_M.gguf", n_gpu_layers=5, n_ctx=0, echo=False, verbose=True)

# Wrap the loaded model so guidance programs can be appended to it with `+`.
lm = guidance.models.LlamaCpp(model=model, echo=False, silent=False)

class PatentMetadata(BaseModel):
    """Metadata extracted from one patent document.

    The field declarations double as the prompt specification used by
    ``get_metadata``: each ``description`` is shown to the model as the JSON
    key, and when ``examples`` is given the answer is restricted to exactly
    one of those options; otherwise the model generates free text.

    The declaration order matters: the values are written to the
    ``patent_metadata`` table in this order, after ``patentid`` and
    ``timestamp``.
    """

    summary: str = Field(..., description="generate summary of the invention in 1 sentence")
    key_technological_field: str = Field(..., description="list 5 key technological concepts in 1-3 words described in patent")
    novelty_level: str = Field(...,
                               description="select one value from provided examples to define level of novelty of invention",
                               examples=['LOW', 'MEDIUM', 'HIGH'])
    novelty_level_reason: str = Field(..., description="describe reason for chosen novelty_level in 1 sentence")
    medical_device_category: str = Field(...,
                                         description="choose device category from provided examples",
                                         examples=['Clinical chemistry and clinical toxicology devices',
                                                   'Hematology and pathology devices',
                                                   'Immunology and microbiology devices',
                                                   'Anesthesiology devices',
                                                   'Cardiovascular devices',
                                                   'Dental devices',
                                                   'Gastroenterology-urology devices',
                                                   'General and plastic surgery devices',
                                                   'General hospital and personal use devices',
                                                   'Neurological devices',
                                                   'Obstetrical and gynecological devices',
                                                   'Ophthalmic devices',
                                                   'Orthopedic devices',
                                                   'Physical medicine devices',
                                                   'Radiology devices'])
    # The remaining fields are yes/no questions about the device.
    invasive: str = Field(...,
                          description="Is the device 'invasive', where 'invasive' means that any part of device penetrates inside the body?",
                          examples=['yes', 'no'])
    active: str = Field(...,
                        description="Is the device 'active' meaning device contains software or device operation depends on a source of energy other than that generated by the human body for that purpose, or by gravity, and which acts by changing the density of or converting that energy?",
                        examples=['yes', 'no'])
    software: str = Field(...,
                          description="Does the device contain software or is connected to a device with software?",
                          examples=['yes', 'no'])
    measuring_function: str = Field(...,
                                    description="Does the device have a 'measuring function' meaning it is intended to quantify parameters and the result of the measurement is displayed?",
                                    examples=['yes', 'no'])
    trained_professional_user: str = Field(...,
                                           description="Does the device require trained professional medical personnel to be used?",
                                           examples=['yes', 'no'])

@guidance
def get_metadata(lm, text, pydantic_class):
    '''Append a JSON-shaped extraction prompt for ``text`` to ``lm`` and let the model fill in one value per field.

    Args:
        lm: guidance model state the prompt is appended to.
        text: full document text, embedded between the (start)/(stop) markers.
        pydantic_class: pydantic model whose ``model_fields`` drive the prompt.
            Each field's description (or its name, if it has none) is used as
            the JSON key; a field with ``examples`` is answered by selecting
            one of them, a ``str`` field by free generation up to the closing
            quote, and an ``int`` field by digits only.

    Returns:
        The model state with every answer captured under its field name.

    Raises:
        TypeError: if a field has no examples and is annotated with something
            other than ``str`` or ``int``.
    '''
    lm += f'''\
    Patent is delimited by (start) and (stop):
    (start)
    {text}
    (stop)

    JSON output: {{'''

    for key, value in pydantic_class.model_fields.items():
        # Fall back to the field name so a field declared without a
        # description cannot break the string concatenation below.
        label = value.description or key
        if value.examples:
            # Closed question: show the options in the key and constrain the answer to them.
            lm += '"' + label + " " + ', '.join(value.examples) + '": "' + select(options=value.examples, name=key) + '",'
        elif value.annotation is str:
            # Open question: generate until the closing quote of the JSON string.
            lm += '"' + label + '": "' + gen(name=key, stop='"') + '",'
        elif value.annotation is int:
            lm += '"' + label + '": "' + gen(name=key, regex="[0-9]+") + '",'
        else:
            # Skipping the field silently would leave no captured value for it
            # and only fail later, when the output is read back by field name.
            raise TypeError(f"field {key!r} has unsupported annotation {value.annotation!r}; "
                            "provide examples or annotate it as str or int")
    lm += '}'
    return lm

def output_to_pydantic(pydantic_class, output):
    '''Build a ``pydantic_class`` instance from the values captured in ``output``.

    Every field name declared on ``pydantic_class`` is looked up in ``output``
    (the lm/guidance result, indexable by capture name) and the collected
    values are passed to the model constructor as keyword arguments.
    '''
    captured = {name: output[name] for name in pydantic_class.model_fields}
    return pydantic_class(**captured)



# Extract metadata for every PDF in `directory` that is not yet in the table.
# Sorting makes the processing order reproducible between runs.
for filename in sorted(os.listdir(directory)):
    # The folder may contain entries PdfReader cannot parse (notes,
    # sub-folders, ...); only PDF files are processed.
    stem, extension = os.path.splitext(filename)
    if extension.lower() != '.pdf':
        print(f'skipping {filename}: not a PDF file')
        continue

    print(f'extracting metadata from {filename}')
    timestamp = datetime.timestamp(datetime.now())
    # Strip only the extension: cutting at the first dot would map
    # 'A.1.pdf' and 'A.2.pdf' to the same id and skip the second file.
    patentid = stem

    # Skip patents that are already stored so the model is not re-run for them.
    cur.execute("SELECT 1 FROM patent_metadata WHERE patentid = ?", (patentid,))
    exists = cur.fetchone()
    if exists:
        print(f'{patentid} already exists')
        continue

    file = os.path.join(directory, filename)
    reader = PdfReader(file)
    # Join once instead of growing the string with += on every page.
    TEXT = "".join(page.extract_text() for page in reader.pages)

    output = lm + get_metadata(TEXT, PatentMetadata)
    output_pydantic = output_to_pydantic(PatentMetadata, output)
    # Row layout: id, timestamp, then the model fields in declaration order.
    output_vals = (patentid, timestamp, *output_pydantic.model_dump().values())
    # Columns are named explicitly so the values do not depend on the
    # physical column order of an already existing table.
    cur.execute("""INSERT INTO patent_metadata(
                    patentid,
                    timestamp,
                    summary,
                    key_technological_field,
                    novelty_level,
                    novelty_level_reason,
                    medical_device_category,
                    invasive,
                    active,
                    software,
                    measuring_function,
                    trained_professional_user)
                VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(patentid) DO UPDATE SET
                    summary=excluded.summary,
                    timestamp=excluded.timestamp,
                    key_technological_field=excluded.key_technological_field,
                    novelty_level=excluded.novelty_level,
                    novelty_level_reason=excluded.novelty_level_reason,
                    medical_device_category=excluded.medical_device_category,
                    invasive=excluded.invasive,
                    active=excluded.active,
                    software=excluded.software,
                    measuring_function=excluded.measuring_function,
                    trained_professional_user=excluded.trained_professional_user
                WHERE excluded.timestamp>patent_metadata.timestamp;""", output_vals)
    # Commit per file so finished extractions survive a failure on a later file.
    conn.commit()
    # NOTE: do not call conn.close() here -- the connection is reused by the
    # next iteration. Close it once, after the loop, when `conn` is no longer needed.
    print(f'done with {filename}')
    