## PoreMongo


#### Requirements
---

PoreMongo is written for Python 3.6 or later. For now, install it from source:

```bash
git clone https://github.com/esteinig/poremongo
cd poremongo
pip install .
```

#### Configuration
---

The PoreMongo configuration JSON file (default: `poremongo.json`) keeps basic and password-sensitive parameters readable by your user only (make sure to restrict the file permissions, e.g. `chmod 600 poremongo.json`). It contains one mandatory entry, the URI used to connect to MongoDB:

```json
{
  "uri": "mongodb://<username>:<pwd>@<server>:<port>/poremongo-nb"
}
```
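A minimal sketch of writing and reading such a file with owner-only permissions, using only the standard library. The helpers `write_config` and `load_config` are hypothetical (not part of PoreMongo), and the permission check assumes a POSIX system:

```python
import json
import os
import stat


def write_config(path, config):
    """Write the config JSON so that only the owner can read or write it (mode 0o600)."""
    # os.open applies the mode only when creating the file; chmod also covers existing files
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        json.dump(config, handle, indent=2)
    os.chmod(path, 0o600)


def load_config(path):
    """Load the config JSON, refusing group/other access and requiring the 'uri' entry."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        raise PermissionError(f"{path} is accessible by group/others (mode {oct(mode)})")
    with open(path) as handle:
        config = json.load(handle)
    if "uri" not in config:
        raise KeyError("config is missing the mandatory 'uri' entry")
    return config
```

Refusing to load a world-readable file makes a forgotten `chmod` visible immediately instead of silently exposing the MongoDB password.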

To access files indexed on a remote server (for example, the high-performance cluster where your data is stored), you can add a nested `ssh` entry with the SSH settings. Only a single remote storage location is currently supported. The SSH connection must be established explicitly through the `PoreMongo` class (see *SSH Transfer* below). An example:

```json
{
  "uri": "mongodb://<username>:<pwd>@<server>:<port>/poremongo-nb",
  "ssh": {
    "user": "<user>",
    "password": "<ssh_pwd>",
    "server": "<server>",
    "port": "<port>"
  }
}
```
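Because only one remote location is supported, there is a single nested `ssh` object to check. A minimal sketch, assuming the key names in the example above; `ssh_settings` is a hypothetical helper, and PoreMongo's own validation may differ:

```python
REQUIRED_SSH_KEYS = ("user", "password", "server", "port")


def ssh_settings(config):
    """Return the optional 'ssh' entry with the port as an int, or None if it is absent."""
    ssh = config.get("ssh")
    if ssh is None:
        return None
    missing = [key for key in REQUIRED_SSH_KEYS if key not in ssh]
    if missing:
        raise KeyError(f"ssh entry is missing: {', '.join(missing)}")
    # The example stores the port as a string; SSH libraries usually expect an int
    return {**ssh, "port": int(ssh["port"])}
```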

#### PoreMongo API
---

The next few sections walk through an object-oriented use of PoreMongo in your Python code. We connect to the database and index Fast5 files from multiple sequencing runs, populating the database for later operations such as sampling and extracting signal-level data.

You can use a config dictionary or the JSON config file.

```python
config = {
    "uri": "mongodb://<username>:<pwd>@<server>:<port>/poremongo-nb"
}
```

```python
import poremongo
import shutil
import json
import os

from pprint import pprint
from poremongo.models import Fast5

# Using the config dictionary defined above
pongo = poremongo.PoreMongo(config=config, verbose=False)  # silent
```

I usually connect and disconnect explicitly, for clarity:

```python
pongo.connect()  # pongo.disconnect()
```

#### Indexing and tagging Fast5 files in storage
---

We are connected. The first step is to index some data paths on your local system. For this demonstration, we use the absolute paths of the test directories `tests/data/human_1` and `tests/data/tb_1` in a function that indexes the files (without multiprocessing) and then tags them by organism (n = 10 Human, n = 10 TB).

```python
def index_and_tag():
    """Wrapper for indexing and tagging test files in tests/data."""

    path_1 = os.path.abspath("tests/data/human_1")
    path_2 = os.path.abspath("tests/data/tb_1")

    pongo.index(index_path=path_1, recursive=True, insert=True, batch_size=5, ncpu=1, reconnect=True)
    pongo.tag(path_query=path_1, tags=("Human", "R9.4", "Notebook"), recursive=True)

    pongo.index(index_path=path_2, recursive=True, insert=True, batch_size=5, ncpu=1, reconnect=True)
    pongo.tag(path_query=path_2, tags=("TB", "R9.4", "Notebook"), recursive=True)
```

You can also assign tags for pass and fail directories, since a recursive `path_query` checks for `path_query in fast5.path`. The following assigns the tag "pass" to all documents in the database whose file path contains "/pass/". On Windows, the query would be `"\\pass\\"`.

```python
# Note the trailing comma: ("pass",) is a one-element tuple, ("pass") is just a string
pongo.tag(path_query="/pass/", tags=("pass",), recursive=True)
```
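Because a recursive path query is a plain substring test, you can check what it will match in memory before tagging anything. A minimal sketch, assuming file paths are stored as plain strings; `matches_path_query` is a hypothetical helper that mirrors the `path_query in fast5.path` rule:

```python
import os


def matches_path_query(path_query, fast5_path):
    """Recursive path query as described above: a substring test on the file path."""
    return path_query in fast5_path


# Build the separator-dependent query: "/pass/" on POSIX, "\pass\" on Windows
pass_query = f"{os.sep}pass{os.sep}"

paths = [
    os.path.join("runs", "run_1", "pass", "read_1.fast5"),
    os.path.join("runs", "run_1", "fail", "read_2.fast5"),
    os.path.join("runs", "passive", "read_3.fast5"),
]
tagged = [path for path in paths if matches_path_query(pass_query, path)]
print(tagged)
```

The surrounding separators matter: without them, a query of `pass` would also tag the `passive` directory.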

#### Explanation

#### Recursive search and insert to Database
---

Inserts into the database are always performed in batches of `batch_size` Fast5 files.

There are two important parameters. The first, `recursive`, is a boolean that enables searching through directories and their subdirectories. Recursive searches are a bit slower: for several hundred thousand files, expect indexing to take several hours.

The second, `insert`, speeds up indexing by inserting file paths directly, on the assumption that they are not yet in the database, so it breaks the indexing process if you are re-indexing. Each model in the database has a unique file path, so files cannot be indexed more than once.
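To see why `insert` and re-indexing clash, here is a small in-memory model of batched inserts against a unique path index. It is a sketch under assumptions, not PoreMongo's implementation: `insert_in_batches` is hypothetical, a Python set stands in for the unique index on file paths, and `NotUniqueError` here is a local stand-in class:

```python
from itertools import islice


class NotUniqueError(Exception):
    """Local stand-in for the duplicate-key error raised by a unique index."""


def batches(items, batch_size):
    """Yield successive lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def insert_in_batches(paths, indexed, batch_size=5):
    """Insert paths batch by batch; fail like a unique index would on a duplicate path."""
    for batch in batches(paths, batch_size):
        duplicates = [path for path in batch if path in indexed]
        if duplicates:
            raise NotUniqueError(f"already indexed: {duplicates[0]}")
        indexed.update(batch)
    return indexed
```

Indexing the same ten paths a second time fails on the first batch, which is the situation the `NotUnique` comment in the indexing cell warns about.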

To re-index the current file paths, the only option at the moment is to delete them manually by dropping the collection, for instance in Robomongo. A fix should follow soon. The underlying problem is that indexing currently cannot be done through batch upserts in `PyMongo`, which would otherwise simply update the path of each Fast5.

#### Multiprocessing
---

Another important parameter is `ncpu`, for multiprocessing inserts: after the directories are scanned, the files are aggregated into batches, which are then inserted into the database across multiple processes. **This significantly speeds up the indexing process and is essentially required for large file collections**.

This only works with `insert=True`. Multiprocess inserts connect to the database through multiple `PyMongo` clients, so unless you want to reconnect the main client (initiated above with `connect`) manually, also set `reconnect=True`.
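The batches-across-processes pattern can be sketched with the standard library. This is an illustration only: `insert_batch` is a hypothetical stand-in for a worker that would open its own `PyMongo` client and insert one batch (which is why the main client needs reconnecting afterwards). The `if __name__ == "__main__":` guard is required on platforms that spawn new processes, such as Windows:

```python
from concurrent.futures import ProcessPoolExecutor


def make_batches(paths, batch_size):
    """Split a list of file paths into consecutive batches of at most batch_size."""
    return [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]


def insert_batch(batch):
    """Hypothetical worker: a real one would create its own MongoClient and insert the batch."""
    return len(batch)


def parallel_insert(paths, batch_size=5, ncpu=2):
    """Insert all batches across ncpu processes and return the number of inserted paths."""
    with ProcessPoolExecutor(max_workers=ncpu) as pool:
        return sum(pool.map(insert_batch, make_batches(paths, batch_size)))


if __name__ == "__main__":
    files = [f"/data/tb_1/read_{i}.fast5" for i in range(12)]
    print(parallel_insert(files, batch_size=5, ncpu=2))  # 12
```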

#### Tags
---

Tagging is performed on indexed collections. Usually you pass the same indexing path to the `path_query` parameter, which then queries the database models for that path (recursive tagging matches every file path that contains `path_query`).
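In raw `PyMongo` terms, a recursive tag operation corresponds to a substring match on the path plus an update that adds tags without duplicating them. A minimal sketch, assuming the documents store `path` and `tags` fields (as the `fast5.path` and `fast5.tags` attributes used in this notebook suggest); `tag_update` is hypothetical, and PoreMongo's own `tag` method may build its query differently:

```python
import re


def tag_update(path_query, tags, recursive=True):
    """Return a (filter, update) pair suitable for collection.update_many()."""
    if recursive:
        # Substring match; escape the query so characters like '.' or '\' match literally
        path_filter = {"path": {"$regex": re.escape(path_query)}}
    else:
        path_filter = {"path": path_query}
    # $addToSet with $each adds each tag only if it is not already in the array
    update = {"$addToSet": {"tags": {"$each": list(tags)}}}
    return path_filter, update


query, update = tag_update("/pass/", ("pass",))
# With a PyMongo collection: collection.update_many(query, update)
```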

Finally, we display the tags present in the database (limited to the 100 most frequent tags, for clarity).
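Counting the most frequent tags is a standard `$unwind` / `$group` / `$sort` / `$limit` aggregation. A minimal sketch, assuming a `tags` array field; `tag_count_pipeline` is hypothetical and not necessarily how `display` works internally. The in-memory `Counter` version computes the same counts, which is handy for checking the pipeline:

```python
from collections import Counter


def tag_count_pipeline(limit=100):
    """Aggregation pipeline yielding {"_id": tag, "count": n}, most frequent first."""
    return [
        {"$unwind": "$tags"},
        {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": limit},
    ]


def tag_counts_in_memory(tag_lists, limit=100):
    """Same counts computed in Python from a list of per-document tag lists."""
    return Counter(tag for tags in tag_lists for tag in tags).most_common(limit)


# Tags matching the test data: 10 Human and 10 TB files
docs = [["Human", "R9.4", "Notebook"]] * 10 + [["TB", "R9.4", "Notebook"]] * 10
print(tag_counts_in_memory(docs))
# With a PyMongo collection: list(collection.aggregate(tag_count_pipeline()))
```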

```python
# Comment this out when repeating the notebook, since you do not want
# to insert the same models into the database again (throws a NotUnique error)

# On Windows, wrap multiprocessing calls in: if __name__ == "__main__":
index_and_tag()
```

```python
pongo.display("tags")
```

#### Querying and sampling in PoreMongo
---

Now that we have indexed and tagged our read collections, we can query, sample and extract Fast5 files from MongoDB. Sampling uses aggregation pipelines and returns Fast5 model objects defined with `MongoEngine`. These objects have useful methods of their own, such as opening the file and returning the signal value arrays.

Sampled objects can then be filtered further (in memory, so keep the sample to a manageable size) and passed on to other methods, such as `copy`, which copies the Fast5 files from local storage into a directory of your choice. Copies can also be made over SSH if the config dictionary or JSON file contains an SSH entry (see *SSH Transfer* below).
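In-memory filtering followed by a copy amounts to a list comprehension and one file copy per object. A minimal sketch, not PoreMongo's `copy` method: it assumes each model exposes `path`, `name` and `tags` attributes, uses `types.SimpleNamespace` stand-ins instead of real `Fast5` objects, and `copy_fast5` is hypothetical:

```python
import os
import shutil
import tempfile
from types import SimpleNamespace


def copy_fast5(models, outdir):
    """Copy each model's file into outdir (created if needed); return the new paths."""
    os.makedirs(outdir, exist_ok=True)
    return [shutil.copy(model.path, os.path.join(outdir, model.name)) for model in models]


# Stand-in objects and files; in the notebook these would come from pongo.sample(...)
workdir = tempfile.mkdtemp()
models = []
for i, tag in enumerate(["TB", "Human", "TB"]):
    path = os.path.join(workdir, f"read_{i}.fast5")
    with open(path, "wb") as handle:
        handle.write(b"placeholder")
    models.append(SimpleNamespace(path=path, name=f"read_{i}.fast5", tags=[tag, "R9.4"]))

# Filter in memory, then copy only the TB subset
tb_only = [model for model in models if "TB" in model.tags]
copied = copy_fast5(tb_only, os.path.join(workdir, "out"))
```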

#### Queries
---

Let's first query the database for some Fast5 file objects. Queries can be raw `PyMongo` query dictionaries, or plain strings and lists of query items, which provide a much simpler interface to querying with `PyMongo`. The examples below use tag queries and a raw query on file names with the low-level `query` method of `PoreMongo`:
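A tag string or list combined with `query_logic` maps naturally onto a MongoDB filter. The sketch below is an assumption about that translation, not PoreMongo's code: `build_tag_query` and `matches` are hypothetical and assume a `tags` array field. With `AND` a list query requires every tag (`$all`); with `OR` at least one (`$in`):

```python
def build_tag_query(tag_query, query_logic="AND"):
    """Translate a tag string or a list of tags into a raw MongoDB filter on 'tags'."""
    if isinstance(tag_query, str):
        # A single tag matches documents whose tags array contains it
        return {"tags": tag_query}
    tags = list(tag_query)
    if query_logic == "AND":
        return {"tags": {"$all": tags}}
    if query_logic == "OR":
        return {"tags": {"$in": tags}}
    raise ValueError(f"unsupported query_logic: {query_logic!r}")


def matches(document_tags, query):
    """Evaluate such a filter in memory, e.g. to check expected result counts."""
    condition = query["tags"]
    if isinstance(condition, str):
        return condition in document_tags
    if "$all" in condition:
        return all(tag in document_tags for tag in condition["$all"])
    return any(tag in document_tags for tag in condition["$in"])
```

Applied to the TB and Human test files, `["TB", "Human"]` with `AND` matches no file, while `["TB", "Notebook"]` with `AND` matches only the TB files.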

```python
# Define some queries:

tag_query_1 = "TB"                   # Single tag query
tag_query_2 = ["TB", "Human"]        # Multiple tag query
tag_query_3 = ["TB", "Notebook"]     # Multiple tag query

# Query the database; if the input is a list, specify the logic used to chain the query items:

result_1 = pongo.query(tag_query=tag_query_1)
result_2 = pongo.query(tag_query=tag_query_2, query_logic="AND")
result_3 = pongo.query(tag_query=tag_query_2, query_logic="OR")
result_4 = pongo.query(tag_query=tag_query_3, query_logic="AND")
result_5 = pongo.query(tag_query=tag_query_3, query_logic="OR")

# Check the number of query results:

print(f"""
Number of returned Fast5 objects:

    Query 1 = {len(result_1)} (TB files only)
    Query 2 = {len(result_2)} (no files, "TB AND Human")
    Query 3 = {len(result_3)} (TB and Human files, "TB OR Human")
    Query 4 = {len(result_4)} (TB files, "TB AND Notebook")
    Query 5 = {len(result_5)} (TB and Human files, "TB OR Notebook")
""")

# Check the content of the returned results:
print("\nFast5 model objects from query 5:\n")
print(result_5)

# Check the tags of the last result:
print("Checking tags of query 5:\n")

for fast5 in result_5:
    print(fast5.tags)

# What can we do with Fast5 model objects?

fast5 = result_1[0]  # single Fast5 model object

# Open the file and extract the signal array (template strand only, 1D)
signal_data = fast5.get_reads(template=True)

# Sliced signal: window_size=400, window_step=40
sliced_signal_data = fast5.get_reads(template=True, window_size=400, window_step=40)

print("\nRead (signal array) data extraction:")
print(f"""
Signal strand array, NumPy:     {signal_data}
Signal template length:         {len(signal_data[0])}
Signal sliced, shape:           {sliced_signal_data.shape}
""")

# Get model attributes as JSON:
fast5_attr = fast5.to_json()
print("Fast5 model attributes in JSON:\n")

obj = json.loads(fast5_attr)
pprint(obj)

# Raw query on TB file names containing "IMB14":
raw_query_1 = {"name": {"$regex": ".*IMB14.*"}}

raw_result_1 = pongo.query(raw_query=raw_query_1)

print(f"\nTB test files containing IMB14: {len(raw_result_1)}")
```


#### Random samples and file copy
---

Sample random models from the database and extract them by making copies (or symlinks) into a new directory.
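MongoDB's `$sample` aggregation stage is the usual way to draw random documents inside a pipeline. A minimal sketch of such a pipeline, assuming a `tags` array field; `sample_pipeline` and its `exclude_tags` parameter are hypothetical, and this does not show how PoreMongo's `sample` handles `proportion`, `unique` or `exclude`:

```python
def sample_pipeline(tags, limit, exclude_tags=None):
    """Match documents carrying all tags (and none of exclude_tags), then sample `limit`."""
    match = {"tags": {"$all": list(tags)}}
    if exclude_tags:
        # $all and $nin can be combined in the same field condition
        match["tags"]["$nin"] = list(exclude_tags)
    return [{"$match": match}, {"$sample": {"size": limit}}]


pipeline = sample_pipeline(["Human"], limit=5)
# With a PyMongo collection: list(collection.aggregate(pipeline))
```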

```python
# Pass all Fast5 model objects in the database to the tag sample aggregation pipeline
sample_1 = pongo.sample(Fast5.objects, tags=["Human"], limit=5, proportion=None, unique=False, exclude=None)  # Returns 5

# It is also possible to pass pre-filtered / queried Fast5 objects to the sample method
fast5_queried = result_1  # From the query above, TB only

sample_2 = pongo.sample(fast5_queried, tags=["Human"], limit=5, proportion=None, unique=False, exclude=None)  # Returns 0, no Human
sample_3 = pongo.sample(fast5_queried, tags=["TB"], limit=7, proportion=None, unique=False, exclude=None)  # Returns 7, TB only

print(f"\nNumber of sample results: sample_1: {len(sample_1)}, sample_2: {len(sample_2)}, sample_3: {len(sample_3)}\n")

print("Tags from sample_1:\n")

for f5 in sample_1:
    print(f5.tags)

print("\n")
```

#### Query tag combinations + labels
---



```python
# Function to sample Fast5 by tags for successive labels (0, 1, 2, ...), input is a nested list

labels_query_1 = [["TB", "R9.4"], ["Human"]]  # TB & R9.4 (label 0), Human (label 1)


def sample_by_label(sample_query, outdir=None, prop=None):
    """Generates a random sample of Fast5 objects for each tag entry in the sample_query.
    Sample randomly from database across tags and enumerate by label for each element in
    a nested input tag query; copy or make a CSV file of Fast5 paths in PoreMongo.
    """
    sampled = []
    labels = []
    # Sample for each tag list in sample query, assign label 0, 1, ...
    for label, tags in enumerate(sample_query):
        sample = pongo.sample(Fast5.objects, tags=tags, limit=5, proportion=prop)
        sampled += sample
        labels += [label for _ in sample]

    print("\n", sampled, "\n")
    print(labels, "\n")

    if outdir:
        pongo.copy(sampled, outdir=outdir, ncpu=1, iterate=False)
    else:
        pongo.to_csv(sampled, labels=labels, out_file="tmp/tmp.csv")

    # Object IDs of random sample
    ids = [str(fast5._id) for fast5 in sampled]

    pprint(ids)
    print("\n")

    return sampled, labels


sample_by_label(labels_query_1, outdir="tmp/random_1")
sample_by_label(labels_query_1, outdir="tmp/random_2")

shutil.rmtree("tmp")  # Clean
```
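The label assignment used in `sample_by_label` (one integer label per tag list, in order) and a path/label CSV can be sketched without a database. This is hypothetical: `label_paths`, `write_label_csv` and the `path,label` columns are assumptions, not the format `pongo.to_csv` writes:

```python
import csv
import os
import tempfile


def label_paths(sampled_groups):
    """Given one list of file paths per tag query, return (path, label) pairs labelled 0, 1, ..."""
    return [(path, label) for label, paths in enumerate(sampled_groups) for path in paths]


def write_label_csv(rows, out_file):
    """Write (path, label) rows to a CSV file, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(out_file) or ".", exist_ok=True)
    with open(out_file, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["path", "label"])
        writer.writerows(rows)


groups = [["/data/tb_1/a.fast5", "/data/tb_1/b.fast5"], ["/data/human_1/c.fast5"]]
rows = label_paths(groups)
out_file = os.path.join(tempfile.mkdtemp(), "labels", "labels.csv")
write_label_csv(rows, out_file)
```

Creating the parent directory up front avoids a failure when an output path such as `tmp/tmp.csv` points into a directory that does not exist yet.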

#### Proportional sampling
---


```python
sampled_1, labels_1 = sample_by_label(labels_query_1, outdir="tmp/random_1", prop=None)

sampled_2, labels_2 = sample_by_label(labels_query_1, outdir="tmp/random_2", prop="equal")

shutil.rmtree("tmp")  # Clean
```

#### Run Simulation
---

First, query a subset of the data (all Fast5 models tagged TB). Then group the subset into runs of Fast5 files with the same experiment start time; here a single run groups all TB files. Use the Fast5 IDs of a run group to query the database for the Fast5 models belonging to that run (the same models as the initial query). Schedule a run by sorting the models by read completion time and scaling the timing to speed up the simulation. The simulation copies reads into the output directory at intervals matching the intervals between the read completion times.
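The grouping and scheduling steps can be sketched in plain Python. Everything here is an assumption for illustration: reads are dictionaries with hypothetical `start_time` (experiment start), `end_time` (read completion, in seconds) and `path` keys, and `group_runs_by_start`, `schedule_delays` and `simulate_run` are not PoreMongo functions. Each delay is the gap to the previous completion time divided by `scale`:

```python
import os
import shutil
import time
from collections import defaultdict


def group_runs_by_start(reads):
    """Group reads into runs keyed by their experiment start time."""
    runs = defaultdict(list)
    for read in reads:
        runs[read["start_time"]].append(read)
    return dict(runs)


def schedule_delays(reads, scale=1.0):
    """Sort reads by completion time; return (read, seconds to wait before copying it)."""
    ordered = sorted(reads, key=lambda read: read["end_time"])
    schedule, previous = [], None
    for read in ordered:
        gap = 0.0 if previous is None else (read["end_time"] - previous) / scale
        schedule.append((read, gap))
        previous = read["end_time"]
    return schedule


def simulate_run(reads, outdir, scale=1.0, sleep=time.sleep):
    """Copy each read's file into outdir, waiting the scaled interval between copies."""
    os.makedirs(outdir, exist_ok=True)
    for read, delay in schedule_delays(reads, scale):
        sleep(delay)
        shutil.copy(read["path"], outdir)
```

Passing `sleep` as a parameter lets the schedule be tested without actually waiting.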

```python
# Perform a query for TB data only, to limit the run groups:
fast5 = pongo.query(tag_query="TB")

# Extract run groups by start timestamp:
runs = pongo.group_runs(fast5)

print("\n")
pprint(runs)

print("\nExtracting Fast5 models for each run and simulating file copies with Taeper.\n")

# Query for the Fast5 models in the run group (keyed by its start timestamp):
fast5 = pongo.query(raw_query={"_id": {"$in": runs[1488336230]["fast5"]}})

# Schedule a simulated run based on the sorted end times of the reads in the Fast5 files;
# close the scheduler after a timeout of 20 seconds to complete the process.
pongo.schedule_run(fast5, scale=10000, outdir="run_sim_1", timeout=20)
```

```python
shutil.rmtree("run_sim_1")  # Clean up; comment out if running the watchdog (see below)

pongo.disconnect()  # Don't forget to disconnect
```

#### Watchdog with callback or index to PoreMongo
---

In a separate notebook or terminal, run the Pomoxis async watchdog on a directory. You can rerun the simulation cell above with the call to `shutil.rmtree` commented out, together with the callback defined below, to print file names as they are copied into the simulation directory:

*Do not run.*

```python
import os
import time

from poremongo import PoreMongo

pongo = PoreMongo()

def callback(fpath):
    
    print(f"{time.time()} file={os.path.basename(fpath)}")

watcher = pongo.watch(path="run_sim_1", callback=callback, index=False)

# Not implemented yet: index = True

# Close async observer with watcher.close()

```
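If Pomoxis is not available, a simple polling loop gives similar callback behaviour. This swaps in a different technique (periodically listing the directory instead of an async file-system observer), and `poll_new_files` and `watch_by_polling` are hypothetical helpers, not part of PoreMongo or Pomoxis:

```python
import os
import time


def poll_new_files(path, callback, seen=None, suffix=".fast5"):
    """Call callback for each matching file in path not seen before; return the seen set."""
    seen = set() if seen is None else seen
    for name in sorted(os.listdir(path)):
        fpath = os.path.join(path, name)
        if name.endswith(suffix) and fpath not in seen and os.path.isfile(fpath):
            seen.add(fpath)
            callback(fpath)
    return seen


def watch_by_polling(path, callback, interval=1.0, duration=20.0):
    """Poll path every `interval` seconds for `duration` seconds."""
    seen = set()
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        seen = poll_new_files(path, callback, seen)
        time.sleep(interval)
    return seen
```

For the simulation above, `watch_by_polling("run_sim_1", callback)` with the `callback` defined above would print each copied file name within about a second of its arrival.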

#### SSH Transfer
---

*Do not run.*

```python
import shutil

from poremongo import PoreMongo
from poremongo.models import Fast5

# When creating instance of PoreMongo pass dict with SSH config:

config = {
  "uri": "mongodb://<username>:<pwd>@<server>:<port>/poremongo-nb",
  "ssh": {
    "user": "<user>",
    "password": "<ssh_pwd>",
    "server": "<server>",
    "port": "<port>"
  }
}

pongo = PoreMongo(config=config)

# Connect to DB
pongo.connect(ssh=True)

# Sample from all Fast5 for Human tag
fast5_sample = pongo.sample(Fast5.objects, tags=["Human"], limit=5, proportion=None, unique=False, exclude=None)

# SCP copy file into temporary folder
for fast5 in fast5_sample:
    # Explicitly use SCP client from PoreMongo instance
    fast5.get(pongo.scp, out_dir="tmp")  # SCP copies file and modifies fast5.path to tmp/fast5.name
    fast5.remove()  # Works only after fast5.path is modified by fast5.get; deletes file tmp/fast5.name

pongo.disconnect(ssh=True)

shutil.rmtree("tmp")

```
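For a one-off transfer outside Python, the same SSH settings map onto an `scp` command. A minimal sketch, assuming key-based authentication (the `scp` command line has no password option, so the `password` entry is not used); `scp_command` and `fetch` are hypothetical helpers, and running `fetch` requires an `scp` client on the local machine:

```python
import subprocess


def scp_command(ssh, remote_path, local_dir):
    """Build an scp command list that copies remote_path from the configured server."""
    # scp takes the port with capital -P (lowercase -p preserves file times)
    return ["scp", "-P", str(ssh["port"]), f"{ssh['user']}@{ssh['server']}:{remote_path}", local_dir]


def fetch(ssh, remote_path, local_dir):
    """Run the transfer; raises CalledProcessError if scp fails."""
    subprocess.run(scp_command(ssh, remote_path, local_dir), check=True)


ssh = {"user": "<user>", "server": "<server>", "port": "22"}
cmd = scp_command(ssh, "/data/tb_1/read_1.fast5", "tmp/")
```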