diff --git a/core/src/plugins/core.mq/class.MqManager.php b/core/src/plugins/core.mq/class.MqManager.php index 6ccd79c42d..a85d4986e5 100755 --- a/core/src/plugins/core.mq/class.MqManager.php +++ b/core/src/plugins/core.mq/class.MqManager.php @@ -31,6 +31,8 @@ use Pydio\Core\PluginFramework\Plugin; use Pydio\Core\Utils\UnixProcess; +use GuzzleHttp\Command\Guzzle\GuzzleClient; + defined('AJXP_EXEC') or die( 'Access not allowed'); // DL and install install vendor (composer?) https://github.com/Devristo/phpws @@ -176,39 +178,22 @@ public function sendInstantMessage($xmlContent, $repositoryId, $targetUserId = n $this->msgExchanger->publishInstantMessage("nodes:$repositoryId", $message); } - /* - // Publish on NSQ - require_once(AJXP_INSTALL_PATH."core/classes/vendor/autoload.php"); - $nsq = new nsqphp\nsqphp; - $nsq->publishTo("localhost", 1); - $nsq->publish('pydio', new nsqphp\Message\Message(json_encode(array('msg' => 'im', 'run' => $message)))); - */ + // Publish to NSQ for WebSockets + $client = new GuzzleHttp\Client(); - // Publish for WebSockets - $configs = $this->getConfigs(); - if ($configs["WS_SERVER_ACTIVE"]) { - - require_once($this->getBaseDir()."/vendor/phpws/websocket.client.php"); - // Publish for websockets - $input = array("REPO_ID" => $repositoryId, "CONTENT" => "".$xmlContent.""); - if(isSet($userId)){ - $input["USER_ID"] = $userId; - } else if(isSet($gPath)) { - $input["GROUP_PATH"] = $gPath; - } - if(count($nodePathes)) { - $input["NODE_PATHES"] = $nodePathes; - } - $input = serialize($input); - $msg = WebSocketMessage::create($input); - if (!isset($this->wsClient)) { - $this->wsClient = new WebSocket("ws://".$configs["WS_SERVER_BIND_HOST"].":".$configs["WS_SERVER_BIND_PORT"].$configs["WS_SERVER_PATH"]); - $this->wsClient->addHeader("Admin-Key", $configs["WS_SERVER_ADMIN"]); - @$this->wsClient->open(); - } - @$this->wsClient->sendMessage($msg); + // Publish for websockets + $input = array("REPO_ID" => $repositoryId, "CONTENT" => "".$xmlContent.""); + if(isSet($userId)){ + $input["USER_ID"] = $userId; + } else if(isSet($gPath)) { + $input["GROUP_PATH"] = $gPath; + } + if(count($nodePathes)) { + $input["NODE_PATHES"] = $nodePathes; } + $client->post('http://127.0.0.1:4151/put?topic=im', ['json' => $input, 'future' => true]); + $this->hasPendingMessage = true; }