Skip to content

Commit

Permalink
Add destination header to HTTP streamed CESR requests to work with mu…
Browse files Browse the repository at this point in the history
…lti-tenant agents, or any controller infrastructure hosted behind a router. (#491)
  • Loading branch information
pfeairheller committed May 4, 2023
1 parent 4fa138a commit 79ee4a2
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 25 deletions.
8 changes: 4 additions & 4 deletions src/keri/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def receipt(self, pre, sn=None):

rcts = dict()
for wit, client in clients.items():
httping.streamCESRRequests(client=client, ims=bytearray(msg), path="/receipts")
httping.streamCESRRequests(client=client, dest=wit, ims=bytearray(msg), path="/receipts")
while not client.responses:
yield self.tock

Expand Down Expand Up @@ -120,7 +120,7 @@ def receipt(self, pre, sn=None):

client = clients[wit]

sent = httping.streamCESRRequests(client=client, ims=bytearray(msg))
sent = httping.streamCESRRequests(client=client, dest=wit, ims=bytearray(msg))
while len(client.responses) < sent:
yield self.tock

Expand Down Expand Up @@ -189,7 +189,7 @@ def catchup(self, pre, wit):
self.extend([clientDoer])

for fmsg in hab.db.clonePreIter(pre=pre):
httping.streamCESRRequests(client=client, ims=bytearray(fmsg))
httping.streamCESRRequests(client=client, dest=wit, ims=bytearray(fmsg))
while not client.responses:
yield self.tock

Expand Down Expand Up @@ -756,7 +756,7 @@ def msgDo(self, tymth=None, tock=0.0):
yield self.tock

msg = self.msgs.popleft()
self.posted += httping.streamCESRRequests(client=self.client, ims=msg)
self.posted += httping.streamCESRRequests(client=self.client, dest=self.wit, ims=msg)
while self.client.requests:
yield self.tock

Expand Down
2 changes: 1 addition & 1 deletion src/keri/app/cli/commands/mailbox/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def readDo(self, tymth, tock=0.0):
else:
msg = hab.query(pre=hab.pre, src=self.witness, route="mbx", query=q)

httping.createCESRRequest(msg, client)
httping.createCESRRequest(msg, client, dest=self.witness)

while client.requests:
yield self.tock
Expand Down
16 changes: 11 additions & 5 deletions src/keri/app/httping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

CESR_CONTENT_TYPE = "application/cesr+json"
CESR_ATTACHMENT_HEADER = "CESR-ATTACHMENT"
CESR_DESTINATION_HEADER = "CESR-DESTINATION"


class SignatureValidationComponent(object):
Expand Down Expand Up @@ -112,13 +113,15 @@ def parseCesrHttpRequest(req):
return cr


def createCESRRequest(msg, client, path=None):
def createCESRRequest(msg, client, dest, path=None):
"""
Turns a KERI message into a CESR http request against the provided hio http Client
Parameters
msg: KERI message parsable as Serder.raw
dest (str): qb64 identifier prefix of destination controller
client: hio http Client that will send the message as a CESR request
path (str): path to post to
"""
path = path if path is not None else "/"
Expand All @@ -137,7 +140,8 @@ def createCESRRequest(msg, client, path=None):
("Content-Type", CESR_CONTENT_TYPE),
("Content-Length", len(body)),
("connection", "close"),
(CESR_ATTACHMENT_HEADER, attachments)
(CESR_ATTACHMENT_HEADER, attachments),
(CESR_DESTINATION_HEADER, dest)
])

client.request(
Expand All @@ -148,13 +152,14 @@ def createCESRRequest(msg, client, path=None):
)


