Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SocketException: Connection reset by peer (OS Error: Connection reset by peer) #55776

Open
leeflix opened this issue May 19, 2024 · 10 comments
Open
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-io

Comments

@leeflix
Copy link

leeflix commented May 19, 2024

I am using websockets in my dart application. My server occasionally crashes when a "peer resets the connection". I think its odd that an exception crashes the program. Is there any way to handle this?

@lrhn lrhn added the area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. label May 19, 2024
@mraleph
Copy link
Member

mraleph commented May 22, 2024

You would need to post a reproduction etc. Without code we could debug this is unactionable.

@mraleph mraleph closed this as completed May 22, 2024
@mraleph mraleph reopened this May 22, 2024
@mraleph mraleph added area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-io needs-info We need additional information from the issue author (auto-closed after 14 days if no response) and removed area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. labels May 22, 2024
@leeflix
Copy link
Author

leeflix commented May 22, 2024

@mraleph I am not doing anything fancy basically everything i do boils down to this:

    // start listening for ws connections there is no problem here
    
    SecurityContext? securityContext;
    var server;
    if (Env.certificateChainPath != null && Env.privateKeyPath != null) {
      securityContext = SecurityContext()
        ..usePrivateKeyBytes(File(Env.privateKeyPath!).readAsBytesSync())
        ..useCertificateChainBytes(File(Env.certificateChainPath!).readAsBytesSync());
      server = await HttpServer.bindSecure(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort, securityContext);
    } else {
      server = await HttpServer.bind(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort);
    }

    // the problem should be after here

    await for (var request in server) {
        if (WebSocketTransformer.isUpgradeRequest(request)) {
          WebSocket ws = await WebSocketTransformer.upgrade(request);
          ws.add(...); // send data
        }
    }

@leeflix
Copy link
Author

leeflix commented May 22, 2024

I don't know where the error exactly occurs:

WhatsApp Image 2024-05-22 at 17 03 32

@github-actions github-actions bot removed the needs-info We need additional information from the issue author (auto-closed after 14 days if no response) label May 23, 2024
@leeflix
Copy link
Author

leeflix commented Jun 6, 2024

can someone tell me a way to handle this exception?

@leeflix
Copy link
Author

leeflix commented Jun 19, 2024

please help @mraleph i just need someone to tell me how to handle exceptions which are thrown in the lifecycle of the websocket

@mraleph
Copy link
Member

mraleph commented Jun 20, 2024

@leeflix The challenge I am having here is that we need a way to reproduce this. Without reproduction it is hard to say what and where exactly is going wrong. I have tried some variations of trying to connect to WS server and then abruptly close connections - but I was not able to reproduce this sort of crash.

If this was a raw socket API, then I could just say that the code is using Socket.add and not catching errors from Socket.done / Socket.flush. But with HTTP/WebSockets these errors are handled internally.

So we really need a repro to be able to fix this.

@leeflix
Copy link
Author

leeflix commented Jun 26, 2024

@mraleph any ideas why it says "SocketException" and not "WebSocketException". does that not indicate that the error is not caught on the Socket API level? also any ideas where this string "Connection reset by peer" comes from?

@mraleph
Copy link
Member

mraleph commented Jun 26, 2024

@leeflix it's SocketException because it the low-level networking error, (web socket is just wrapping around lower level socket APIs). You get this sort of error if you have a client socket which you try to write into and the other side abruptly closes the connection. The string "Connection reset by peer" comes from the underlying OS networking stack.

@a-siva a-siva added the needs-info We need additional information from the issue author (auto-closed after 14 days if no response) label Jul 10, 2024
@leeflix
Copy link
Author

leeflix commented Jul 23, 2024

@mraleph so i still can't find the error. i added a lot of error handling so at least only the ws dies and not the whole application but still i don't get why the websockt closes just because a peer does something weird this should never happen by default. we cannot user live updates in out app if this is not fixed:

