Skip to content

Commit

Permalink
shutdown fixes, error event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andremussche committed Jun 25, 2014
1 parent 491206d commit b85079a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 21 deletions.
61 changes: 49 additions & 12 deletions IdHTTPWebsocketClient.pas
Expand Up @@ -184,6 +184,9 @@ implementation
IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack,
IdStackBSDBase, IdGlobal, Windows, StrUtils, DateUtils;

var
GUnitFinalized: Boolean = false;

//type
// TAnonymousThread = class(TThread)
// protected
Expand Down Expand Up @@ -1148,6 +1151,8 @@ procedure TIdWebsocketMultiReadThread.AddClient(
var l: TList;
begin
//Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket');
if Self = nil then Exit;
if Self.Terminated then Exit;

l := FChannels.LockList;
try
Expand Down Expand Up @@ -1212,6 +1217,13 @@ procedure TIdWebsocketMultiReadThread.BreakSelectWait;

destructor TIdWebsocketMultiReadThread.Destroy;
begin
if FReconnectThread <> nil then
begin
FReconnectThread.Terminate;
FReconnectThread.WaitFor;
FReconnectThread.Free;
end;

IdWinsock2.closesocket(FTempHandle);
FTempHandle := 0;
FChannels.Free;
Expand Down Expand Up @@ -1256,6 +1268,8 @@ class function TIdWebsocketMultiReadThread.Instance: TIdWebsocketMultiReadThread
begin
if (FInstance = nil) then
begin
if GUnitFinalized then Exit(nil);

FInstance := TIdWebsocketMultiReadThread.Create(True);
FInstance.Start;
end;
Expand All @@ -1269,6 +1283,8 @@ procedure TIdWebsocketMultiReadThread.PingAllChannels;
ws: TIdIOHandlerWebsocket;
i: Integer;
begin
if Terminated then Exit;

l := FChannels.LockList;
try
for i := 0 to l.Count - 1 do
Expand Down Expand Up @@ -1311,8 +1327,11 @@ procedure TIdWebsocketMultiReadThread.PingAllChannels;
FChannels.UnlockList;
end;

if Terminated then Exit;

//reconnect needed? (in background)
if (FReconnectlist <> nil) and (FReconnectlist.Count > 0) then
if FReconnectlist <> nil then
if FReconnectlist.Count > 0 then
begin
if FReconnectThread = nil then
FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start});
Expand Down Expand Up @@ -1456,7 +1475,8 @@ procedure TIdWebsocketMultiReadThread.ReadFromAllChannels;
//some data?
if (iResult > 0) then
begin
//strmEvent := nil;
//make sure the thread is created outside a lock
TIdWebsocketDispatchThread.Instance;

l := FChannels.LockList;
if l = nil then Exit;
Expand Down Expand Up @@ -1503,6 +1523,7 @@ procedure TIdWebsocketMultiReadThread.RemoveClient(
aChannel: TIdHTTPWebsocketClient);
begin
if Self = nil then Exit;
if Self.Terminated then Exit;