def streamCESRRequests(client, ims, path=None):
def streamCESRRequests(client, ims, dest, path=None):
"""
Turns a stream of KERI messages into CESR http requests against the provided hio http Client
Parameters
ims (bytearray): stream of KERI messages parsable as Serder.raw
client (Client): hio http Client that will send the message as a CESR request
ims (bytearray): stream of KERI messages parsable as Serder.raw
dest (str): qb64 identifier prefix of destination controller
path (str): path to post to
Returns
Expand Down Expand Up @@ -189,7 +194,8 @@ def streamCESRRequests(client, ims, path=None):
headers = Hict([
("Content-Type", CESR_CONTENT_TYPE),
("Content-Length", len(body)),
(CESR_ATTACHMENT_HEADER, attachment)
(CESR_ATTACHMENT_HEADER, attachment),
(CESR_DESTINATION_HEADER, dest)
])

client.request(
Expand Down
2 changes: 1 addition & 1 deletion src/keri/app/indirecting.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ def eventDo(self, tymth=None, tock=0.0):
else:
msg = self.hab.query(pre=self.pre, src=self.witness, route="mbx", query=q)

httping.createCESRRequest(msg, client)
httping.createCESRRequest(msg, client, dest=self.witness)

while client.requests:
yield self.tock
Expand Down
29 changes: 15 additions & 14 deletions tests/app/test_httping.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_create_cesr_request(mockHelpingNowUTC):
route="tels")
client = MockClient()

httping.createCESRRequest(msg, client, path="/qry/tels")
httping.createCESRRequest(msg, client, dest=wit, path="/qry/tels")

args = client.args.pop()
assert args["method"] == "POST"
Expand All @@ -94,7 +94,7 @@ def test_create_cesr_request(mockHelpingNowUTC):
msg = hab.query(pre=hab.pre, src=wit, route="mbx", query=dict(s=0))
client = MockClient()

httping.createCESRRequest(msg, client, path="/qry/mbx")
httping.createCESRRequest(msg, client, dest=wit, path="/qry/mbx")

args = client.args.pop()
assert args["method"] == "POST"
Expand All @@ -107,9 +107,10 @@ def test_create_cesr_request(mockHelpingNowUTC):
headers = args["headers"]
assert headers["Content-Type"] == "application/cesr+json"
assert headers["Content-Length"] == 260
assert headers["CESR-ATTACHMENT"] == bytearray(b'-VAj-HABEIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3-AABAAB6P97k'
b'Z3al3V3z3VstRtHRPeOrotuqZZUgBl2yHzgpGyOjAXYGinVqWLAMhdmQ089FTSAz'
b'qSTBmJzI8RvIezsJ')
assert headers["CESR-ATTACHMENT"] == bytearray(
b'-VAj-HABEIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3-AABAAB6P97k'
b'Z3al3V3z3VstRtHRPeOrotuqZZUgBl2yHzgpGyOjAXYGinVqWLAMhdmQ089FTSAz'
b'qSTBmJzI8RvIezsJ')


def test_stream_cesr_request(mockHelpingNowUTC):
Expand All @@ -124,7 +125,7 @@ def test_stream_cesr_request(mockHelpingNowUTC):
route="tels")
client = MockClient()

httping.streamCESRRequests(client, msg, path="/qry/tels")
httping.streamCESRRequests(client, msg, dest=wit, path="/qry/tels")

args = client.args.pop()
assert args["method"] == "POST"
Expand All @@ -141,7 +142,7 @@ def test_stream_cesr_request(mockHelpingNowUTC):
msg = hab.query(pre=hab.pre, src=wit, route="mbx", query=dict(s=0))
client = MockClient()

httping.streamCESRRequests(client, msg, path="/qry/mbx")
httping.streamCESRRequests(client, msg, dest=wit, path="/qry/mbx")

args = client.args.pop()
assert args["method"] == "POST"
Expand All @@ -162,16 +163,16 @@ def test_stream_cesr_request(mockHelpingNowUTC):
msgs.extend(hab.makeOwnEvent(sn=0))

client = MockClient()
httping.streamCESRRequests(client, msgs)
httping.streamCESRRequests(client, msgs, dest=wit)
assert len(client.args) == 2
args = client.args.pop()
assert args["method"] == "POST"
assert args["path"] == "/"

assert args["body"] == (b'{"v":"KERI10JSON00012b_","t":"icp","d":"EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2'
b'QV8dDjI3","i":"EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3","s":"0","kt":"1'
b'","k":["DGmIfLmgErg4zFHfPwaDckLNxsLqc5iS_P0QbLjbWR0I"],"nt":"1","n":["EJhRr1'
b'0e5p7LVB6JwLDIcgqsISktnfe5m60O_I2zZO6N"],"bt":"0","b":[],"c":[],"a":[]}')
b'QV8dDjI3","i":"EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3","s":"0","kt":"1'
b'","k":["DGmIfLmgErg4zFHfPwaDckLNxsLqc5iS_P0QbLjbWR0I"],"nt":"1","n":["EJhRr1'
b'0e5p7LVB6JwLDIcgqsISktnfe5m60O_I2zZO6N"],"bt":"0","b":[],"c":[],"a":[]}')
headers = args["headers"]
assert headers['Content-Length'] == 299
assert headers['Content-Type'] == 'application/cesr+json'
Expand All @@ -183,9 +184,9 @@ def test_stream_cesr_request(mockHelpingNowUTC):
assert args["path"] == "/"

assert args["body"] == (b'{"v":"KERI10JSON000105_","t":"qry","d":"EHtaQHsKzezkQUEYjMjEv6nIf4AhhR9Zy6Av'
b'cfyGCXkI","dt":"2021-01-01T00:00:00.000000+00:00","r":"logs","rr":"","q":{"s'
b'":0,"i":"EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3","src":"BGKVzj4ve0VSd8'
b'z_AmvhLg4lqcC_9WYX90k03q-R_Ydo"}}')
b'cfyGCXkI","dt":"2021-01-01T00:00:00.000000+00:00","r":"logs","rr":"","q":{"s'
b'":0,"i":"EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3","src":"BGKVzj4ve0VSd8'
b'z_AmvhLg4lqcC_9WYX90k03q-R_Ydo"}}')
headers = args["headers"]
assert headers['Content-Length'] == 261
assert headers['Content-Type'] == 'application/cesr+json'
Expand Down

0 comments on commit 79ee4a2

Please sign in to comment.