Skip to content

Commit

Permalink
fix: create message subscription first (#1549)
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Glastra <timo@animo.id>
  • Loading branch information
TimoGlastra committed Aug 18, 2023
1 parent 9377378 commit 93276de
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions packages/core/src/agent/Agent.ts
Expand Up @@ -148,6 +148,27 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
}

public async initialize() {
const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

// Listen for new messages (either from transports or somewhere else in the framework / extensions)
// We create this before doing any other initialization, so the initialization could already receive messages
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
})
)
)
.subscribe()

await super.initialize()

for (const [, module] of Object.entries(this.dependencyManager.registeredModules) as [string, Module][]) {
Expand Down Expand Up @@ -179,26 +200,6 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
await this.mediator.initialize()
await this.mediationRecipient.initialize()

const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

// Listen for new messages (either from transports or somewhere else in the framework / extensions)
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
})
)
)
.subscribe()

this._isInitialized = true
}

Expand Down

0 comments on commit 93276de

Please sign in to comment.