In [1]:
import json
from langchain_community.graphs.age_graph import AGEGraph
import os
from langchain_postgres import PGVector
from langchain_postgres.vectorstores import PGVector
import psycopg2
import getpass
import os
import itertools


database = {
    "database": "postgres",
    "user": "postgres",
    "password": "password",
    "host": "192.168.1.16",
    "port": "5432"
}

In [2]:
connection = psycopg2.connect(
    dbname=database["database"],
    user=database["user"],
    password=database["password"],
    host=database["host"],
    port=database["port"]
)

In [3]:
from dataclasses import dataclass
from typing import List
@dataclass
class Document:
    id: str
    subject_ids: List[str]
    title: str
    content: str
    language: str

In [5]:
import urllib
from psycopg2.extensions import adapt
import base64
documents = []


for dirs, _, f in os.walk("../llms4subjects/shared-task-datasets/TIBKAT/tib-core-subjects/data"):
    for file in f:
        with open(os.path.join(dirs, file)) as f:
            data = json.load(f)["@graph"]
            subjects = []
            article = None
            id: str = ""
            subject_ids = []
            title: str = ""
            content: str = ""
            language: str = ""
            for d in data:
                if "@type" in d:
                    
                    content = "".join(d["abstract"])
                    id = str(d["@id"])
                    if type(d["title"]) == str:
                        title = d["title"]
                    elif type(d["title"]) == list:
                        title = d["title"][0]
                    if type(d["dcterms:subject"]) == list:
                        for subj in d["dcterms:subject"]:
                            s = subj["@id"]
                            subject_ids.append(s)
                    else:
                        subject_ids.append(d["dcterms:subject"]["@id"])

                    try:
                        if type(d["language"]) == str:
                            language = d["language"].split("/")[-1]
                    except KeyError as k:
                        language = "en" if "/en/" in dirs else "de"

            documents.append(
                Document(
                    id = id,
                    subject_ids = subject_ids,
                    title = urllib.parse.quote(title),
                    content = urllib.parse.quote(content), 
                    language = language
                )
            )
print(len(documents))
print(documents[2])

48882
Document(id='https://www.tib.eu/de/suchen/id/TIBKAT%3A1653124245', subject_ids=['gnd:4022320-6', 'gnd:4007385-3'], title='Aktuelle%20Forschung%20in%20der%20Bodenmechanik%202013%20%3A%20Tagungsband%20zur%201.%20Deutschen%20Bodenmechanik%20Tagung%2C%20Bochum', content='Teil%20I%20Weiche%20B%C3%B6den%20%26%20Konsolidation%3A%20On%20the%20undrained%20analysis%20of%20soft%20anisotropic%20clays%20--%20Modeling%20of%20liquid%20and%20gas%20saturated%20porous%20solids%20under%20freezing%20and%20thawing%20cycles%20--%20Destructuration%20of%20soft%20clay%20during%20Shield%20TBM%20tunnelling%20and%20its%20consequences%20--%20Bodenmechanische%20Besonderheiten%20bei%20Flachgr%C3%BCndungen%20in%20normalkonsolidierten%20weichen%20B%C3%B6den%20-%20Fallbeispiele%20und%20Erkl%C3%A4rungsversuch%20--%20Cyclic%20consolidation%20of%20soft%20soils%20--%20Thermo-osmosis%20effect%20in%20one%20dimensional%20half%20space%20consolidation%20--%20Teil%20II%20Grenzzustand%20der%20Tragf%C3%A4higkeit%3A%20G%C3%BC

In [51]:
import itertools
from tqdm import tqdm

connection = psycopg2.connect(
    dbname=database["database"],
    user=database["user"],
    password=database["password"],
    host=database["host"],
    port=database["port"]
)

curr = connection.cursor()
curr.execute("SET search_path TO ag_catalog;")
curr.execute("CREATE INDEX doc_id ON gnd.\"Document\" USING gin (properties);")
curr.execute("""SELECT * FROM pg_indexes WHERE tablename = '_ag_label_vertex';""")
batch_size = 10
total_batches = (len(documents) + batch_size - 1) // batch_size
tx_commit_interval = 50
for i, doc_batch in enumerate(tqdm(itertools.batched(documents, batch_size), total = total_batches)):
    params = [(doc.id, doc.title, doc.content) for doc in doc_batch]
    try:
        curr.executemany(
            """
            SELECT * FROM cypher('gnd', $$ 
                MATCH (d:Document {id: %s})
                SET d.title = %s , d.content = %s
                RETURN d
            $$) AS (result agtype);
            """,
            params
        )
    except Exception as e:
        connection.rollback()
        connection.close()

    if i % tx_commit_interval == 0:
        connection.commit()

connection.commit()
connection.close()


100%|██████████| 4889/4889 [06:03<00:00, 13.46it/s]


In [64]:
connection = psycopg2.connect(
    dbname=database["database"],
    user=database["user"],
    password=database["password"],
    host=database["host"],
    port=database["port"]
)

curr = connection.cursor()
try:
    curr.execute("DROP INDEX gnd.\"doc_id2\"")
    connection.commit()
except Exception as e:
    print(e)
    connection.rollback()
finally:
    connection.close()

In [7]:
import itertools
from tqdm import tqdm

connection = psycopg2.connect(
    dbname=database["database"],
    user=database["user"],
    password=database["password"],
    host=database["host"],
    port=database["port"]
)

curr = connection.cursor()
curr.execute("SET search_path TO ag_catalog;")
curr.execute("DROP INDEX IF EXISTS gnd.\"doc_id\"")

curr.execute("CREATE INDEX doc_id ON gnd.\"Document\" USING gin (properties);")
curr.execute("""SELECT * FROM pg_indexes WHERE tablename = '_ag_label_vertex';""")
batch_size = 10
total_batches = (len(documents) + batch_size - 1) // batch_size
tx_commit_interval = 50

for i, doc in enumerate(tqdm(documents)):
    if len(doc.subject_ids) > 1:
        continue
    for subj_id in doc.subject_ids:
        try:
            curr.execute(f"""
                SELECT * FROM cypher('gnd', $$ 
                    MATCH (d:Document {{id: '{doc.id}'}}), (s:Subject {{code: '{subj_id}'}})
                    MERGE (d)-[:DOC_SUBJECT]->(s)
                    RETURN d
                $$) AS (result agtype);
            """)
        except Exception as e:
            curr.execute("DROP INDEX IF EXISTS gnd.\"doc_id\"")
            connection.rollback()
            connection.close()

    if i % tx_commit_interval == 0:
        connection.commit()


curr.execute("DROP INDEX IF EXISTS gnd.\"doc_id\"")
connection.commit()
connection.close()


100%|██████████| 48882/48882 [20:16<00:00, 40.17it/s] 


In [74]:
curr.execute("DROP INDEX IF EXISTS gnd.\"doc_id\"")
connection.commit()
connection.close()

InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
