In [1]:
import threading
import queue
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from pprint import pprint

import numpy as np
from sentence_transformers import SentenceTransformer
from src.gen.util import read_gzip_data, write_gzip_data

datap = Path("/users/k21190024/study/fact-check-transfer-learning/scratch/dumps/data/level/1")
dumpp = Path("/users/k21190024/study/fact-check-transfer-learning/scratch/dumps/explore/level/2")
if not dumpp.exists():
    dumpp.mkdir(parents=True)
model_encodep = dumpp.joinpath("transformers", "encoded")
if not model_encodep.exists():
    model_encodep.mkdir(parents=True)
fcorpp = model_encodep.parent.joinpath("fever-corpus-flatten")
if not fcorpp.exists():
    fcorpp.mkdir()

# Dataset Similarity

In [2]:
q_writer = queue.SimpleQueue()
q_encode = queue.SimpleQueue()
def consumer_writer():
    while True:
        fp, payload, outp = q_writer.get()
        if fp is None:
            break
        inp = write_gzip_data(fp, payload)
        q_encode.put((inp, outp))

model = SentenceTransformer("all-mpnet-base-v2")
def consumer_encode():
    while True:
        inp, outp = q_encode.get()
        if inp is None:
            break
        inp = read_gzip_data(inp)
        embeddings = {}
        for k, d in inp.items():
            embeddings[k] = model.encode(d, convert_to_tensor=True)
        print(write_gzip_data(outp, embeddings))

t_writer = threading.Thread(target=consumer_writer, daemon=True)
t_encode = threading.Thread(target=consumer_encode, daemon=True)
t_writer.start()
t_encode.start()

## Transformer

### FEVER

In [3]:
def flatten_fever_sentences(fp):
    doc = read_gzip_data(fp)
    dl = {}
    for k, d in doc.items():
        dl[k] = []
        for l in d["lines"]:
            if len(l) >= 2:
                dl[k].append(l[1])
    return write_gzip_data(fcorpp.joinpath(f"{fp.name.split('.')[0].replace('-', '_')}.pkl.gz"), dl)

In [4]:
# f_claims = {i["id"]: i["claim"] for i in read_gzip_data(datap.joinpath("fever", "fulltrain.json.gz"))}
# q_writer.put((
#     model_encodep.parent.joinpath("fever-claims-flat.pkl.gz")
#     , f_claims
#     , model_encodep.joinpath("fever-claims-embedding.pkl.gz")
# ))

# concurrent futures submit all fever corpus
pool = ProcessPoolExecutor(max_workers=5)
futures = []
j = 0
for i in datap.joinpath("fever", "corpus").iterdir():
    if j == 3:
        break
    futures.append(pool.submit(flatten_fever_sentences, i))
    j += 1

In [5]:
for future in as_completed(futures):
    finp = Path(future.result())
    q_encode.put((
        finp,
        model_encodep.joinpath(f"fever-evidences-{finp.name.split('.')[0]}.pkl.gz")
    ))

In [6]:
pool.shutdown()

Exception in thread Thread-6:
Traceback (most recent call last):
  File "/scratch/users/k21190024/envs/conda/p-dis-torch/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/scratch/users/k21190024/envs/conda/p-dis-torch/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/slurm-tmp.1602950/ipykernel_362916/3726729332.py", line 20, in consumer_encode
  File "/scratch/users/k21190024/envs/conda/p-dis-torch/lib/python3.8/site-packages/sentence_transformers/SentenceTransformer.py", line 195, in encode
    all_embeddings = torch.stack(all_embeddings)
RuntimeError: stack expects a non-empty TensorList


In [7]:
a = read_gzip_data("/users/k21190024/study/fact-check-transfer-learning/scratch/dumps/explore/level/2/transformers/fever-corpus-flatten/wiki_004.pkl.gz")
a.keys()



### Climate-FEVER

In [3]:
cf = read_gzip_data(datap.joinpath("climatefever", "climatefever.json.gz"))
cf_claims = {}
cf_evidences = {}
for i in cf:
    cf_claims[i["claim_id"]] = i["claim"]
    cf_evidences[i["claim_id"]] = []
    for j in i["evidences"]:
        cf_evidences[i["claim_id"]].append(j["evidence"])
q_writer.put((
    model_encodep.parent.joinpath("climatefever-claims-flat.pkl.gz")
    , cf_claims
    , model_encodep.joinpath("climatefever-claims-embedding.pkl.gz")
))
q_writer.put((
    model_encodep.parent.joinpath("climatefever-evidences-flat-sentence.pkl.gz")
    , cf_evidences
    , model_encodep.joinpath("climatefever-evidences-embedding.pkl.gz")
))

### SciFact

In [36]:
sf_evidence = {k: " ".join(v["abstract"]) for k, v in read_gzip_data(datap.joinpath("scifact", "corpus.json.gz")).items()}
q_writer.put((
    model_encodep.parent.joinpath("scifact-evidences-flat-document.pkl.gz")
    , sf_evidence
    , model_encodep.joinpath("scifact-evidences-embedding.pkl.gz")
))
sf_claims = {i["id"]: i["claim"] for i in read_gzip_data(datap.joinpath("scifact", "fullscifact.json.gz"))}
q_writer.put((
    model_encodep.parent.joinpath("scifact-claims-flat.pkl.gz")
    , sf_claims
    , model_encodep.joinpath("scifact-claims-embedding.pkl.gz")
))

### Fever Corpus Async

In [84]:
foutp = model_encodep.joinpath(f"fever-evidences-{finp.name.split('.')[0]}.pkl.gz")
foutp

PosixPath('/users/k21190024/study/fact-check-transfer-learning/scratch/dumps/explore/level/2/transformers/encoded/fever-evidences-wiki_012.pkl.gz')

## Terminate

In [4]:
q_writer.put((None, None, None))
q_encode.put((None, None))

t_writer.join()
t_encode.join()

/users/k21190024/study/fact-check-transfer-learning/scratch/dumps/explore/level/2/transformers/encoded/climatefever-claims-embedding.pkl.gz
/users/k21190024/study/fact-check-transfer-learning/scratch/dumps/explore/level/2/transformers/encoded/climatefever-evidences-embedding.pkl.gz


# Playground