-
Notifications
You must be signed in to change notification settings - Fork 237
fix: alter mediation recipient websocket transport priority #434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9bdba97
791b740
c468e33
15d517e
3e6da2f
b17d85a
4f1da15
267dbab
68f9090
c959cdc
bda647f
f151915
4ff41f0
b044b3d
02cd324
8a613c4
0cc91a5
5c98a5c
f1ba083
f682344
bd05532
92548ae
d4eaac6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import type { Logger } from '../../logger' | ||
import type { OutboundWebSocketClosedEvent } from '../../transport' | ||
import type { OutboundMessage } from '../../types' | ||
import type { ConnectionRecord } from '../connections' | ||
import type { MediationStateChangedEvent } from './RoutingEvents' | ||
import type { MediationRecord } from './index' | ||
|
@@ -73,6 +74,23 @@ export class RecipientModule { | |
} | ||
} | ||
|
||
private async sendMessage(outboundMessage: OutboundMessage) { | ||
const { mediatorPickupStrategy } = this.agentConfig | ||
const transportPriority = | ||
mediatorPickupStrategy === MediatorPickupStrategy.Implicit | ||
? { schemes: ['wss', 'ws'], restrictive: true } | ||
: undefined | ||
|
||
await this.messageSender.sendMessage(outboundMessage, { | ||
transportPriority, | ||
// TODO: add keepAlive: true to enforce through the public api | ||
// we need to keep the socket alive. It already works this way, but would | ||
// be good to make more explicit from the public facing API. | ||
// This would also make it easier to change the internal API later on. | ||
// keepAlive: true, | ||
}) | ||
} | ||
|
||
private async openMediationWebSocket(mediator: MediationRecord) { | ||
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediator.connectionId) | ||
|
||
|
@@ -85,17 +103,21 @@ export class RecipientModule { | |
throw new AriesFrameworkError('Cannot open websocket to connection without websocket service endpoint') | ||
} | ||
|
||
await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), { | ||
transportPriority: { | ||
schemes: websocketSchemes, | ||
restrictive: true, | ||
// TODO: add keepAlive: true to enforce through the public api | ||
// we need to keep the socket alive. It already works this way, but would | ||
// be good to make more explicit from the public facing API. | ||
// This would also make it easier to change the internal API later on. | ||
// keepAlive: true, | ||
}, | ||
}) | ||
try { | ||
await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), { | ||
transportPriority: { | ||
schemes: websocketSchemes, | ||
restrictive: true, | ||
// TODO: add keepAlive: true to enforce through the public api | ||
// we need to keep the socket alive. It already works this way, but would | ||
// be good to make more explicit from the public facing API. | ||
// This would also make it easier to change the internal API later on. | ||
// keepAlive: true, | ||
}, | ||
}) | ||
} catch (error) { | ||
this.logger.warn('Unable to open websocket connection to mediator', { error }) | ||
} | ||
} | ||
|
||
private async initiateImplicitPickup(mediator: MediationRecord) { | ||
|
@@ -118,7 +140,10 @@ export class RecipientModule { | |
// Wait for interval time before reconnecting | ||
delay(interval) | ||
) | ||
.subscribe(() => { | ||
.subscribe(async () => { | ||
this.logger.warn( | ||
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...` | ||
) | ||
this.openMediationWebSocket(mediator) | ||
}) | ||
|
||
|
@@ -163,7 +188,7 @@ export class RecipientModule { | |
|
||
const batchPickupMessage = new BatchPickupMessage({ batchSize: 10 }) | ||
const outboundMessage = createOutboundMessage(mediatorConnection, batchPickupMessage) | ||
await this.messageSender.sendMessage(outboundMessage) | ||
await this.sendMessage(outboundMessage) | ||
} | ||
|
||
public async setDefaultMediator(mediatorRecord: MediationRecord) { | ||
|
@@ -173,15 +198,15 @@ export class RecipientModule { | |
public async requestMediation(connection: ConnectionRecord): Promise<MediationRecord> { | ||
const { mediationRecord, message } = await this.mediationRecipientService.createRequest(connection) | ||
const outboundMessage = createOutboundMessage(connection, message) | ||
await this.messageSender.sendMessage(outboundMessage) | ||
|
||
await this.sendMessage(outboundMessage) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like we're conflating specific transport protocols with the implicit / explicit pickup strategy (I'm also to blame for this) Do we always need to request mediation over websockets if we're using the implicit pickup strategy? I think not. I'm fine with taking this approach for now, but would like to keep this in mind going forward. It closes some doors that I would like to keep open :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, I think that we'll want to adjust this in the future, especially with pickup-protocol v2. |
||
return mediationRecord | ||
} | ||
|
||
public async notifyKeylistUpdate(connection: ConnectionRecord, verkey: string) { | ||
const message = this.mediationRecipientService.createKeylistUpdateMessage(verkey) | ||
const outboundMessage = createOutboundMessage(connection, message) | ||
const response = await this.messageSender.sendMessage(outboundMessage) | ||
return response | ||
await this.sendMessage(outboundMessage) | ||
} | ||
|
||
public async findByConnectionId(connectionId: string) { | ||
|
@@ -230,7 +255,7 @@ export class RecipientModule { | |
|
||
// Send mediation request message | ||
const outboundMessage = createOutboundMessage(connection, message) | ||
await this.messageSender.sendMessage(outboundMessage) | ||
await this.sendMessage(outboundMessage) | ||
|
||
const event = await firstValueFrom(subject) | ||
return event.payload.mediationRecord | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we want to swallow this error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue that I ran into is that it will throw an error if you don't have internet access, etc. Which means that if you throw it, it won't be caught and will not be able to reconnect properly. As far as I saw at least. Happy to go about this a different way in the future if desired.