In [1]:
from Pipeline import Pipeline
import Adapters
import Processors
import Sinks

05/20/2021 13:46:11 - INFO - faiss.loader -   Loading faiss with AVX2 support.
05/20/2021 13:46:11 - INFO - faiss.loader -   Loading faiss.
  return torch._C._cuda_getDeviceCount() > 0
05/20/2021 13:46:11 - INFO - farm.modeling.prediction_head -   Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex .


In [2]:
from haystack.document_store.elasticsearch import ElasticsearchDocumentStore
# Document store backed by the local Elasticsearch instance, using the
# "document" index. Username and password are deliberately left empty.
es_connection = dict(
    host="localhost",
    username="",
    password="",
    index="document",
)
document_store = ElasticsearchDocumentStore(**es_connection)

05/20/2021 13:46:12 - INFO - elasticsearch -   HEAD http://localhost:9200/ [status:200 request:0.006s]
05/20/2021 13:46:13 - INFO - elasticsearch -   PUT http://localhost:9200/document [status:200 request:0.238s]
05/20/2021 13:46:13 - INFO - elasticsearch -   PUT http://localhost:9200/label [status:200 request:0.276s]


### Pipeline Definition

In [3]:
# Source adapter of the pipeline. The location is a relative path, so it is
# resolved against the working directory the notebook is started from.
UNARXIVE_SAMPLE_PATH = "../../data/unarXive-sample"
adapter = Adapters.UnarxiveAdapter(UNARXIVE_SAMPLE_PATH)

In [4]:
# Build the pipeline on top of the unarXive adapter.
# NOTE(review): batch_size presumably is the number of documents handled per
# processing batch — confirm against Pipeline.
PIPELINE_BATCH_SIZE = 20
pipeline = Pipeline(adapter, batch_size=PIPELINE_BATCH_SIZE)

In [5]:
# MetadataArxiveEnricher: first processor registered on the pipeline.
# NOTE(review): presumably looks up arXiv metadata in the RocksDB service for
# the value stored under each document's "arixive-id" field — confirm against
# Processors.MetadataArxiveEnricher.
from arxive_metadata.rocksDB import RocksDBAdapter

db = RocksDBAdapter("arXive_metadata", "http://localhost:8089")
metadata_enricher = Processors.MetadataArxiveEnricher("arixive-id", db)
pipeline.add_processor(metadata_enricher)

In [6]:
# FilterOnMetadataValue: second processor registered on the pipeline.
# The values below are arXiv computer-science category identifiers.
# NOTE(review): presumably only documents whose "categories" metadata matches
# one of these values pass the filter — confirm against
# Processors.FilterOnMetadataValue.
# NOTE(review): "cs.IT" is not part of this list — confirm that leaving it
# out is intentional.
cs_values = [
    "cs.AI", "cs.AR", "cs.CC", "cs.CE", "cs.CG", "cs.CL", "cs.CR", "cs.CV",
    "cs.CY", "cs.DB", "cs.DC", "cs.DL", "cs.DM", "cs.DS", "cs.ET", "cs.FL",
    "cs.GL", "cs.GR", "cs.GT", "cs.HC", "cs.IR", "cs.LG", "cs.LO", "cs.MA",
    "cs.MM", "cs.MS", "cs.NA", "cs.NE", "cs.NI", "cs.OH", "cs.OS", "cs.PF",
    "cs.PL", "cs.RO", "cs.SC", "cs.SD", "cs.SE", "cs.SI", "cs.SY",
]
category_filter = Processors.FilterOnMetadataValue("categories", cs_values)
pipeline.add_processor(category_filter)

In [7]:
# MetadataFieldDiscarder: registered after the category filter, so the
# "categories" field listed here is still available to that earlier stage.
# NOTE(review): presumably removes the listed keys from each document's
# metadata — confirm against Processors.MetadataFieldDiscarder.
fields_to_discard = [
    "update_date",
    "versions",
    "comments",
    "report-no",
    "license",
    "id",
    "abstract",
    "categories",
    "submitter",
    "journal-ref",
    "authors_parsed",
]
field_discarder = Processors.MetadataFieldDiscarder(fields_to_discard)
pipeline.add_processor(field_discarder)

In [8]:
# TextKeywordCut: registered with the keyword "introduction".
# NOTE(review): presumably trims each document's text relative to the first
# occurrence of the keyword — confirm which side is kept and whether the
# match is case-sensitive.
keyword_cut = Processors.TextKeywordCut("introduction")
pipeline.add_processor(keyword_cut)

In [9]:
# TextReplaceFilter: the pattern matches the shortest span enclosed in double
# curly braces ("{{...}}"); the replacement is the empty string.
# NOTE(review): presumably every match in the document text is substituted —
# confirm against Processors.TextReplaceFilter.
placeholder_pattern = r"\{{(.*?)\}}"
placeholder_replacement = ""
pipeline.add_processor(
    Processors.TextReplaceFilter(placeholder_pattern, placeholder_replacement)
)

In [10]:
# HaystackPreProcessor: wraps a Haystack PreProcessor as the last processor
# of the pipeline.
from haystack.preprocessor import PreProcessor

# Cleaning is fully enabled; splitting is word-based with 200 units per
# split, no overlap, and without respecting sentence boundaries.
preprocessor_options = dict(
    clean_empty_lines=True,
    clean_whitespace=True,
    clean_header_footer=True,
    split_by="word",
    split_length=200,
    split_respect_sentence_boundary=False,
    split_overlap=0,
)
pre_processor = PreProcessor(**preprocessor_options)
haystack_stage = Processors.HaystackPreProcessor(pre_processor)
pipeline.add_processor(haystack_stage)

