Skip to content
This repository has been archived by the owner on Nov 22, 2023. It is now read-only.

Commit

Permalink
feat: implement archipelago with NATS
Browse files Browse the repository at this point in the history
  • Loading branch information
Juan Scolari committed Apr 11, 2022
1 parent 48d7079 commit 8be9724
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 47 deletions.
8 changes: 6 additions & 2 deletions .env.default
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@
HTTP_SERVER_PORT=3000
HTTP_SERVER_HOST=0.0.0.0

NATS_SERVER_PORT=4222
NATS_SERVER_HOST=0.0.0.0
NATS_URL=localhost:4222

ARCHIPELAGO_FLUSH_FREQUENCY=2.0
ARCHIPELAGO_JOIN_DISTANCE=64
ARCHIPELAGO_LEAVE_DISTANCE=80
ARCHIPELAGO_MAX_PEERS_PER_ISLAND=100
34 changes: 32 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"semi": false
},
"dependencies": {
"@dcl/archipelago": "^1.0.0",
"@well-known-components/env-config-provider": "^1.1.1",
"@well-known-components/http-server": "^1.1.1",
"@well-known-components/interfaces": "^1.1.1",
Expand Down
3 changes: 3 additions & 0 deletions src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { createMetricsComponent } from "@well-known-components/metrics"
import { AppComponents, GlobalContext } from "./types"
import { metricDeclarations } from "./metrics"
import { createMessageBrokerComponent } from "./ports/message-broker"
import { createArchipelagoComponent } from "./ports/archipelago"

// Initialize all the components of the app
export async function initComponents(): Promise<AppComponents> {
Expand All @@ -17,6 +18,7 @@ export async function initComponents(): Promise<AppComponents> {
const fetch = await createFetchComponent()
const metrics = await createMetricsComponent(metricDeclarations, { server, config })
const messageBroker = await createMessageBrokerComponent({ config, logs })
const archipelago = await createArchipelagoComponent({ config })

return {
config,
Expand All @@ -26,5 +28,6 @@ export async function initComponents(): Promise<AppComponents> {
fetch,
metrics,
messageBroker,
archipelago,
}
}
3 changes: 0 additions & 3 deletions src/controllers/handlers/subjects-handler.ts

This file was deleted.

6 changes: 0 additions & 6 deletions src/controllers/subjects.ts

This file was deleted.

21 changes: 21 additions & 0 deletions src/controllers/topics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { IslandUpdates, PeerPositionChange } from "@dcl/archipelago"
import { JSONCodec } from "nats"
import { GlobalContext } from "../types"

const jsonCodec = JSONCodec()

export function setupTopics(globalContext: GlobalContext): void {
const messageBroker = globalContext.components.messageBroker
const archipelago = globalContext.components.archipelago

messageBroker.subscribe("heartbeat", (data: Uint8Array) => {
const peerPositionChange = jsonCodec.decode(data) as PeerPositionChange
archipelago.setPeersPositions(peerPositionChange)
})

archipelago.subscribeToUpdates((updates: IslandUpdates) => {
if (Object.keys(updates).length) {
messageBroker.publish("island_updates", updates)
}
})
}
28 changes: 28 additions & 0 deletions src/ports/archipelago.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ArchipelagoController, defaultArchipelagoController } from "@dcl/archipelago"
import { IConfigComponent } from "@well-known-components/interfaces"

export declare type IArchipelagoComponent = ArchipelagoController

export declare type ArchipelagoComponents = {
config: IConfigComponent
}

export async function createArchipelagoComponent(components: ArchipelagoComponents): Promise<ArchipelagoController> {
const { config } = components

const flushFrequency = await config.requireNumber("ARCHIPELAGO_FLUSH_FREQUENCY")
const joinDistance = await config.requireNumber("ARCHIPELAGO_JOIN_DISTANCE")
const leaveDistance = await config.requireNumber("ARCHIPELAGO_LEAVE_DISTANCE")
const maxPeersPerIsland = await config.requireNumber("ARCHIPELAGO_MAX_PEERS_PER_ISLAND")

const controller = defaultArchipelagoController({
flushFrequency,
archipelagoParameters: {
joinDistance,
leaveDistance,
maxPeersPerIsland,
},
})

return controller
}
79 changes: 48 additions & 31 deletions src/ports/message-broker.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,81 @@
import { IConfigComponent, ILoggerComponent } from "@well-known-components/interfaces"
import { connect, JSONCodec } from "nats"
import { IBaseComponent, IConfigComponent, ILoggerComponent } from "@well-known-components/interfaces"
import { connect, JSONCodec, StringCodec, NatsConnection, Subscription } from "nats"
import { BaseComponents } from "../types"

export declare type IMessageBrokerComponent = {
publish(subject: string, message: any): void
subscribe(subject: string, handler: Function, respond?: boolean): void
}
publish(topic: string, message: any): void
subscribe(topic: string, handler: Function): Subscription

export declare type MessageBrokerComponents = {
config: IConfigComponent
logs: ILoggerComponent
start(): Promise<void>
stop(): Promise<void>
}

export async function createMessageBrokerComponent(
components: MessageBrokerComponents
): Promise<IMessageBrokerComponent> {
components: Pick<BaseComponents, "config" | "logs">
): Promise<IMessageBrokerComponent & IBaseComponent> {
const { config, logs } = components
const logger = logs.getLogger("MessageBroker")
const jsonCodec = JSONCodec()
const stringCodec = StringCodec()

// config
const port = await config.requireNumber("NATS_SERVER_PORT")
const host = await config.requireString("NATS_SERVER_HOST")

const serverConfig = { servers: `${host}:${port}` }
const server = await connect(serverConfig)
const natsUrl = (await config.getString("NATS_URL")) || "nats.decentraland.zone:4222"
const natsConfig = { servers: `${natsUrl}` }
let natsConnection: NatsConnection

const publish = (subject: string, message: any) => {
server.publish(subject, jsonCodec.encode(message))
function publish(topic: string, message: any): void {
if (message instanceof Uint8Array) {
natsConnection.publish(topic, message)
} else if (typeof message === "object") {
natsConnection.publish(topic, jsonCodec.encode(message))
} else if (typeof message === "string") {
natsConnection.publish(topic, stringCodec.encode(message))
} else {
logger.error(`Invalid message: ${JSON.stringify(message)}`)
}
}

const subscribe = (subject: string, handler: Function, respond?: boolean) => {
const subscription = server.subscribe(subject)
function subscribe(topic: string, handler: Function): Subscription {
const subscription = natsConnection.subscribe(topic)
;(async () => {
for await (const message of subscription) {
try {
if (message.data.length) {
const data = jsonCodec.decode(message.data) as any
logger.debug(`[${subscription.getProcessed()}]: ${message.subject}: ${JSON.stringify(data, null, 2)}`)
const data = message.data
const payload = await handler(data)
if (respond) {
message.respond(jsonCodec.encode(payload))
}
} else {
logger.debug(`[${subscription.getProcessed()}]: ${message.subject}`)
const payload = await handler()
if (respond) {
message.respond(jsonCodec.encode(payload))
}
}
} catch (err: any) {
logger.error(err)
}
}
})()
return subscription
}

async function start() {
try {
natsConnection = await connect(natsConfig)
logger.info(`Connected to NATS: ${natsUrl}`)
} catch (error) {
logger.error(`An error occurred trying to connect to the NATS server: ${natsUrl}`)
throw error
}
}

const nats: IMessageBrokerComponent = {
async function stop() {
try {
await natsConnection.close()
} catch (error) {
logger.error(`An error occurred trying to close the connection to the NATS server: ${natsUrl}`)
}
}

return {
publish,
subscribe,
start,
stop,
}

return nats
}
6 changes: 3 additions & 3 deletions src/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Lifecycle } from "@well-known-components/interfaces"
import { setupRouter } from "./controllers/routes"
import { setupSubjects } from "./controllers/subjects"
import { setupTopics } from "./controllers/topics"
import { AppComponents, GlobalContext, TestComponents } from "./types"

// this function wires the business logic (adapters & controllers) with the components (ports)
Expand All @@ -19,8 +19,8 @@ export async function main(program: Lifecycle.EntryPointParameters<AppComponents
// set the context to be passed to the handlers
components.server.setContext(globalContext)

setupSubjects(components.messageBroker)

// start ports: db, listeners, synchronizations, etc
await startComponents()

setupTopics(globalContext)
}
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
IMetricsComponent,
} from "@well-known-components/interfaces"
import { metricDeclarations } from "./metrics"
import { IArchipelagoComponent } from "./ports/archipelago"
import { IMessageBrokerComponent } from "./ports/message-broker"

export type GlobalContext = {
Expand All @@ -21,6 +22,7 @@ export type BaseComponents = {
fetch: IFetchComponent
metrics: IMetricsComponent<keyof typeof metricDeclarations>
messageBroker: IMessageBrokerComponent
archipelago: IArchipelagoComponent
}

// components used in runtime
Expand Down

0 comments on commit 8be9724

Please sign in to comment.