import 'dart:io';

import 'package:easywear_backend/controllers/read_controller.dart';
import 'package:easywear_backend/models/user.dart';
import 'package:easywear_backend/tokens/access_token.dart';
import 'package:easywear_backend/tokens/tokens.dart';
import 'package:easywear_backend/utils/env.dart';

class UserCreatedEvent extends Event {
  Map<String, dynamic> userJson;

  UserCreatedEvent(this.userJson);

  Map<String, dynamic> toJson() => {
        "type": "UserCreatedEvent",
        "user": userJson,
      };

  UserCreatedEvent.fromJson(Map<String, dynamic> json) : userJson = json["user"];
}

class RequestUpdatedEvent extends Event {
  Map<String, dynamic> requestJson;

  RequestUpdatedEvent(this.requestJson);

  Map<String, dynamic> toJson() => {
        "type": "RequestUpdatedEvent",
        "request": requestJson,
      };

  RequestUpdatedEvent.fromJson(Map<String, dynamic> json) : requestJson = json["request"];
}

class RequestCreatedEvent extends Event {
  Map<String, dynamic> requestJson;

  RequestCreatedEvent(this.requestJson);

  Map<String, dynamic> toJson() => {
        "type": "RequestCreatedEvent",
        "request": requestJson,
      };

  RequestCreatedEvent.fromJson(Map<String, dynamic> json) : requestJson = json["request"];
}

class CoinsOfUserUpdatedEvent extends Event {
  String userId;
  int coins;

  CoinsOfUserUpdatedEvent(this.userId, this.coins);

  Map<String, dynamic> toJson() => {
        "type": "CoinsOfUserUpdatedEvent",
        "userId": userId,
        "coins": coins,
      };

  CoinsOfUserUpdatedEvent.fromJson(Map<String, dynamic> json)
      : userId = json["userId"],
        coins = json["coins"];
}

class InitEvent extends Event {
  String accessToken;

  InitEvent(this.accessToken);

  Map<String, dynamic> toJson() => {
        "type": "InitEvent",
        "accessToken": accessToken,
      };

  InitEvent.fromJson(Map<String, dynamic> json) : accessToken = json["accessToken"];
}

abstract class Event {}

class EventChannel {
  static Map<String, List<WebSocket>> domainToAdminWebSockets = {};
  static Map<String, Map<String, List<WebSocket>>> domainToUserIdToWebSockets = {};

  static Future<void> sendCoinsOfUserUpdatedEvent(String domain, String userId, int coins) async {
    CoinsOfUserUpdatedEvent coinsOfUserUpdatedEvent = CoinsOfUserUpdatedEvent(userId, coins);
    await sendEventToAdmins(domain: domain, event: coinsOfUserUpdatedEvent);
    await sendEventToUser(domain: domain, userId: userId, event: coinsOfUserUpdatedEvent);
  }

  static Future<void> sendUserCreatedEvent(String domain, Map<String, dynamic> userJson) async {
    UserCreatedEvent userCreatedEvent = UserCreatedEvent(userJson);
    await sendEventToAdmins(domain: domain, event: userCreatedEvent);
    // await sendEventToUser(domain: domain, userId: requestJson["userId"], event: userCreatedEvent);
  }

  static Future<void> sendRequestUpdatedEvent(String domain, Map<String, dynamic> requestJson) async {
    RequestUpdatedEvent requestUpdatedEvent = RequestUpdatedEvent(requestJson);
    await sendEventToAdmins(domain: domain, event: requestUpdatedEvent);
    await sendEventToUser(domain: domain, userId: requestJson["userId"], event: requestUpdatedEvent);
  }

  static Future<void> sendRequestCreatedEvent({required String domain, required Map<String, dynamic> requestJson}) async {
    RequestCreatedEvent requestCreatedEvent = RequestCreatedEvent(requestJson);
    await sendEventToAdmins(domain: domain, event: requestCreatedEvent);
    await sendEventToUser(domain: domain, userId: requestJson["userId"], event: requestCreatedEvent);
  }

