# Load in the Data, Make Pandas-able
The raw data for this project is an ASE Database that holds the results of the water cluster calculations taken from a recent paper by [Rakshit et al.](https://doi.org/10.1063/1.5128378).
We need to convert these ASE `Atoms` objects, which list the atomic coordinates and energy, into a form with the graph structure.
As in the [`parse-dataset-from-scratch`](./parse-dataset-from-scartch.ipynb) notebook, we run that conversion here but store the result in a MongoDB

In [1]:
from hydronet.db import HydroNetDB, HydroNetRecord
from multiprocessing import Pool
from pymongo.errors import BulkWriteError
from ase.db import connect
from tqdm import tqdm
import numpy as np
import zipfile
import os

Configuration settings

In [2]:
n_jobs = min(8, os.cpu_count())  # Number of processes to use

## Get the ASE database from the ZIP file
We are going to uncompress it temporarily

In [3]:
data_zip = zipfile.ZipFile(os.path.join('data', 'input', 'ALL_geoms.zip'))

In [4]:
%%time
data_path = os.path.join('temp.db', 'water_db', 'ALL_geoms_all_sorted.db')
if not os.path.isfile(data_path):
    path_check = data_zip.extract('water_db/ALL_geoms_all_sorted.db', 'temp.db')
    assert path_check == data_path
    print(f'Extracted data to {data_path}')

CPU times: user 147 µs, sys: 101 µs, total: 248 µs
Wall time: 168 µs


In [5]:
%%time
ase_db = connect(data_path)
total = ase_db.count()
print(f'Connected to database with {total} records')

Connected to database with 4464740 records
CPU times: user 300 ms, sys: 2.07 s, total: 2.37 s
Wall time: 1min 34s


## Write the whole dataset to disk
Pull records from the ASE DB, convert them to the Mongo-compatible format, and write them to disk. 

We have a few performance optimizations:
1. Read data from the ase database in chunks. Prevents reading the 10GB+ database into memory at one time
1. Parallelize converting from ase.Atoms to database records. This step requires detecting hydrogen bonds, which can be time-consuming for larger water clusters
1. Write records to disk as completed. Insertion order doesn't matter and this lets us keep the main thread as busy as possible

In [6]:
def pull_from_database(ase_db, chunk_size=128, total=None):
    """Iterate over a large database
    
    Queries only a small chunk size at a time to prevent loading the 
    whole database into memory. 
    
    Args:
        ase_db (Connection): Connection to an ASE database
        chunk_size (int): Number of entries to retrieve per query
        total (int): Total number of entries to retrieve
    """
    # Figure out how many iterations we need to make
    if total is None:
        total = ase_db.count()
    
    # Generate the dataset
    starts = np.arange(0, total, chunk_size, dtype=np.int32)
    
    # Iterate through the whole database
    for start in starts:
        for a in ase_db.select(
            selection=[('id','>=', str(start)), ('id', '<', str(start+chunk_size))]):
            yield a.toatoms()

In [7]:
mongo = HydroNetDB.from_connection_info(port=27445)
mongo.initialize_index()  # Sets up unique indices

'coord_hash_1'

In [8]:
chunksize=1024
to_insert = []
with Pool(n_jobs) as pool:
    for record in tqdm(pool.imap_unordered(HydroNetRecord.from_atoms, pull_from_database(ase_db), chunksize=chunksize), total=total):
        # Add it to the list of things to upload
        record.source = 'wdbase'  # Mark that these came from WDBase
        to_insert.append(record.dict())
        
        # Use a bulk insert of a certain chunk size
        if len(to_insert) >= chunksize:
            coord_hashes = [x['coord_hash'] for x in to_insert]
            mongo.collection.delete_many({'coord_hash': {'$in': coord_hashes}})
            try:
                mongo.collection.insert_many(to_insert, ordered=False)
            except BulkWriteError:
                pass
            to_insert.clear()

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4464740/4464740 [10:21:39<00:00, 119.70it/s]


In [9]:
%%time
mongo_size = mongo.collection.count_documents({})
print(f'Stored {mongo_size} unique clusters')

Stored 4464542 unique clusters
CPU times: user 297 ms, sys: 211 ms, total: 508 ms
Wall time: 13min 11s


## Illustrate the features of the database
The database lets us retrieve records and effiicently turn them into different formats, and also has a utility function for adding `ase.Atoms` objects

We'll start off by demonstrating how to find the lowest-energy cluster with 10 atoms

In [10]:
%%time
cursor = mongo.collection.find({'n_waters': 10}).sort('energy').limit(1)  # Defines a query over the database
record = next(cursor)  # Returns the next result in the database query

CPU times: user 74.2 ms, sys: 54.2 ms, total: 128 ms
Wall time: 3min 16s


The data here stores some key data (e.g., the energy) as standard data types (e.g., energy as a float)

In [11]:
record['energy']

-94.6706543

It also has some useful data, like a random position in the database used for shuffling and the source from the record

In [12]:
record['source']  # Where the data came from

'wdbase'

In [13]:
record['create_date']  # When it was added

datetime.datetime(2022, 3, 12, 2, 14, 25, 650000)

In [14]:
record['position']  # Its order in the dataset (used to produce data sorted in a random order)

0.6870445144874042

Larger data, such as coordiantes, are stored in binary form as pickled numpy arrays

In [15]:
record['coords_'][:50]

b'\x80\x02cnumpy.core.multiarray\n_reconstruct\nq\x00cnumpy\nnda'

We provide a utility class, HydroNetRecord, that allows you to quickly transform this data into desirable formats

In [16]:
record = HydroNetRecord.parse_obj(record)
record

n_waters=10 energy=-94.6706543

For example, you can access the coordinates as a numpy array

In [17]:
record.coords[0, :]

array([ 7.58198261, -0.66332477,  5.48388386])

or generate a graph or dictionary represntation of the dataset

In [18]:
record.atomic_nx

<networkx.classes.graph.Graph at 0x7fea0711a6d0>

In [19]:
record.coarse_dict

{'n_waters': 10,
 'n_atoms': 10,
 'n_bonds': 30,
 'atom': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 'bond': array([0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0,
        0, 1, 1, 0, 1, 1, 0, 0]),
 'connectivity': array([[0, 2],
        [0, 5],
        [0, 7],
        [1, 2],
        [1, 5],
        [1, 8],
        [2, 0],
        [2, 1],
        [2, 9],
        [3, 4],
        [3, 6],
        [3, 8],
        [4, 3],
        [4, 5],
        [4, 7],
        [5, 0],
        [5, 1],
        [5, 4],
        [6, 3],
        [6, 7],
        [6, 9],
        [7, 0],
        [7, 4],
        [7, 6],
        [8, 1],
        [8, 3],
        [8, 9],
        [9, 2],
        [9, 6],
        [9, 8]]),
 'energy': -94.6706543}

There are some utility operations that let you automatically return MongoDB documents as these records

In [20]:
%%time
cursor = mongo.collection.find({'n_waters': 8}).sort('energy').limit(5)
records = list(mongo.iterate_as_records(cursor))  # `list()` iterates over all items in cursor
records

CPU times: user 78.1 ms, sys: 47.8 ms, total: 126 ms
Wall time: 3min 22s


[n_waters=8 energy=-73.3260498,
 n_waters=8 energy=-73.2936554,
 n_waters=8 energy=-70.9184799,
 n_waters=8 energy=-70.8874054,
 n_waters=8 energy=-70.103035]

Note how it returns structures with different energies. We have a mechanism in our database that ensures only one document per specific cluster (defined by its coordinates), which cuts down on duplicates

To demonstrate, let's re-add cluster number one

In [21]:
mongo.add_cluster(records[0].atoms)  # Returns "false" as the record has already exists

False

In [22]:
%%time
cursor = mongo.collection.find({'n_waters': 8}).sort('energy').limit(5)
records = list(mongo.iterate_as_records(cursor))  # `list()` iterates over all items in cursor
records

CPU times: user 82 ms, sys: 45.9 ms, total: 128 ms
Wall time: 3min 23s


[n_waters=8 energy=-73.3260498,
 n_waters=8 energy=-73.2936554,
 n_waters=8 energy=-70.9184799,
 n_waters=8 energy=-70.8874054,
 n_waters=8 energy=-70.103035]

Note how the top cluster is not duplicated

We also have some functionality around saving the database to TFrecords that are described further in the class docstrings.