In [11]:
# TextfileSink: first sink attached to the pipeline.
# The output location is read from the KD_TEXTFILE_SINK_PATH environment
# variable so the notebook is not tied to one user's home directory. When the
# variable is unset or empty, the original absolute path is used.
# NOTE(review): the second argument presumably lists the metadata fields used
# to name the written files — confirm against Sinks.TextfileSink.
import os

DEFAULT_TEXTFILE_SINK_PATH = "/home/daniel/storage/KD/testoutput"
textfile_sink_path = os.environ.get("KD_TEXTFILE_SINK_PATH") or DEFAULT_TEXTFILE_SINK_PATH
pipeline.add_sink(Sinks.TextfileSink(textfile_sink_path, ["arixive-id", "_split_id"]))

In [12]:
# ElasticsearchSink: second sink of the pipeline, bound to the document store
# created at the top of the notebook.
elasticsearch_sink = Sinks.ElasticsearchSink(document_store)
pipeline.add_sink(elasticsearch_sink)

In [13]:
# Show the assembled pipeline: the adapter, the processors in registration
# order, and the attached sinks.
print(pipeline)

(UnarxiveAdapter) ---> *MetadataArxiveEnricher* ---> *FilterOnMetadataValue* ---> *MetadataFieldDiscarder* ---> *TextKeywordCut* ---> *TextReplaceFilter* ---> *HaystackPreProcessor*
===> |_TextfileSink_|
===> |_ElasticsearchSink_|


### Use Pipeline to process documents

In [16]:
# Run the pipeline; progress is reported per batch and the Elasticsearch sink
# issues bulk requests against the local instance.
pipeline.process()

Processing batches: [####################################........................] 6/10

05/20/2021 13:46:57 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:0.854s]


Processing batches: [############################################################] 10/10


### Pipeline to update entries in Elasticsearch

In [28]:
# Adapter that reads documents back out of the existing document store.
# NOTE(review): batch_size presumably is the number of documents fetched per
# Elasticsearch request — confirm against Adapters.ElasticsearchAdapter.
ES_ADAPTER_BATCH_SIZE = 10
e_adapt = Adapters.ElasticsearchAdapter(
    document_store,
    batch_size=ES_ADAPTER_BATCH_SIZE,
)

05/20/2021 13:57:51 - INFO - elasticsearch -   POST http://localhost:9200/document/_count [status:200 request:0.085s]


In [29]:
# Second pipeline: its source is the Elasticsearch adapter instead of the
# unarXive files.
UPDATE_PIPELINE_BATCH_SIZE = 20
update_pipeline = Pipeline(e_adapt, batch_size=UPDATE_PIPELINE_BATCH_SIZE)

In [30]:
# ElasticsearchSink (put items back into Elasticsearch).
# A separate document store is created for the same "document" index with
# update_existing_documents=True; the first store in this notebook was
# created without that flag.
update_connection = dict(
    update_existing_documents=True,
    host="localhost",
    username="",
    password="",
    index="document",
)
update_document_store = ElasticsearchDocumentStore(**update_connection)
update_sink = Sinks.ElasticsearchSink(update_document_store)
update_pipeline.add_sink(update_sink)

05/20/2021 13:57:54 - INFO - elasticsearch -   HEAD http://localhost:9200/ [status:200 request:0.005s]
05/20/2021 13:57:54 - INFO - elasticsearch -   HEAD http://localhost:9200/document [status:200 request:0.007s]
05/20/2021 13:57:54 - INFO - elasticsearch -   GET http://localhost:9200/document [status:200 request:0.003s]
05/20/2021 13:57:54 - INFO - elasticsearch -   PUT http://localhost:9200/document/_mapping [status:200 request:0.016s]
05/20/2021 13:57:54 - INFO - elasticsearch -   HEAD http://localhost:9200/label [status:200 request:0.002s]


In [31]:
# Show the update pipeline: an Elasticsearch adapter feeding a single
# Elasticsearch sink, with no processors in between.
print(update_pipeline)

(ElasticsearchAdapter)
===> |_ElasticsearchSink_|


In [32]:
# Run the update pipeline; documents are read from Elasticsearch via scroll
# requests and written back with bulk requests, one per batch.
update_pipeline.process()

Processing batches: [............................................................] 0/3

05/20/2021 13:58:00 - INFO - elasticsearch -   POST http://localhost:9200/document/_search?scroll=1d&size=10 [status:200 request:0.031s]
05/20/2021 13:58:00 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.012s]
05/20/2021 13:58:00 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:0.538s]


Processing batches: [####################........................................] 1/3

05/20/2021 13:58:00 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.005s]
05/20/2021 13:58:00 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.006s]
05/20/2021 13:58:01 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:1.003s]


Processing batches: [########################################....................] 2/3

05/20/2021 13:58:01 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.005s]
05/20/2021 13:58:01 - INFO - elasticsearch -   POST http://localhost:9200/_search/scroll [status:200 request:0.003s]
05/20/2021 13:58:01 - INFO - elasticsearch -   DELETE http://localhost:9200/_search/scroll [status:200 request:0.004s]
05/20/2021 13:58:02 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:0.997s]


Processing batches: [############################################################] 3/3
