Skip to content

Commit

Permalink
Fixes #7 Binding to channel events cancels calls to onConnectionState…
Browse files Browse the repository at this point in the history
…Change

Added Stream Handler to allow multiple classes to listen to event channel stream
  • Loading branch information
chinloyal committed Jan 25, 2021
1 parent 84201d2 commit 619a29e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
1 change: 1 addition & 0 deletions example/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ app.*.symbols

# Obfuscation related
app.*.map.json
testdata.txt
32 changes: 32 additions & 0 deletions lib/src/contracts/stream_handler.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import 'dart:async';

import 'package:flutter/services.dart';

abstract class StreamHandler {
static const EventChannel _eventStream =
const EventChannel('com.github.chinloyal/pusher_client_stream');
StreamSubscription _eventStreamSubscription;

static Map<String, dynamic Function(dynamic)> _listeners = {};

/// Add a listener to the event channel stream for pusher,
/// any class that extends [StreamHandler] should use this method.
void registerListener(String classId, dynamic Function(dynamic) method) {
StreamHandler._listeners[classId] = method;

_eventStreamSubscription =
_eventStream.receiveBroadcastStream().listen(_eventHandler);
}

/// This method will close the entire event channel stream
/// which is why it should only be used by [PusherClient]
void cancelEventChannelStream() {
_eventStreamSubscription.cancel();
}

void _eventHandler(event) {
_listeners.values.forEach((method) {
method(event);
});
}
}
14 changes: 4 additions & 10 deletions lib/src/pusher/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ import 'dart:async';
import 'dart:convert';

import 'package:flutter/services.dart';
import 'package:pusher_client/src/contracts/stream_handler.dart';
import 'package:pusher_client/src/models/event_stream_result.dart';
import 'package:pusher_client/src/pusher/pusher_event.dart';

class Channel {
class Channel extends StreamHandler {
static const MethodChannel _mChannel =
const MethodChannel('com.github.chinloyal/pusher_client');
static const EventChannel _eventStream =
const EventChannel('com.github.chinloyal/pusher_client_stream');
static const classId = 'Channel';

static Map<String, void Function(PusherEvent)> _eventCallbacks =
Map<String, void Function(PusherEvent)>();
StreamSubscription _eventStreamSubscription;

final String name;

Expand All @@ -34,8 +33,7 @@ class Channel {
String eventName,
void Function(PusherEvent event) onEvent,
) async {
_eventStreamSubscription =
_eventStream.receiveBroadcastStream().listen(_eventHandler);
registerListener(classId, _eventHandler);
_eventCallbacks[this.name + eventName] = onEvent;

await _mChannel.invokeMethod('bind', {
Expand All @@ -49,10 +47,6 @@ class Channel {
Future<void> unbind(String eventName) async {
_eventCallbacks.remove(this.name + eventName);

if (_eventCallbacks.isEmpty) {
_eventStreamSubscription.cancel();
}

await _mChannel.invokeMethod('unbind', {
'channelName': this.name,
'eventName': eventName,
Expand Down
12 changes: 5 additions & 7 deletions lib/src/pusher/pusher_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert';

import 'package:flutter/services.dart';
import 'package:pusher_client/pusher_client.dart';
import 'package:pusher_client/src/contracts/stream_handler.dart';
import 'package:pusher_client/src/models/connection_error.dart';
import 'package:pusher_client/src/models/connection_state_change.dart';
import 'package:pusher_client/src/models/event_stream_result.dart';
Expand All @@ -15,16 +16,14 @@ part 'pusher_client.g.dart';
/// created automatically unless [autoConnect] is set to false,
/// if auto connect is disabled this means you can call
/// `connect()` at a later point.
class PusherClient {
class PusherClient extends StreamHandler {
static const MethodChannel _channel =
const MethodChannel('com.github.chinloyal/pusher_client');
static const EventChannel _eventStream =
const EventChannel('com.github.chinloyal/pusher_client_stream');
static const classId = 'PusherClient';

static PusherClient _singleton;
void Function(ConnectionStateChange) _onConnectionStateChange;
void Function(ConnectionError) _onConnectionError;
StreamSubscription _eventStreamSubscription;
String _socketId;

PusherClient._(
Expand Down Expand Up @@ -58,8 +57,7 @@ class PusherClient {
}

Future _init(String appKey, PusherOptions options, InitArgs initArgs) async {
_singleton._eventStreamSubscription =
_eventStream.receiveBroadcastStream().listen(_eventHandler);
registerListener(classId, _eventHandler);
await _channel.invokeMethod(
'init',
jsonEncode({
Expand Down Expand Up @@ -98,7 +96,7 @@ class PusherClient {
Future disconnect() async {
await _channel.invokeMethod('disconnect');

_eventStreamSubscription.cancel();
cancelEventChannelStream();
}

/// The id of the current connection
Expand Down

0 comments on commit 619a29e

Please sign in to comment.