Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Reorder the closing of services, prevent sagas running multiple times and close backend server properly #2499

Merged
merged 13 commits into from
May 9, 2024
8 changes: 4 additions & 4 deletions packages/backend/src/backendManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ export const runBackendMobile = async () => {
{ logger: ['warn', 'error', 'log', 'debug', 'verbose'] }
)

rn_bridge.channel.on('close', async () => {
rn_bridge.channel.on('close', () => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
await connectionsManager.pause()
connectionsManager.pause()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for async here since nothing awaits the event handler

})

rn_bridge.channel.on('open', async (msg: OpenServices) => {
rn_bridge.channel.on('open', (msg: OpenServices) => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
const torControl = app.get<TorControl>(TorControl)
const proxyAgent = app.get<{ proxy: { port: string } }>(SOCKS_PROXY_AGENT)
Expand All @@ -123,7 +123,7 @@ export const runBackendMobile = async () => {
torControl.torControlParams.auth.value = msg.authCookie
proxyAgent.proxy.port = msg.httpTunnelPort

await connectionsManager.resume()
connectionsManager.resume()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,47 +223,51 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}
}

public async closeAllServices(options: { saveTor: boolean } = { saveTor: false }) {
public async closeAllServices(
options: { saveTor: boolean; purgeLocalDb: boolean } = { saveTor: false, purgeLocalDb: false }
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels a little clunky to add another parameter here since closeAllServices is doing a little more than just closing things now, but it seems like the simplest change without a larger refactor. Also if closeAllServices is purging local-db then maybe it should also purge data on the filesystem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Why do we need to add this parameter? Maybe I'm reading into the name localDbService too much, but it seems like the data that would be cleared by this.localDbService.purge() would be the same data that would be cleared by this.purgeData() called in leaveCommunity() since that purgeData() seems to holistically remove all the local db data from the filesystem. Adding a side effect to closeAllServices() seems like it would be a bit of a code smell.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure, if we are sure that LevelDB would be totally cleared by purgeData then I don't think this.localDbServer.purge() is required - I just rearranged what was already there.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we close and then re-open the same LevelDB, is there data in memory that sticks around? Or does removing the LevelDB files clear everything?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the link above "DestroyDB operation will be
faster for moderate to large databases (its implementation works
pretty much as Krzysztof Kowalczyk mentioned: it deletes all the
database files in the directory)." and then taking a look at the source code itself, looks like that is what's going on: https://github.com/google/leveldb/blob/068d5ee1a3ac40dabd00d211d5013af44be55bea/db/db_impl.cc#L1542

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tested this and it appears to work.

) {
this.logger('Closing services')

await this.closeSocket()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close the data server first


if (this.tor && !options.saveTor) {
this.logger('Killing tor')
await this.tor.kill()
} else if (options.saveTor) {
this.logger('Saving tor')
}
if (this.storageService) {
this.logger('Stopping orbitdb')
this.logger('Stopping OrbitDB')
await this.storageService?.stopOrbitDb()
}
if (this.serverIoProvider?.io) {
this.logger('Closing socket server')
this.serverIoProvider.io.close()
}
if (this.localDbService) {
this.logger('Closing local storage')
await this.localDbService.close()
}
if (this.libp2pService) {
this.logger('Stopping libp2p')
await this.libp2pService.close()
}
if (this.localDbService) {
if (options.purgeLocalDb) {
this.logger('Purging local DB')
await this.localDbService.purge()
}
this.logger('Closing local DB')
await this.localDbService.close()
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-organizing. Data server should get closed first and then OrbitDB, then LibP2P


public closeSocket() {
this.serverIoProvider.io.close()
public async closeSocket() {
await this.socketService.close()
}

public async pause() {
this.logger('Pausing!')
this.logger('Closing socket!')
this.closeSocket()
await this.closeSocket()
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove the log because there is a similar log in sockerService.close, but we can keep it too, just let me know.

this.logger('Pausing libp2pService!')
this.peerInfo = await this.libp2pService?.pause()
this.logger('Found the following peer info on pause: ', this.peerInfo)
}

public async resume() {
this.logger('Resuming!')
this.logger('Reopening socket!')
await this.openSocket()
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I remove the log because there is a similar log in socketService.listen. We can keep both also, just let me know.

this.logger('Attempting to redial peers!')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
Expand All @@ -286,24 +290,17 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
await this.socketService.init()
}

public async leaveCommunity(): Promise<boolean> {
public async leaveCommunity() {
this.logger('Running leaveCommunity')

this.logger('Resetting tor')
this.tor.resetHiddenServices()

this.logger('Closing the socket')
this.closeSocket()

this.logger('Purging local DB')
await this.localDbService.purge()

this.logger('Closing services')
await this.closeAllServices({ saveTor: true })
await this.closeAllServices({ saveTor: true, purgeLocalDb: true })

this.logger('Purging data')
await this.purgeData()

this.logger('Resetting Tor')
this.tor.resetHiddenServices()

this.logger('Resetting state')
await this.resetState()

Expand Down
69 changes: 61 additions & 8 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
import { ConfigOptions, ServerIoProviderTypes } from '../types'
import { suspendableSocketEvents } from './suspendable.events'
import Logger from '../common/logger'
import { sleep } from '../common/sleep'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] unused import

import type net from 'node:net'

@Injectable()
export class SocketService extends EventEmitter implements OnModuleInit {
private readonly logger = Logger(SocketService.name)

public resolveReadyness: (value: void | PromiseLike<void>) => void
public readyness: Promise<void>
private listening: boolean
private closeSockets: () => void

constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
Expand All @@ -44,12 +48,14 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.readyness = new Promise<void>(resolve => {
this.resolveReadyness = resolve
})

this.listening = false
this.closeSockets = this.attachListeners()
}

async onModuleInit() {
this.logger('init: Started')

this.attachListeners()
await this.init()

this.logger('init: Finished')
Expand All @@ -71,7 +77,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('init: Frontend connected')
}

private readonly attachListeners = (): void => {
private readonly attachListeners = (): (() => void) => {
// Attach listeners here
this.serverIoProvider.io.on(SocketActionTypes.CONNECTION, socket => {
this.logger('Socket connection')
Expand Down Expand Up @@ -195,25 +201,72 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.emit(SocketActionTypes.LOAD_MIGRATION_DATA, data)
})
})

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
//
// I also tried `this.serverIoProvider.io.disconnectSockets(true)`
// which didn't work for me.
const sockets = new Set<net.Socket>()

this.serverIoProvider.server.on('connection', conn => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[bug] This seems like a confusion of the server and the socket io.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in my testing never seems to be called

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets called for me. There are two different sockets, the socket.io Socket (https://socket.io/docs/v4/server-api/#socket) and the Node net.Socket. I'm really just following the advice here: socketio/socket.io#1602 and it worked for me in cleaning up the hanging connections I was seeing, so I just went with that.

sockets.add(conn)
conn.on('close', () => {
sockets.delete(conn)
})
})

return () => sockets.forEach(s => s.destroy())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[design] This feels very unintuitive to me. attachListeners() doubling as closeSockets() feels like a violation of the single responsibility principle. Why can't we have a separate closeSockets() function? This would make the code easier to read and understand.

}

public getConnections = (): Promise<number> => {
return new Promise(resolve => {
this.serverIoProvider.server.getConnections((err, count) => {
if (err) throw new Error(err.message)
resolve(count)
})
})
}

public listen = async (port = this.configOptions.socketIOPort): Promise<void> => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[refactor] port is unused. We should either remove the parameter or use it in the function instead of hardcoding the port number.

return await new Promise(resolve => {
if (this.serverIoProvider.server.listening) resolve()
this.logger(`Opening data server on port ${this.configOptions.socketIOPort}`)

// Sometimes socket.io closes the HTTP server but doesn't close
// all underlying connections. So it doesn't appear that
// `this.serverIoProvider.server.listening` is sufficient.
if (this.listening) {
const numConnections = await this.getConnections()
this.logger('Failed to listen. Connections still open:', numConnections)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Wouldn't it be better to just clean up the existing connections and proceed with listening?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I thought about that, but there might be some trickiness with timing due to the listen and close being async. I'll have to look into it more, so just went with the simplest option for now.

return
}

return new Promise(resolve => {
this.serverIoProvider.server.listen(this.configOptions.socketIOPort, '127.0.0.1', () => {
this.logger(`Data server running on port ${this.configOptions.socketIOPort}`)
this.listening = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[design] This seems like reinventing the wheel. I feel like this just introduces chances for desync between the state of the http server and the state of SocketService.listening. I'd rather see us use the built in property and properly handle the cases related to no longer listening, but not having properly cleaned up connections.

resolve()
})
})
}

public close = async (): Promise<void> => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)
return await new Promise(resolve => {
this.serverIoProvider.server.close(err => {
public close = (): Promise<void> => {
return new Promise(resolve => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)

if (!this.listening) {
this.logger('Data server is not running.')
resolve()
return
}

this.serverIoProvider.io.close(err => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Does this suffer from the same issue mentioned in the cleanup function stuck in attachListeners where you need to loop through each socket and disconnect? Also, does this remove all of the listeners that were attached in the attachListeners function? Could this cause a memory leak and duplication of actions if the SocketService is started back up after being close and the listeners are attached again?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this doesn't appear to close all connections properly. I don't think we need to remove the listeners ever. So those just get added once in the constructor.

if (err) throw new Error(err.message)
this.logger('Data server closed')
this.listening = false
resolve()
})
this.logger('Disconnecting sockets')
this.closeSockets()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Shouldn't we also detach listeners when closing sockets? I mentioned it before, but I'd rather see a dedicated closeSockets() and detachListeners() method.

Copy link
Collaborator Author

@leblowl leblowl May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to detach any listeners. They are just attached once and persist between opening and closing of the server.

})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ export class CertificatesRequestsStore extends EventEmitter {
}

public async close() {
this.logger('Closing...')
this.logger('Closing certificate requests DB')
await this.store?.close()
this.logger('Closed')
this.logger('Closed certificate requests DB')
}

public getAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ export class CertificatesStore extends EventEmitter {
}

public async close() {
this.logger('Closing certificates DB')
await this.store?.close()
this.logger('Closed certificates DB')
}

public getAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ export class CommunityMetadataStore extends EventEmitter {
}

public async close() {
logger('Closing community metadata DB')
await this.store?.close()
logger('Closed community metadata DB')
}

public async updateCommunityMetadata(newMeta: CommunityMetadata): Promise<CommunityMetadata | undefined> {
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/nest/storage/storage.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ export class StorageService extends EventEmitter {

public async stopOrbitDb() {
try {
this.logger('Closing channels DB')
await this.channels?.close()
this.logger('Closed channels DB')
} catch (e) {
this.logger.error('Error closing channels db', e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ export class UserProfileStore extends EventEmitter {
}

public async close() {
logger('Closing user profile DB')
await this.store?.close()
logger('Closed user profile DB')
}

public async addUserProfile(userProfile: UserProfile) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
import { io } from 'socket.io-client'
import { select, put, call, cancel, fork, takeEvery, FixedTask, delay, apply, putResolve } from 'typed-redux-saga'
import {
select,
put,
putResolve,
call,
cancel,
fork,
take,
takeLeading,
takeEvery,
FixedTask,
delay,
apply,
} from 'typed-redux-saga'
import { PayloadAction } from '@reduxjs/toolkit'
import { socket as stateManager, Socket } from '@quiet/state-manager'
import { encodeSecret } from '@quiet/common'
Expand Down Expand Up @@ -49,17 +62,20 @@ export function* startConnectionSaga(
})
yield* fork(handleSocketLifecycleActions, socket, action.payload)
// Handle opening/restoring connection
yield* takeEvery(initActions.setWebsocketConnected, setConnectedSaga, socket)
yield* takeLeading(initActions.setWebsocketConnected, setConnectedSaga, socket)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only needs to happen in sequence

}

function* setConnectedSaga(socket: Socket): Generator {
console.log('Frontend is ready. Forking state-manager sagas and starting backend...')

const task = yield* fork(stateManager.useIO, socket)
console.log('WEBSOCKET', 'Forking state-manager sagas', task)
// Handle suspending current connection
yield* takeEvery(initActions.suspendWebsocketConnection, cancelRootTaskSaga, task)
console.log('Frontend is ready. Starting backend...')

// @ts-ignore - Why is this broken?
yield* apply(socket, socket.emit, [SocketActionTypes.START])

// Handle suspending current connection
const suspendAction = yield* take(initActions.suspendWebsocketConnection)
yield* call(cancelRootTaskSaga, task, suspendAction)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only needs to happen once (takeEvery is not necessary and causes issues)

}

function* handleSocketLifecycleActions(socket: Socket, socketIOData: WebsocketConnectionPayload): Generator {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { eventChannel } from 'redux-saga'
import { call, put, take } from 'typed-redux-saga'
import { call, put, take, cancelled } from 'typed-redux-saga'
import { app, publicChannels, WEBSOCKET_CONNECTION_CHANNEL, INIT_CHECK_CHANNEL, network } from '@quiet/state-manager'
import { initActions, InitCheckPayload, WebsocketConnectionPayload } from '../../init/init.slice'
import { ScreenNames } from '../../../const/ScreenNames.enum'
Expand All @@ -9,10 +9,18 @@ import { navigationActions } from '../../navigation/navigation.slice'
import { nativeServicesActions } from '../nativeServices.slice'

export function* nativeServicesCallbacksSaga(): Generator {
const channel = yield* call(deviceEvents)
while (true) {
const action = yield* take(channel)
yield put(action)
console.log('nativeServicesCallbacksSaga starting')
try {
const channel = yield* call(deviceEvents)
while (true) {
const action = yield* take(channel)
yield put(action)
}
} finally {
console.log('nativeServicesCallbacksSaga stopping')
if (yield cancelled()) {
console.log('nativeServicesCallbacksSaga cancelled')
}
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just adding additional logging around all of these long-running sagas (anything that runs until cancelled... so anything with a takeEvery in it or other while (true) loop)

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { select, call, takeLeading, putResolve } from 'typed-redux-saga'
import { select, call, putResolve } from 'typed-redux-saga'
import { app } from '@quiet/state-manager'
import { persistor } from '../../store'
import { nativeServicesActions } from '../nativeServices.slice'
Expand All @@ -9,11 +9,8 @@ import { ScreenNames } from '../../../../src/const/ScreenNames.enum'

export function* leaveCommunitySaga(): Generator {
console.log('Leaving community')

// Restart backend
yield* putResolve(app.actions.closeServices())

yield takeLeading(initActions.canceledRootTask.type, clearReduxStore)
}
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never want to run takeLeading or takeEvery more than once because, for example, in this case it will result in clearReduxStore being run multiple times. I moved this into the rootSaga instead.


export function* clearReduxStore(): Generator {
Expand Down
Loading
Loading