Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dialog tracking to use tags & call_id #112

Merged
merged 7 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions aiosip/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class Application(MutableMapping):

def __init__(self, *,
loop=None,
dialog_factory=Dialog,
middleware=(),
defaults=None,
debug=False,
Expand Down Expand Up @@ -63,7 +62,6 @@ def __init__(self, *,
self._tasks = list()

self.dialplan = dialplan
self.dialog_factory = dialog_factory
self.loop = loop

@property
Expand Down Expand Up @@ -107,14 +105,16 @@ def __init__(self):
self.app = app
self.dialog = None

def _create_dialog(self):
def _create_dialog(self, dialog_factory=Dialog, **kwargs):
if not self.dialog:
self.dialog = peer._create_dialog(
method=msg.method,
from_details=Contact.from_header(msg.headers['To']),
to_details=Contact.from_header(msg.headers['From']),
call_id=call_id,
inbound=True
inbound=True,
dialog_factory=dialog_factory,
**kwargs
)
return self.dialog

Expand All @@ -133,18 +133,28 @@ async def prepare(self, status_code, *args, **kwargs):

async def _dispatch(self, protocol, msg, addr):
call_id = msg.headers['Call-ID']
dialog = self._dialogs.get(frozenset((msg.to_details.details,
msg.from_details.details,
call_id)))
dialog = None

if dialog:
# First incoming request of dialogs do not yet have a tag in to headers
if 'tag' in msg.to_details['params']:
dialog = self._dialogs.get(frozenset((msg.to_details['params']['tag'],
msg.from_details['params']['tag'],
call_id)))

# First response of dialogs have a tag in the to header but the dialog is not
# yet aware of it. Try to match only with the from header tag
if dialog is None:
dialog = self._dialogs.get(frozenset((None, msg.from_details['params']['tag'], call_id)))

if dialog is not None:
await dialog.receive_message(msg)
return

# If we got an ACK, but nowhere to deliver it, drop it. If we
# got a response without an associated message (likely a stale
# retransmission, drop it)
if isinstance(msg, Response) or msg.method == 'ACK':
LOG.debug('Discarding incoming message: %s', msg)
return

await self._run_dialplan(protocol, msg)
Expand Down
50 changes: 26 additions & 24 deletions aiosip/dialog.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import asyncio
import enum
import asyncio
import logging

from collections import defaultdict
from multidict import CIMultiDict
from collections import defaultdict
from async_timeout import timeout as Timeout

from . import utils
from .message import Request, Response
from .transaction import UnreliableTransaction, ProxyTransaction

from .auth import Auth
from .message import Request, Response
from .transaction import UnreliableTransaction


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,11 +57,17 @@ def __init__(self,

@property
def dialog_id(self):
return frozenset((self.original_msg.to_details.details,
self.original_msg.from_details.details,
return frozenset((self.original_msg.to_details['params'].get('tag'),
self.original_msg.from_details['params']['tag'],
self.call_id))

def _receive_response(self, msg):

if 'tag' not in self.to_details['params']:
del self.app._dialogs[self.dialog_id]
self.to_details['params']['tag'] = msg.to_details['params']['tag']
self.app._dialogs[self.dialog_id] = self

try:
transaction = self.transactions[msg.method][msg.cseq]
transaction._incoming(msg)
Expand All @@ -74,7 +79,7 @@ def _receive_response(self, msg):
LOG.debug('Response without Request. The Transaction may already be closed. \n%s', msg)

def _prepare_request(self, method, contact_details=None, headers=None, payload=None, cseq=None, to_details=None):
self.from_details.add_tag()

if not cseq:
self.cseq += 1

Expand Down Expand Up @@ -175,17 +180,6 @@ async def start_unreliable_transaction(self, msg, method=None):
self.transactions[method or msg.method][msg.cseq] = transaction
return await transaction.start()

async def start_proxy_transaction(self, msg, timeout=5):
if msg.cseq not in self.transactions[msg.method]:
transaction = ProxyTransaction(dialog=self, original_msg=msg, loop=self.app.loop, timeout=timeout)
self.transactions[msg.method][msg.cseq] = transaction
async for response in transaction.start():
yield response
else:
LOG.debug('Message already transmitted: %s %s, %s', msg.cseq, msg.method, msg.headers['Call-ID'])
self.transactions[msg.method][msg.cseq].retransmit()
return

def end_transaction(self, transaction):
to_delete = list()
for method, values in self.transactions.items():
Expand All @@ -211,7 +205,6 @@ async def reply(self, request, status_code, status_message=None, payload=None, h

def _prepare_response(self, request, status_code, status_message=None, payload=None, headers=None,
contact_details=None):
self.from_details.add_tag()

if contact_details:
self.contact_details = contact_details
Expand Down Expand Up @@ -273,9 +266,6 @@ async def receive_message(self, msg):
return await self._receive_request(msg)

async def _receive_request(self, msg):
if 'tag' in msg.from_details['params']:
self.to_details['params']['tag'] = msg.from_details['params']['tag']

await self._incoming.put(msg)
self._maybe_close(msg)

Expand Down Expand Up @@ -325,13 +315,25 @@ async def recv(self):

class InviteDialog(DialogBase):
def __init__(self, *args, **kwargs):
super().__init__(method="INVITE", *args, **kwargs)

if 'method' not in kwargs:
kwargs['method'] = 'INVITE'
elif kwargs['method'] != 'INVITE':
raise ValueError('method must be INVITE')

super().__init__(*args, **kwargs)

self._queue = asyncio.Queue()
self._state = CallState.Calling
self._waiter = asyncio.Future()

async def receive_message(self, msg): # noqa: C901

if 'tag' not in self.to_details['params']:
del self.app._dialogs[self.dialog_id]
self.to_details['params']['tag'] = msg.to_details['params']['tag']
self.app._dialogs[self.dialog_id] = self

async def set_result(msg):
self.ack(msg)
if not self._waiter.done():
Expand Down
Loading