aChannel.Lock;
try
Expand All @@ -1516,18 +1537,23 @@ procedure TIdWebsocketMultiReadThread.RemoveClient(
end;

class procedure TIdWebsocketMultiReadThread.RemoveInstance(aForced: boolean);
var
o: TIdWebsocketMultiReadThread;
begin
if FInstance <> nil then
begin
FInstance.Terminate;
o := FInstance;
FInstance := nil;

if aForced then
begin
WaitForSingleObject(FInstance.Handle, 2 * 1000);
TerminateThread(FInstance.Handle, MaxInt);
WaitForSingleObject(o.Handle, 2 * 1000);
TerminateThread(o.Handle, MaxInt);
end
else
FInstance.WaitFor;
FreeAndNil(FInstance);
o.WaitFor;
FreeAndNil(o);
end;
end;

Expand All @@ -1544,6 +1570,8 @@ procedure TIdWebsocketMultiReadThread.ResetSpecialEventSocket;
procedure TIdWebsocketMultiReadThread.Terminate;
begin
inherited Terminate;
if FReconnectThread <> nil then
FReconnectThread.Terminate;

FChannels.LockList;
try
Expand All @@ -1560,6 +1588,8 @@ class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread;
begin
if FInstance = nil then
begin
if GUnitFinalized then Exit(nil);

GlobalNameSpace.BeginWrite;
try
if FInstance = nil then
Expand All @@ -1575,17 +1605,22 @@ class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread;
end;

class procedure TIdWebsocketDispatchThread.RemoveInstance;
var
o: TIdWebsocketDispatchThread;
begin
if FInstance <> nil then
begin
FInstance.Terminate;
o := FInstance;
FInstance := nil;

if aForced then
begin
WaitForSingleObject(FInstance.Handle, 2 * 1000);
TerminateThread(FInstance.Handle, MaxInt);
WaitForSingleObject(o.Handle, 2 * 1000);
TerminateThread(o.Handle, MaxInt);
end;
FInstance.WaitFor;
FreeAndNil(FInstance);
o.WaitFor;
FreeAndNil(o);
end;
end;

Expand All @@ -1604,7 +1639,9 @@ function TWSThreadList.Count: Integer;

initialization
finalization
GUnitFinalized := True;
if TIdWebsocketMultiReadThread.Instance <> nil then
TIdWebsocketMultiReadThread.Instance.Terminate;
TIdWebsocketDispatchThread.RemoveInstance();
TIdWebsocketMultiReadThread.RemoveInstance();
TIdWebsocketDispatchThread.RemoveInstance()

end.
22 changes: 13 additions & 9 deletions IdSocketIOHandling.pas
Expand Up @@ -6,7 +6,7 @@ interface
Classes, Generics.Collections,
superobject,
IdServerBaseHandling, IdContext, IdException, IdIOHandlerWebsocket, IdHTTP,
SyncObjs;
SyncObjs, SysUtils;

type
TSocketIOContext = class;
Expand All @@ -22,6 +22,7 @@ TIdSocketIOHandling = class;
TSocketIONotify = reference to procedure(const ASocket: ISocketIOContext);
TSocketIOEvent = reference to procedure(const ASocket: ISocketIOContext; const aArgument: TSuperArray; const aCallback: ISocketIOCallback);
TSocketIOError = reference to procedure(const ASocket: ISocketIOContext; const aErrorClass, aErrorMessage: string);
TSocketIOEventError = reference to procedure(const ASocket: ISocketIOContext; const aCallback: ISocketIOCallback; E: Exception);

TSocketIONotifyList = class(TList<TSocketIONotify>);
TSocketIOEventList = class(TList<TSocketIOEvent>);
Expand Down Expand Up @@ -139,6 +140,8 @@ TIdBaseSocketIOHandling = class(TIdServerBaseHandling)
FOnSocketIOJson: TSocketIOMsgJSON;

procedure ProcessEvent(const AContext: TSocketIOContext; const aText: string; aMsgNr: Integer; aHasCallback: Boolean);
private
FOnEventError: TSocketIOEventError;
protected
type
TSocketIOCallback = procedure(const aData: string) of object;
Expand Down Expand Up @@ -194,6 +197,7 @@ TIdBaseSocketIOHandling = class(TIdServerBaseHandling)
procedure OnEvent (const aEventName: string; const aCallback: TSocketIOEvent);
procedure OnConnection(const aCallback: TSocketIONotify);
procedure OnDisconnect(const aCallback: TSocketIONotify);
property OnEventError: TSocketIOEventError read FOnEventError write FOnEventError;

procedure EnumerateSockets(const aEachSocketCallback: TSocketIONotify);
end;
Expand All @@ -208,7 +212,7 @@ TIdSocketIOHandling = class(TIdBaseSocketIOHandling)
implementation

uses
SysUtils, StrUtils, IdServerWebsocketContext, IdHTTPWebsocketClient, Windows;
StrUtils, IdServerWebsocketContext, IdHTTPWebsocketClient, Windows;

procedure TIdBaseSocketIOHandling.AfterConstruction;
begin
Expand Down Expand Up @@ -501,15 +505,15 @@ procedure TIdBaseSocketIOHandling.ProcessEvent(
else
callback := nil;
try
for event in list do
try
for event in list do
event(AContext, args, callback);
except
on E:Exception do
begin
event(AContext, args, callback);
except on E:Exception do
if Assigned(OnEventError) then
OnEventError(AContext, callback, e)
else
if callback <> nil then
callback.SendResponse( SO(['Error', e.Message]).AsJSon );
end;
callback.SendResponse( SO(['Error', SO(['msg', e.message])]).AsJSon );
end;
finally
callback := nil;
Expand Down
3 changes: 3 additions & 0 deletions uROIndyHTTPWebsocketChannel.pas
Expand Up @@ -116,18 +116,21 @@ procedure TROIndyHTTPWebsocketChannel.SetHost(const Value: string);
begin
IndyClient.Host := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
FTriedUpgrade := False; //reset
end;

procedure TROIndyHTTPWebsocketChannel.SetPort(const Value: integer);
begin
IndyClient.Port := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
FTriedUpgrade := False; //reset
end;

procedure TROIndyHTTPWebsocketChannel.SetWSResourceName(const Value: string);
begin
IndyClient.WSResourceName := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
FTriedUpgrade := False; //reset
end;

function TROIndyHTTPWebsocketChannel.GetHost: string;
Expand Down

0 comments on commit b85079a

Please sign in to comment.