# Concurrent Database Connections
DocTable makes it easy to establish concurrent database connections from different processes. DocTable objects can be copied as-is from one process to another, but you must call `.reopen_engine()` in each new process before using the copy. This discards the now-stale database connections (which are _not_ meant to traverse processes) from the engine's connection pool and creates new ones.

You may also want to set a large timeout with the `timeout` argument of the `DocTable` constructor (given in seconds), so that processes waiting on one another's writes do not give up too early.

In [1]:
import sqlalchemy
from multiprocessing import Process
import os
import random
import string
import dataclasses
import time
import sys
sys.path.append('..')
import doctable

In [2]:
import datetime
@dataclasses.dataclass
class SimpleRow(doctable.DocTableRow):
    id: int = doctable.IDCol()
    updated: datetime.datetime = doctable.AddedCol()
    process: str = doctable.Col()
    number: int = doctable.Col()

tmp = doctable.TempFolder('exdb')
db = doctable.DocTable(schema=SimpleRow, target=tmp.joinpath('tmp_concurrent.db'), new_db=True, timeout=60)

In [3]:
def thread_func(numbers, db):
    process_id = ''.join(random.choices(string.ascii_uppercase, k=2))
    print(f'starting process {process_id}')
    db.reopen_engine() # create all new connections
    for num in numbers:
        db.insert({'process': process_id, 'number': num})
        time.sleep(0.01)

numbers = list(range(100)) # these numbers are to be inserted into the database
        
db.delete()
with doctable.Distribute(5) as d:
    d.map_chunk(thread_func, numbers, db)
db.head(10)

starting process VK
starting process UG
starting process KQ
starting process YK
starting process KB


Unnamed: 0,id,updated,process,number
0,1,2021-06-02 14:43:18.459232,VK,40
1,2,2021-06-02 14:43:18.466155,UG,60
2,3,2021-06-02 14:43:18.469247,KQ,80
3,4,2021-06-02 14:43:18.472688,VK,41
4,5,2021-06-02 14:43:18.479446,UG,61
5,6,2021-06-02 14:43:18.482274,KQ,81
6,7,2021-06-02 14:43:18.484088,KB,0
7,8,2021-06-02 14:43:18.485580,VK,42
8,9,2021-06-02 14:43:18.492548,UG,62
9,10,2021-06-02 14:43:18.495039,KQ,82


## Using `QueueInserter` to queue rows for insertion
Because processes must wait on one another for insertions, it may be desirable to insert rows in bulk. To avoid adding queueing logic to your own code, you can use the built-in `QueueInserter` object, retrieved from a DocTable using `get_queueinserter()`. Set the `chunk_size` parameter to determine how many rows are written in each insertion.
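The buffering idea can be sketched in a few lines of plain Python. This is not doctable's implementation: the class `ChunkedInserter` and its `insert_many` callback are hypothetical names used only to show how rows accumulate and are flushed once `chunk_size` is reached.

```python
class ChunkedInserter:
    """Buffer rows and hand them to `insert_many` in chunks (hypothetical helper)."""

    def __init__(self, insert_many, chunk_size=2):
        self.insert_many = insert_many  # callable that writes a list of rows
        self.chunk_size = chunk_size
        self.buffer = []

    def insert(self, row):
        # queue the row; flush as soon as a full chunk has accumulated
        self.buffer.append(row)
        if len(self.buffer) >= self.chunk_size:
            self.dump()

    def dump(self):
        # write whatever is buffered, including a final partial chunk
        if self.buffer:
            self.insert_many(self.buffer)
            self.buffer = []

batches = []  # stands in for the database: each entry is one bulk insert
inserter = ChunkedInserter(batches.append, chunk_size=2)
for i in range(5):
    inserter.insert({'number': i})
inserter.dump()
batch_sizes = [len(b) for b in batches]  # [2, 2, 1]
```

Five rows with a chunk size of two yield two full batches and one leftover row, which is why a final `dump()` is needed.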

In [4]:
inserter = db.get_queueinserter(chunk_size=2, verbose=True)

db.delete()
for i in range(5):
    inserter.insert({'number': i})

inserter.dump() # dump remaining rows into db (happens automatically upon garbage collection)
db.head(10)

Inserting 2 records.
Inserting 2 records.
Inserting 1 records.


Unnamed: 0,id,updated,process,number
0,1,2021-06-02 14:43:18.899379,,0
1,2,2021-06-02 14:43:18.899387,,1
2,3,2021-06-02 14:43:18.902904,,2
3,4,2021-06-02 14:43:18.902913,,3
4,5,2021-06-02 14:43:18.907044,,4


## `QueueInserter` example in threads
We can also create `QueueInserter` objects in threads and observe that their rows are inserted simultaneously. In the output, each worker's rows arrive in pairs, matching `chunk_size=2`.

In [7]:
def thread_func(numbers, db):
    process_id = ''.join(random.choices(string.ascii_uppercase, k=2))
    print(f'starting process {process_id}')
    db.reopen_engine() # create all new connections
    inserter = db.get_queueinserter(chunk_size=2)
    for num in numbers:
        inserter.insert({'process': process_id, 'number': num})
        time.sleep(0.01)
    inserter.dump() # flush remaining rows rather than relying on garbage collection

numbers = list(range(100)) # these numbers are to be inserted into the database
        
db.delete()
with doctable.Distribute(3) as d:
    d.map_chunk(thread_func, numbers, db)
db.head(10)

starting process QR
starting process JV
starting process BY


Unnamed: 0,id,updated,process,number
0,1,2021-06-02 14:43:52.683512,QR,0
1,2,2021-06-02 14:43:52.683521,QR,1
2,3,2021-06-02 14:43:52.688945,JV,34
3,4,2021-06-02 14:43:52.688951,JV,35
4,5,2021-06-02 14:43:52.707044,QR,2
5,6,2021-06-02 14:43:52.707050,QR,3
6,7,2021-06-02 14:43:52.712354,JV,36
7,8,2021-06-02 14:43:52.712360,JV,37
8,9,2021-06-02 14:43:52.726484,BY,68
9,10,2021-06-02 14:43:52.726492,BY,69
