This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Concurrent reading using multiprocessing #332

@hadim

The example from the docs (https://cloud.google.com/spanner/docs/reads#read_data_in_parallel) works well with multithreading. However, in Python I found the performance to be about the same as without multithreading (probably because of the GIL).
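A small sketch of the GIL effect, assuming the per-row work after the read is pure-Python and CPU-bound. `decode_rows` is a hypothetical stand-in for that work, not part of any Spanner API. On a standard (GIL-enabled) CPython build, the threaded run should take roughly as long as the sequential one, because only one thread executes Python bytecode at a time; threads mainly help while waiting on I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def decode_rows(n):
    """Hypothetical stand-in for CPU-bound, pure-Python per-row work."""
    total = 0
    for i in range(n):
        total += (i * i) % 7
    return total


def run_sequential(chunks):
    # One chunk after another in the main thread.
    return [decode_rows(n) for n in chunks]


def run_threaded(chunks, workers=4):
    # Same work spread over threads; the GIL still serializes the bytecode.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(decode_rows, chunks))


if __name__ == "__main__":
    chunks = [500_000] * 4
    for name, fn in (("sequential", run_sequential), ("threaded", run_threaded)):
        start = time.perf_counter()
        fn(chunks)
        print(f"{name}: {time.perf_counter() - start:.2f}s")
```

Comparing the two printed timings on your machine is the quickest way to confirm whether the client-side processing, rather than the network, is the bottleneck.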

I tried to use multiprocessing instead but couldn't make it work, and I'm not sure it is even possible.

  • I first tried replacing ThreadPoolExecutor in the doc example with ProcessPoolExecutor. It hangs forever, so I guess the snapshot object can't be shared between multiple processes.
  • I also tried recreating the session in each worker, but according to the error, a partition is only valid within the session that created it: details = "Partitioned request was created for a different session."
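A likely reading of both failures: the snapshot object carries live, process-local state (client, gRPC channel, session), while a partition is tied to the session that created it. A common pattern for this is to send only plain identifiers across the process boundary and let each worker rebuild its own live object around the same IDs. The toy sketch below illustrates that pattern with a hypothetical `LiveSnapshot` class, where a `threading.Lock` stands in for the non-picklable state; `export_state` and `from_state` are illustrative names, not the Spanner API.

```python
import pickle
import threading


class LiveSnapshot:
    """Hypothetical stand-in for a snapshot that owns process-local resources."""

    def __init__(self, session_id, transaction_id):
        self.session_id = session_id
        self.transaction_id = transaction_id
        # Stands in for a gRPC channel or similar: a lock cannot be pickled.
        self._lock = threading.Lock()

    def export_state(self):
        """Return only plain, picklable identifiers."""
        return {"session_id": self.session_id, "transaction_id": self.transaction_id}

    @classmethod
    def from_state(cls, state):
        """Rebuild fresh local resources around the same identifiers."""
        return cls(state["session_id"], state["transaction_id"])


def can_pickle(obj):
    """True if obj can be serialized for another process."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    snap = LiveSnapshot("session-123", b"txn-456")
    print(can_pickle(snap))   # False: the lock is not picklable
    state = snap.export_state()
    print(can_pickle(state))  # True: a plain dict of IDs
    worker_copy = LiveSnapshot.from_state(pickle.loads(pickle.dumps(state)))
    print(worker_copy.session_id)  # session-123
```

The design point is that the worker reuses the same session and transaction IDs instead of opening a new session, which is exactly what recreating the session in the worker fails to do.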

Here is the code for the latter approach (recreating the session):

import concurrent.futures
import logging
import time

from google.cloud import spanner
from tqdm import tqdm

logger = logging.getLogger(__name__)

# project_id, instance_id and database_id are defined elsewhere.
spanner_client = spanner.Client(project=project_id)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

table = "xxxx"
columns = ("col1", "col2")


def process(batch):
    # Each worker process creates its own client instead of reusing the parent's.
    spanner_client = spanner.Client(project=project_id)
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    logger.info(f"Partition: {batch['partition'][:32]}")

    # This opens a NEW session, so reading a partition created by the parent's
    # snapshot fails with "Partitioned request was created for a different session."
    snapshot = database.batch_snapshot()

    row_ct = 0
    for row in tqdm(snapshot.process_read_batch(batch)):
        row_ct += 1

    snapshot.close()

    return time.time(), row_ct


if __name__ == "__main__":
    # Guard the driver code so spawned worker processes don't re-run it on import.
    snapshot = database.batch_snapshot()

    keyset = spanner.KeySet(all_=True)
    batches = snapshot.generate_read_batches(table=table, columns=columns, keyset=keyset)
    batches = list(batches)

    logger.info(f"{len(batches)} batches detected")

    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(process, batch) for batch in batches]

        pbar = tqdm(concurrent.futures.as_completed(futures, timeout=3600), total=len(batches))
        for future in pbar:
            finish, row_ct = future.result()
            elapsed = finish - start
            print(f"Completed {row_ct} rows in {elapsed} seconds")

    snapshot.close()

Is there a way to dispatch batches to multiple processes in Python for high-performance reading? I would expect each partition token to be able to live outside of a snapshot object on the Spanner side, but I'm not sure this is possible.
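One possible approach, sketched under assumptions: google-cloud-spanner's `BatchSnapshot` has a `to_dict()` method returning the session and transaction IDs, and a `BatchSnapshot.from_dict(database, source)` classmethod that rebuilds a snapshot bound to that same session and transaction in another process. If those behave this way in your installed version (check the library docs), partitions stay valid in the worker and the "different session" error should not occur. The batches from `generate_read_batches()` should be plain dicts (partition bytes plus read parameters), so they can be pickled to the workers. `PROJECT_ID`, `INSTANCE_ID`, `DATABASE_ID`, `read_partition` and `read_table_in_parallel` are hypothetical names. The sketch uses the `"spawn"` start method so workers don't inherit the parent's gRPC state through `fork`, and only the coordinator calls `close()`, since closing deletes the shared session.

```python
import concurrent.futures
import multiprocessing

# Hypothetical placeholders: replace with your own IDs.
PROJECT_ID = "my-project"
INSTANCE_ID = "my-instance"
DATABASE_ID = "my-database"


def _open_database():
    """Build a client and Database handle in the calling process."""
    from google.cloud import spanner  # deferred import; assumes google-cloud-spanner is installed

    client = spanner.Client(project=PROJECT_ID)
    return client.instance(INSTANCE_ID).database(DATABASE_ID)


def read_partition(snapshot_state, batch):
    """Worker: reattach to the coordinator's session/transaction, read one partition."""
    from google.cloud.spanner_v1.database import BatchSnapshot

    database = _open_database()
    # Assumed: from_dict() rebuilds a BatchSnapshot bound to the SAME session and
    # transaction IDs, so the partition token stays valid.
    snapshot = BatchSnapshot.from_dict(database, snapshot_state)
    row_ct = 0
    for _row in snapshot.process_read_batch(batch):
        row_ct += 1
    # No snapshot.close() here: close() would delete the shared session for everyone.
    return row_ct


def read_table_in_parallel(table, columns, max_workers=None):
    """Coordinator: partition once, fan partitions out to worker processes."""
    from google.cloud import spanner

    database = _open_database()
    snapshot = database.batch_snapshot()
    try:
        keyset = spanner.KeySet(all_=True)
        batches = list(
            snapshot.generate_read_batches(table=table, columns=columns, keyset=keyset)
        )
        # Assumed: a plain dict such as {"session_id": ..., "transaction_id": ...}.
        state = snapshot.to_dict()
        # "spawn" gives each worker a fresh interpreter instead of a fork of a
        # process that already holds gRPC channels.
        ctx = multiprocessing.get_context("spawn")
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=max_workers, mp_context=ctx
        ) as pool:
            futures = [pool.submit(read_partition, state, batch) for batch in batches]
            return sum(f.result() for f in concurrent.futures.as_completed(futures))
    finally:
        # Only the coordinator closes the snapshot, after every worker is done.
        snapshot.close()
```

To try it, save the sketch as a module and call `read_table_in_parallel("xxxx", ("col1", "col2"))` from under an `if __name__ == "__main__":` guard; spawned workers import the module to find `read_partition`, so the driver call must not run at import time.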

Labels:

  • api: spanner (issues related to the googleapis/python-spanner API)
  • priority: p2 (moderately-important priority; fix may not be included in next release)
  • type: question (request for information or clarification, not an issue)