  static Future<void> sendEventToAdmins({required String domain, required Event event}) async {
    try {
      List<WebSocket> adminWebSockets = domainToAdminWebSockets[domain] ?? [];
      for (int i = 0; i < adminWebSockets.length; i++) {
        if (adminWebSockets[i].closeCode != null) {
          adminWebSockets.removeAt(i);
        } else {
          adminWebSockets[i].add(jsonEncode(event));
        }
      }
    } catch (err) {
      print("Caught error during sending event to admins continuing ... 44: ${err}");
    }
  }

  static Future<void> sendEventToUser({required String domain, required String userId, required Event event}) async {
    try {
      List<WebSocket> userWebSockets = domainToUserIdToWebSockets[domain]?[userId] ?? [];
      for (int i = 0; i < userWebSockets.length; i++) {
        if (userWebSockets[i].closeCode != null) {
          userWebSockets.removeAt(i);
        } else {
          userWebSockets[i].add(jsonEncode(event));
        }
      }
    } catch (err) {
      print("Caught error during sending event to user continuing ... 43: ${err}");
    }
  }

  static Future<void> init() async {
    SecurityContext? securityContext;
    var server;
    if (Env.certificateChainPath != null && Env.privateKeyPath != null) {
      securityContext = SecurityContext()
        ..usePrivateKeyBytes(File(Env.privateKeyPath!).readAsBytesSync())
        ..useCertificateChainBytes(File(Env.certificateChainPath!).readAsBytesSync());
      server = await HttpServer.bindSecure(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort, securityContext);
      print("Server running");
    } else {
      server = await HttpServer.bind(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort);
      print("Server running");
    }


    try {
      // Listen for incoming connections
      for (var request in server) {
        // Check if the request is a WebSocket upgrade request
        try {
          if (WebSocketTransformer.isUpgradeRequest(request)) {
            // Upgrade the connection to a WebSocket connection
            WebSocket ws = await WebSocketTransformer.upgrade(request);
            ws.done.catchError((error) => print('On Error1: $error'));

            InitEvent initEvent = InitEvent.fromJson(jsonDecode(await ws.handleError((error) => print('On Error2: $error')).first));

            AccessToken decodedAccessToken = Tokens.decodeAccessToken(initEvent.accessToken);

            User user = (await ReadController.read<User>(domain: decodedAccessToken.domain, id: decodedAccessToken.userId))!;

            if (user.isAdmin) {
              if (domainToAdminWebSockets[decodedAccessToken.domain] == null) {
                domainToAdminWebSockets[decodedAccessToken.domain] = [ws];
              } else {
                domainToAdminWebSockets[decodedAccessToken.domain]!.add(ws);
              }
            } else {
              if (domainToUserIdToWebSockets[decodedAccessToken.domain] == null) {
                domainToUserIdToWebSockets[decodedAccessToken.domain] = {
                  decodedAccessToken.userId: [ws]
                };
              } else {
                if (domainToUserIdToWebSockets[decodedAccessToken.domain]![decodedAccessToken.userId] == null) {
                  domainToUserIdToWebSockets[decodedAccessToken.domain]![decodedAccessToken.userId] = [ws];
                } else {
                  domainToUserIdToWebSockets[decodedAccessToken.domain]![decodedAccessToken.userId]!.add(ws);
                }
              }
            }

          }
        } catch (err) {
          print("Caught WebSocket Error continuing ... 42: ${err}");
        }

      }
    } catch (err) {
        print("Error3: ${err}");
    }
  }
}

@github-actions github-actions bot removed the needs-info We need additional information from the issue author (auto-closed after 14 days if no response) label Jul 23, 2024
@leeflix
Copy link
Author

leeflix commented Aug 2, 2024

switched to sse

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-io
Projects
None yet
Development

No branches or pull requests

4 participants