Skip to content

Streams recreate themselves after ending #24

@tegefaulkes

Description

@tegefaulkes

Specification

After a stream has ended it seems to re-create itself with the same streamId and get's emitted as a new stream event. This is after creating a single stream and ending it gracefully.

Here is a reproducible example.

import * as testsUtils from "./utils";
import { promise } from "@/utils";
import * as events from "@/events";
import WebSocketServer from "@/WebSocketServer";
import WebSocketClient from "@/WebSocketClient";
import Logger, { formatting, LogLevel, StreamHandler } from "@matrixai/logger";
import { KeyTypes, sleep } from "./utils";
import { WebSocketConnection, WebSocketStream } from "@";


test('quick test', async () => {
  const logger = new Logger(`${WebSocketClient.name} Test`, LogLevel.INFO, [
    new StreamHandler(
      formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`,
    ),
  ]);
  const localhost = '127.0.0.1';
  const types: Array<KeyTypes> = ['RSA', 'ECDSA', 'ED25519'];
  // Const types: Array<KeyTypes> = ['RSA'];
  const defaultType = types[0];

  const tlsConfigServer = await testsUtils.generateConfig(defaultType);

  const connectionEventProm =
    promise<WebSocketConnection>();
  const server = new WebSocketServer({
    logger: logger.getChild(WebSocketServer.name),
    config: {
      key: tlsConfigServer.key,
      cert: tlsConfigServer.cert,
      verifyPeer: false,
    },
  });
  server.addEventListener(
    events.EventWebSocketServerConnection.name,
    (e: events.EventWebSocketServerConnection) =>
      connectionEventProm.resolveP(e.detail),
  );
  await server.start({
    host: localhost,
  });
  // If the server is slow to respond then this will time out.
  //  Then main cause of this was the server not processing the initial packet
  //  that creates the `WebSocketConnection`, as a result, the whole creation waited
  //  an extra 1 second for the client to retry the initial packet.
  const client = await WebSocketClient.createWebSocketClient(
    {
      host: localhost,
      port: server.port,
      logger: logger.getChild(WebSocketClient.name),
      config: {
        verifyPeer: false,
      },
    },
    { timer: 500 },
  );
  client.connection.addEventListener(
    events.EventWebSocketConnectionStream.name,
    (evt: events.EventWebSocketConnectionStream) => {
      logger.warn('stream emitted by CLIENT connection');
    },
  );
  const serverConnection = await connectionEventProm.p;
  const prom = promise<WebSocketStream>();
  serverConnection.addEventListener(
    events.EventWebSocketConnectionStream.name,
    (evt: events.EventWebSocketConnectionStream) => {
      logger.warn('stream emitted by SERVER connection');
      prom.resolveP(evt.detail)
    },
    );
  console.log('starting stream');
  const streamLocal = await client.connection.newStream();
  const streamPeer = await prom.p;
  const message = Buffer.from('message!');
  console.log('asd');
  const writerLocal = streamLocal.writable.getWriter()
  await writerLocal.write(message);
  await writerLocal.close();
  for await (const message of streamPeer.readable) {
    console.log(Buffer.from(message).toString());
  }
  const writerPeer = streamPeer.writable.getWriter()
  await writerPeer.write(message);
  await writerPeer.close();
  for await (const message of streamLocal.readable) {
    console.log(Buffer.from(message).toString());
  }
  console.log('waiting');
  await sleep(2000);
  console.log('waitied!')

  await client.destroy({ force: true });
  await server.stop({ force: true });
})

I have already applied a quick fix to this in the WebSocketConnection with the following.

    let stream = this.streamMap.get(streamId);
    if (stream == null) {
      // FIXME: tempfix for preventing dead stream re-creation
      if (this.usedIdSet.has(streamId)) {
        // Silently ignore the message
        return;
      } else {
        this.usedIdSet.add(streamId);
      }
// ...
// ...

But this is far from ideal, The whole problem needs to be investigated and have a proper fix applied for it.

Additional context

Tasks

  1. Remove temp fix.
  2. investigate the problem.
  3. Apply a proper fix.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions