Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chat): chat service #135

Merged
merged 11 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions packages/server/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,7 @@ export class Api {
this.sockets = new SocketManager(this.app.sockets)
this.syncs = new SyncApi(this.router, this.app.config, this.app.syncs, this.app.clients, this.app.channels)
this.health = new HealthApi(this.router, this.app.clients, this.app.health)
this.chat = new ChatApi(
this.router,
this.app.clients,
this.app.channels,
this.app.conduits,
this.app.instances,
this.app.conversations,
this.app.messages,
this.app.sockets
)
this.chat = new ChatApi(this.router, this.app.clients, this.app.conversations, this.app.chat)
this.users = new UserApi(this.router, this.app.clients, this.sockets, this.app.sockets, this.app.users)
this.conversations = new ConversationApi(
this.router,
Expand All @@ -53,7 +44,7 @@ export class Api {
this.sockets,
this.app.conversations,
this.app.messages,
this.app.instances,
this.app.chat,
this.app.sockets
)
this.channels = new ChannelApi(this.root, this.app)
Expand Down
20 changes: 15 additions & 5 deletions packages/server/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BatchingService } from './batching/service'
import { CachingService } from './caching/service'
import { ChannelService } from './channels/service'
import { ChatService } from './chat/service'
import { ClientService } from './clients/service'
import { ConduitService } from './conduits/service'
import { ConfigService } from './config/service'
Expand Down Expand Up @@ -44,6 +45,7 @@ export class App {
syncs: SyncService
health: HealthService
sockets: SocketService
chat: ChatService

constructor() {
this.logger = new LoggerService()
Expand All @@ -66,17 +68,12 @@ export class App {
this.mapping = new MappingService(this.database, this.caching, this.batching, this.users, this.conversations)
this.instances = new InstanceService(
this.logger,
this.config,
this.distributed,
this.caching,
this.channels,
this.providers,
this.post,
this.conduits,
this.clients,
this.webhooks,
this.conversations,
this.messages,
this.mapping,
this
)
Expand All @@ -102,6 +99,19 @@ export class App {
this.instances
)
this.sockets = new SocketService(this.caching, this.users)
this.chat = new ChatService(
this.logger,
this.config,
this.post,
this.webhooks,
this.conversations,
this.messages,
this.sockets,
this.mapping,
this.clients,
this.conduits,
this.instances
)
}

async setup() {
Expand Down
25 changes: 25 additions & 0 deletions packages/server/src/channels/base/conduit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,31 @@ export abstract class ConduitInstance<TConfig, TContext extends ChannelContext<a
}
}

async receive(payload: any) {
const conduit = (await this.app.conduits.get(this.conduitId))!
const provider = (await this.app.providers.getById(conduit.providerId))!
const endpoint = await this.extractEndpoint(payload)

if (!endpoint.content.type) {
return
}

// TODO: rehandle sandbox
/*
const clientId = provider.sandbox
? await this.sandbox.getClientId(conduitId, endpoint)
: (await this.app.clients.getByProviderId(provider.id))!.id
*/

const clientId = (await this.app.clients.getByProviderId(provider.id))?.id
if (!clientId) {
return
}

const { userId, conversationId } = await this.app.mapping.getMapping(clientId, conduit.channelId, endpoint)
return this.app.chat.send(conversationId, userId, endpoint.content, { endpoint: _.omit(endpoint, 'content') })
}

async initialize() {}

async destroy() {}
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/channels/discord/conduit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class DiscordConduit extends ConduitInstance<DiscordConfig, DiscordContex

this.client.on('message', async (msg) => {
if (!msg.author.bot) {
await this.app.instances.receive(this.conduitId, msg)
await this.receive(this.conduitId)
}
})

Expand Down
3 changes: 1 addition & 2 deletions packages/server/src/channels/messenger/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ export class MessengerChannel extends Channel<MessengerConduit> {
}

await conduit.client.sendAction(webhookEvent.sender.id, 'mark_seen')

await this.app.instances.receive(conduit.conduitId, webhookEvent)
await conduit.receive(webhookEvent)
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/server/src/channels/slack/conduit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ export class SlackConduit extends ConduitInstance<SlackConfig, SlackContext> {
return
} else if (actionId.startsWith('quick_reply')) {
await axios.post(payload.response_url, { text: `*${action?.text?.text}*` })
await this.app.instances.receive(this.conduitId, {
await this.receive({
ctx: payload,
content: { type: 'quick_reply', text: action?.text?.text, payload: action?.value }
})
} else if (actionId.startsWith('say_something')) {
await this.app.instances.receive(this.conduitId, {
await this.receive({
ctx: payload,
content: { type: 'say_something', text: action?.value }
})
} else {
await this.app.instances.receive(this.conduitId, {
await this.receive({
ctx: payload,
content: { type: 'postback', payload: action?.value }
})
Expand All @@ -82,7 +82,7 @@ export class SlackConduit extends ConduitInstance<SlackConfig, SlackContext> {

await axios.post(payload.response_url, { text: `*${label}*` })

await this.app.instances.receive(this.conduitId, {
await this.receive({
ctx: payload,
content: { type: 'quick_reply', text: label, payload: action?.value }
})
Expand All @@ -97,7 +97,7 @@ export class SlackConduit extends ConduitInstance<SlackConfig, SlackContext> {
return
}

await this.app.instances.receive(this.conduitId, {
await this.receive({
ctx: payload,
content: {
type: 'text',
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/channels/smooch/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class SmoochChannel extends Channel<SmoochConduit> {
if (req.headers['x-api-key'] === conduit.secret) {
const body = req.body as SmoochPayload
for (const message of body.messages) {
await this.app.instances.receive(conduit.conduitId, { context: body, message })
await conduit.receive({ context: body, message })
}
res.sendStatus(200)
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/channels/teams/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class TeamsChannel extends Channel<TeamsConduit> {
if (conduit.botNewlyAddedToConversation(turnContext)) {
await conduit.sendProactiveMessage(turnContext)
} else {
await this.app.instances.receive(conduit.conduitId, turnContext)
await conduit.receive(turnContext)
}
} catch (e) {
conduit.logger.error(e, 'Error occurred processing teams activity')
Expand Down
8 changes: 4 additions & 4 deletions packages/server/src/channels/telegram/conduit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@ export class TelegramConduit extends ConduitInstance<TelegramConfig, TelegramCon
this.telegraf = new Telegraf(this.config.botToken)
this.telegraf.start(async (ctx) => {
try {
await this.app.instances.receive(this.conduitId, ctx)
await this.receive(ctx)
} catch (e) {
this.logger.error(e, 'Error occured on start')
}
})
this.telegraf.help(async (ctx) => {
try {
await this.app.instances.receive(this.conduitId, ctx)
await this.receive(ctx)
} catch (e) {
this.logger.error(e, 'Error occured on help')
}
})
this.telegraf.on('message', async (ctx) => {
try {
await this.app.instances.receive(this.conduitId, ctx)
await this.receive(ctx)
} catch (e) {
this.logger.error(e, 'Error occurred processing message')
}
})
this.telegraf.on('callback_query', async (ctx) => {
try {
await this.app.instances.receive(this.conduitId, ctx)
await this.receive(ctx)
} catch (e) {
this.logger.error(e, 'Error occurred processing callback query')
}
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/channels/twilio/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class TwilioChannel extends Channel<TwilioConduit> {
validateRequest(conduit.config.authToken, signature, conduit.webhookUrl, req.body) ||
(await this.verifyLegacy(conduit, signature, req))
) {
await this.app.instances.receive(conduit.conduitId, req.body)
await conduit.receive(req.body)
res.sendStatus(204)
} else {
this.logger.error(new Error('Request validation failed. Make sure that your authToken is valid'))
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/channels/vonage/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class VonageChannel extends Channel<VonageConduit> {
const conduit = res.locals.conduit as VonageConduit

if (this.validate(conduit, req)) {
await this.app.instances.receive(conduit.conduitId, req.body)
await conduit.receive(req.body)
}

res.sendStatus(200)
Expand Down
28 changes: 4 additions & 24 deletions packages/server/src/chat/api.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
import { Router } from 'express'
import { ApiRequest, ClientScopedApi } from '../base/api'
import { ChannelService } from '../channels/service'
import { ClientService } from '../clients/service'
import { ConduitService } from '../conduits/service'
import { ConversationService } from '../conversations/service'
import { InstanceService } from '../instances/service'
import { MessageService } from '../messages/service'
import { SocketService } from '../socket/service'
import { ChatReplySchema } from './schema'
import { ChatService } from './service'

export class ChatApi extends ClientScopedApi {
constructor(
router: Router,
clients: ClientService,
private channels: ChannelService,
private conduits: ConduitService,
private instances: InstanceService,
private conversations: ConversationService,
private messages: MessageService,
private sockets: SocketService
private chat: ChatService
) {
super(router, clients)
}
Expand All @@ -34,7 +26,7 @@ export class ChatApi extends ClientScopedApi {
return res.status(400).send(error.message)
}

const { channel, conversationId, payload } = req.body
const { conversationId, payload } = req.body
const conversation = await this.conversations.get(conversationId)
Comment on lines +29 to 30
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: remove channel from request schema. Also remove from client


if (!conversation) {
Expand All @@ -43,19 +35,7 @@ export class ChatApi extends ClientScopedApi {
return res.sendStatus(403)
}

// TODO: this is terrible
let message
if (channel !== 'socket') {
const channelId = this.channels.getByName(channel).id
const conduit = (await this.conduits.getByProviderAndChannel(req.client!.providerId, channelId))!
message = await this.instances.send(conduit.id, conversationId, payload)
} else {
message = await this.messages.create(conversationId, undefined, payload)
const sockets = this.sockets.listByUser(conversation.userId)
for (const socket of sockets) {
socket.send({ type: 'message', data: message })
}
}
const message = await this.chat.send(conversationId, undefined, payload, { clientId: req.client!.id })

res.send(message)
})
Expand Down
Loading