# COSMOS Detection pipeline

This walkthrough shows how to perform basic document region detection over PDF documents using COSMOS. We'll use a trained model to classify regions on each page as tables, equations, etc.

In [1]:
from dask.distributed import Client
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from ingest.utils.pdf_helpers import prepare_pdf_objs
from ingest.ingest import pdf_to_images
from ingest.process_page import process_page
from ingest.process.detection.src.infer import get_model
from ingest.detect import detect
from ingest.detect_setup import DetectPlugin
from tqdm import tqdm

## Setting up the distributed backend

COSMOS uses Dask's lower-level API, [Dask Distributed](https://distributed.dask.org/en/latest/), to handle its processing load. In this walkthrough, we'll set up a single node for processing, but this extends naturally to as many workers as you want. You can monitor the processing by opening the dashboard link shown in the output below.

In [2]:
client = Client(serializers=['msgpack', 'dask'], deserializers=['msgpack', 'dask']) # We'll need msgpack as a serializer to fall back on
client



Client: Scheduler tcp://127.0.0.1:58661, Dashboard http://127.0.0.1:8787/status
Cluster: Workers 4, Cores 8, Memory 17.18 GB


## Preparing the PDF files

Next, we prepare the PDF files. We'll use the documents in the example_docs directory, but you can point to your own documents to try it out. We also create a dataset ID, which we will use to access our results later.

In [3]:
documents_directory = './example_docs'
dataset_id = 'example'
pdfs = client.submit(prepare_pdf_objs, documents_directory, dataset_id)
pdfs = pdfs.result()
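
To make the shape of the data concrete, here is a minimal sketch of what a helper like prepare_pdf_objs might produce. It assumes each PDF becomes a dict carrying its bytes as base64 text, so that the object serializes as plain strings. The function name `build_pdf_objs` and the keys `dataset_id` and `pdf_name` are hypothetical; the real helper may use different fields.

```python
import base64
import tempfile
from pathlib import Path


def build_pdf_objs(directory, dataset_id):
    """Hypothetical stand-in for prepare_pdf_objs: one dict per PDF file."""
    objs = []
    for path in sorted(Path(directory).glob("*.pdf")):
        objs.append({
            # Base64 text keeps the raw bytes serializable as a plain string.
            "pdf": base64.b64encode(path.read_bytes()).decode("ascii"),
            "dataset_id": dataset_id,  # assumed key name
            "pdf_name": path.name,     # assumed key name
        })
    return objs


# Demo on a throwaway directory holding one fake PDF header.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "demo.pdf").write_bytes(b"%PDF-1.5\r\n%fake body")
    demo_objs = build_pdf_objs(tmp, "example")

print(demo_objs[0]["pdf_name"], demo_objs[0]["dataset_id"])
```

Tagging every object with the dataset ID at this stage is one way to let later steps look results up by dataset without extra bookkeeping.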

## Extract the images from the PDFs

We won't use subtasks here. As a result, we need to call result() on each future to wait for it to finish, and then flatten the per-PDF lists of page images into a single list.
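
The submit, wait, and flatten pattern can be tried without a cluster. Dask's futures follow the interface of Python's `concurrent.futures`, so the sketch below uses a standard-library thread pool as a stand-in. The function `split_into_pages` is a hypothetical placeholder for pdf_to_images and only returns labelled strings.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_pages(doc):
    """Hypothetical stand-in for pdf_to_images: one document in, a list of pages out."""
    return [f"{doc}-page{n}" for n in range(2)]


docs = ["a.pdf", "b.pdf"]
with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with one future per document.
    futures = [pool.submit(split_into_pages, doc) for doc in docs]
    # result() blocks until each task finishes, giving one list per document.
    per_doc = [f.result() for f in futures]

# Flatten the list of lists so that every page becomes its own work item.
pages = [page for doc_pages in per_doc for page in doc_pages]
print(pages)  # ['a.pdf-page0', 'a.pdf-page1', 'b.pdf-page0', 'b.pdf-page1']
```

Flattening matters because the next stage works per page rather than per document.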

In [4]:
pdfs = client.scatter(pdfs) # PDFs have lots of bytes, so scatter them first and submit the returned futures
pdf_images = [client.submit(pdf_to_images, pdf) for pdf in pdfs]
pdf_images = [p.result() for p in pdf_images]
images = [i for j in pdf_images for i in j]


## Propose regions on the page

In [5]:
proposals = client.map(process_page, images)

## Classify the proposals

We load the model from pretrained weights, and supply the model config. We'll run the model on CPU, but if you have a GPU available, you can load the model to your GPU by setting the device string. To enable parallelism properly, we will load a model onto each of the workers by adding a model plugin.

In [6]:
model_config = '../cosmos/ingestion/ingest/process/configs/model_config.yaml'
weights_pth = '../cosmos/weights/model_weights.pth'
device_str = 'cpu'
plugin = DetectPlugin(model_config, weights_pth, device_str, keep_bytes=False)
client.register_worker_plugin(plugin)

Built backbone resnet50
Building downstream components via shape testing
done shape testing, building, attention mechanisms
built multi head attention
8, 8, 2048
super called
going to build a 131072 by 1024 matrix of weights
built embeddings
done




{'tcp://127.0.0.1:58667': {'status': 'OK'},
 'tcp://127.0.0.1:58668': {'status': 'OK'},
 'tcp://127.0.0.1:58670': {'status': 'OK'},
 'tcp://127.0.0.1:58671': {'status': 'OK'}}
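
For readers unfamiliar with worker plugins, the following is a rough sketch of the idea, not the actual DetectPlugin. Dask calls a plugin's `setup(worker)` method once on each worker, which makes it a natural place to load a model a single time per worker. The class `ModelPlugin` and the function `load_model` are hypothetical, and a plain namespace object stands in for a Dask worker so the sketch runs without a cluster.

```python
from types import SimpleNamespace


def load_model(config_path, weights_path, device):
    """Hypothetical loader; a real one would build the network and read the weights."""
    return {"config": config_path, "weights": weights_path, "device": device}


class ModelPlugin:
    """Sketch of a worker plugin that keeps one model per worker (assumed design)."""

    def __init__(self, config_path, weights_path, device):
        self.config_path = config_path
        self.weights_path = weights_path
        self.device = device

    def setup(self, worker):
        # Runs once per worker: load the model and attach it to the worker object.
        worker.model = load_model(self.config_path, self.weights_path, self.device)

    def teardown(self, worker):
        # Runs when the worker shuts down: drop the reference to the model.
        worker.model = None


# A plain namespace stands in for a Dask worker in this demo.
fake_worker = SimpleNamespace()
plugin = ModelPlugin("model_config.yaml", "model_weights.pth", "cpu")
plugin.setup(fake_worker)
print(fake_worker.model["device"])  # cpu
```

In a real deployment, a task would typically reach the worker's model through `distributed.get_worker()`, so the weights are read once per worker rather than once per task.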

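Now we run detection on the proposals. Calling result() on the first future blocks until that page has been classified.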

In [7]:
detected_objs = client.map(detect, proposals)
detected_objs[0].result()

That is the minimal pipeline for detecting tables, figures, etc. We provide additional ways to improve the model's output, such as an XGBoost postprocessing step and a rule-based postprocessing step, but those are beyond the scope of this tutorial.