Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TraceConfig signals not triggered #3764

Open
matej-fr opened this issue May 13, 2019 · 2 comments
Open

TraceConfig signals not triggered #3764

matej-fr opened this issue May 13, 2019 · 2 comments

Comments

@matej-fr
Copy link

matej-fr commented May 13, 2019

Long story short

I am using TraceConfig signals to synchronise POST requests. They have to be send out in particular order, and since the process is timing critical, I am waiting for event (Event.wait) in on_connection_create_end. The event is released/triggered in on_request_chunk_sent with Event.set.

The code is like this:

    for i in range(2):

        print(f'Run {i}')
        print(trace_config.on_connection_create_end[0])

        b_q = []
        tr_events = []
        for k in range(5):
            b_q.append(make_post_req(session, id=k))
            tr_events.append(asyncio.Event())

        await asyncio.gather(*b_q)
        await asyncio.sleep(5)

I am running two batches of 5 requests.
make_post_req creates POST request using the session object.
tr_events is a global variable to track asyncio.Event objects between requests. I.e. request 1 has to wait for release of request 0.

The idea is to create almost 5 requests in parallel; but the later request is only released after the previous request has already sent out the payload.

Expected behaviour

I would expect that first and the second batch are processed equally which is not the case.

Actual behaviour

The first batch (the firs run of loop i) is processed correctly. All signals are triggered.
On the second batch only signals on_request_start, on_request_chunk_sent and on_request_end are triggered. Since the on_connection_create_end is not triggered my locks are not awaited.

print(trace_config.on_connection_create_end[0]) is there to verify whether TraceConfig object holds a valid reference to the function.

I have also removed the i loop and made two sequential batches of 5 requests. The same behaviour.

I am tracking the following signals:
trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) trace_config.on_connection_create_start.append(on_connection_create_start) trace_config.on_dns_cache_hit.append(on_dns_cache_hit) trace_config.on_dns_resolvehost_end.append(on_dns_resolvehost_end) trace_config.on_connection_create_end.append(on_connection_create_end) trace_config.on_request_chunk_sent.append(on_request_chunk_sent)

How come that the signals are not triggered in the second run?

Your environment

client

@matej-fr
Copy link
Author

This is full code that recreates the above described behaviour.

import aiohttp
import asyncio

api_end_point='https://ptsv2.com'
post_endpoint='/t/veliko_sranje/post'


async def post_req(session, event_last, event_this, id=0):


    payload = {

        'id': id
    }

    url=api_end_point + post_endpoint

    async with session.request('POST', url, data=payload, trace_request_ctx={'id':id, 'e_last':event_last, 'e_this': event_this}) as resp:
        status = resp.status
        r = await resp.text()

    return


async def on_request_start(session, trace_config_ctx, params):


    trace_config_ctx.mk_url=params.url
    trace_config_ctx.mk_id=trace_config_ctx.trace_request_ctx['id']
    print("Starting request", params.url, trace_config_ctx.mk_id)


async def on_request_end(session, trace_config_ctx, params):
    print("Ending request", params.url)
    return

async def on_connection_create_start(session, trace_config_ctx, params):
    print("On connection create start", trace_config_ctx.mk_url)
    return

async def on_dns_cache_hit(session, trace_config_ctx, params):
    print("On DNS cache hit", params, trace_config_ctx.mk_url)
    return

async def on_dns_resolvehost_end(session, trace_config_ctx, params):
    print("On DNS resolve end", params, trace_config_ctx.mk_url)
    return

async def on_connection_create_end(session, trace_config_ctx, params):

    print("On connection create end", trace_config_ctx.mk_url, trace_config_ctx.mk_id)
    if trace_config_ctx.trace_request_ctx['e_last'] is not None:
        print("Waiting for lock release", trace_config_ctx.mk_id-1)
        await trace_config_ctx.trace_request_ctx['e_last'].wait()
        print("End waiting for lock release")


async def on_request_chunk_sent(session, trace_config_ctx, params):


    print("Chunk sent", params, trace_config_ctx.mk_url)

    # this sleep is not a bug; this is a delay to assure network synchronisation
    await asyncio.sleep(0.2)
    print("Releasing lock", trace_config_ctx.mk_id)

    trace_config_ctx.trace_request_ctx['e_this'].set()


async def main(loop):

    conn = aiohttp.TCPConnector(limit_per_host=100, ssl=False)

    trace_config = aiohttp.TraceConfig()

    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_end.append(on_request_end)
    trace_config.on_connection_create_start.append(on_connection_create_start)
    trace_config.on_dns_cache_hit.append(on_dns_cache_hit)
    trace_config.on_dns_resolvehost_end.append(on_dns_resolvehost_end)
    trace_config.on_connection_create_end.append(on_connection_create_end)
    trace_config.on_request_chunk_sent.append(on_request_chunk_sent)

    session = aiohttp.ClientSession(connector=conn, trace_configs=[trace_config])

    for i in range(2):

        print(f'Run {i}')
        b_q = []
        event_last=None
        for k in range(5):
            id=k+i*10
            event_this=asyncio.Event()
            b_q.append(post_req(session, event_last, event_this, id=id))
            event_last = event_this

        await asyncio.gather(*b_q)

    await session.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_until_complete(asyncio.sleep(0))
loop.close()

@asvetlov
Copy link
Member

Briefly looking on your snippet I guess that you have the following situation:

  1. The first request has no open HTTP connection in the connector's pool, the connection is created
  2. Next request can reuse a free open connection to the same host (HTTP Keep-Alive feature).
    all on_connection_create_start, on_dns_cache_hit, on_dns_resolvehost_end, and
    on_connection_create_end are skipped but on_connection_reuseconn event is triggered.

Does it look real?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants