
Running queries asynchronously #453

Closed
g3blv opened this issue Dec 25, 2020 · 3 comments
Assignees
Labels
api: bigquery Issues related to the googleapis/python-bigquery API. type: question Request for information or clarification. Not an issue.

Comments


g3blv commented Dec 25, 2020

Environment details

  • OS type and version: Linux/Fedora 32
  • Python version: 3.8.5
  • pip version: 20.3.3
  • google-cloud-bigquery version: 2.2.0

Steps to reproduce

I'm trying to implement something similar to https://github.com/googleapis/python-bigquery/issues/18#issuecomment-514800953. I have two queries whose results I want as two dataframes, and I want the query and to_pandas() calls to run asynchronously.

The first step, events (query and to_pandas()), takes the longest, so I would have expected the second step, orders, to start before the first step's to_pandas() is done. But it looks to me like the first step runs to completion before the second step starts.

Code example

import time

queries = [query_events, query_orders]

awaiting_jobs = set()


def callback(future):
    awaiting_jobs.discard(future.job_id)


start_pro = time.perf_counter()

for i, query in enumerate(queries):
    start_query = time.perf_counter()
    print(f'Step {i + 1} starting')
    job = bqclient.query(query)
    # Blocks until the query finishes and all rows are downloaded,
    # so the next loop iteration cannot start until this returns.
    job.to_arrow(bqstorage_client=bqstorageclient).to_pandas()
    print(f'Step {i + 1} ending')
    end_query = time.perf_counter()
    print(f'Time for query and to_pandas(): {end_query - start_query:0.4f}')
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

end_pro = time.perf_counter()
print(f"Total Download and Dataframe time Process: {end_pro - start_pro:0.4f} seconds")

Output

Step 1 starting
Step 1 ending
Time for query and to_pandas(): 5.7725
Step 2 starting
Step 2 ending
Time for query and to_pandas(): 2.3168
Total Download and Dataframe time Process: 8.0894 seconds
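[Editorial illustration, not from the thread] The serialization above comes from calling the blocking fetch inside the submission loop; the fix is to start all jobs first and only then wait on results. A minimal, self-contained sketch of that submit-first/collect-later pattern, using a hypothetical fake_query (a time.sleep stand-in) instead of real BigQuery jobs:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_query(name, seconds):
    """Stand-in for a blocking job.result()/to_arrow() call."""
    time.sleep(seconds)
    return name


def run_concurrently(tasks):
    """Submit every task before waiting on any result."""
    with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        futures = [executor.submit(fake_query, name, secs) for name, secs in tasks]
        return [f.result() for f in as_completed(futures)]


start = time.perf_counter()
results = run_concurrently([("events", 0.2), ("orders", 0.2)])
elapsed = time.perf_counter() - start
# Both stand-in jobs overlap, so elapsed is roughly 0.2 s rather than 0.4 s.
```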
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery API. label Dec 25, 2020
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label Dec 26, 2020
@meredithslota meredithslota added type: question Request for information or clarification. Not an issue. and removed triage me I really want to be triaged. labels Dec 28, 2020
@HemangChothani
Contributor

The blocking happens in the result() (or to_arrow()) call in the example above, so here are a few examples that fetch the data in a separate thread.

from concurrent.futures import ThreadPoolExecutor, as_completed

from google.cloud import bigquery

client = bigquery.Client()
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Any other code can run here; the main thread is not blocked while the queries run.

for future in as_completed(threads):
    results.append(list(future.result().to_arrow()))

Or with asyncio, running the blocking calls in an executor:

import asyncio


def get_data(client, query):
    job = client.query(query)
    print("in get data")
    data = job.to_arrow()
    return data


# This must run inside a coroutine, since run_in_executor is awaited:
loop = asyncio.get_running_loop()

data1 = await loop.run_in_executor(None, get_data, client, query1)
data2 = await loop.run_in_executor(None, get_data, client, query2)
print(data1)
print(data2)

Note that awaiting the first call before starting the second still serializes the two fetches; use asyncio.gather to overlap them.

Or, overlapping both fetches with asyncio.gather:

async def sample(query):
    loop = asyncio.get_running_loop()
    print("query process started")
    data = await loop.run_in_executor(None, get_data, client, query)
    print(data)


async def parallel():
    await asyncio.gather(sample(query1), sample(query2))
    print("all done")


asyncio.run(parallel())
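[Editorial illustration, not from the thread] A self-contained version of the gather pattern above, with a hypothetical blocking_fetch standing in for the blocking client.query(...).to_arrow() call, to show that the two fetches actually overlap:

```python
import asyncio
import time


def blocking_fetch(name, seconds):
    """Stand-in for a blocking BigQuery fetch."""
    time.sleep(seconds)
    return name


async def fetch(name, seconds):
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread-pool executor.
    return await loop.run_in_executor(None, blocking_fetch, name, seconds)


async def main():
    # gather starts both coroutines before awaiting either result.
    return await asyncio.gather(fetch("events", 0.2), fetch("orders", 0.2))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
# The two 0.2 s fetches overlap, so elapsed is roughly 0.2 s rather than 0.4 s.
```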

@plamut
Contributor

plamut commented Jan 7, 2021

Thought - should we add this recipe as a new sample to the docs?

@tswast
Contributor

tswast commented Jan 22, 2021

> Thought - should we add this recipe as a new sample to the docs?

I'm not sure that we need to be telling people how to run code in multiple threads, as that isn't BigQuery-specific.

I'll close this issue as a duplicate of #18 and will start on some design work to see how we can possibly support an async client without doubling our manual workload.
