Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Commit

Permalink
Adapt Tasks & MQ for polling case (no websocket).
Browse files Browse the repository at this point in the history
  • Loading branch information
cdujeu committed May 23, 2016
1 parent 90d42b9 commit 928bb4e
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 22 deletions.
50 changes: 50 additions & 0 deletions core/src/core/src/pydio/Core/Http/Message/XMLMessage.php
@@ -0,0 +1,50 @@
<?php
/*
* Copyright 2007-2015 Abstrium <contact (at) pydio.com>
* This file is part of Pydio.
*
* Pydio is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Pydio is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Pydio. If not, see <http://www.gnu.org/licenses/>.
*
* The latest code can be found at <http://pyd.io/>.
*/
namespace Pydio\Core\Http\Message;

use Pydio\Core\Http\Response\XMLSerializableResponseChunk;

defined('AJXP_EXEC') or die('Access not allowed');

class XMLMessage implements XMLSerializableResponseChunk
{
/**
* @var string
*/
private $inner;

/**
* XMLMessage constructor.
* @param string $xmlString
*/
public function __construct($xmlString)
{
$this->inner = $xmlString;
}

/**
* @return string
*/
public function toXML()
{
return $this->inner;
}
}
41 changes: 35 additions & 6 deletions core/src/plugins/core.mq/class.AjxpMqObserver.js
Expand Up @@ -36,7 +36,9 @@ Class.create("AjxpMqObserver", {
if(window.ajxpMinisite) return;

this.clientId = window.ajxpBootstrap.parameters.get("SECURE_TOKEN");
this.configs = ajaxplorer.getPluginConfigs("mq");
this.configs = pydio.getPluginConfigs("mq");
this.defaultPollerFreq = this.configs.get('POLLER_FREQUENCY') || 15;
this.pollingFrequency = this.defaultPollerFreq;

document.observe("ajaxplorer:repository_list_refreshed", function(event){

Expand All @@ -54,8 +56,8 @@ Class.create("AjxpMqObserver", {

}.bind(this));

if(ajaxplorer.repositoryId){
this.initForRepoId(ajaxplorer.repositoryId);
if(pydio.repositoryId){
this.initForRepoId(pydio.repositoryId);
}

},
Expand Down Expand Up @@ -85,7 +87,7 @@ Class.create("AjxpMqObserver", {
var obj = parseXml(event.data);
if(obj){
PydioApi.getClient().parseXmlMessage(obj);
ajaxplorer.notify("server_message", obj);
pydio.notify("server_message", obj);
}
};
this.ws.onopen = function(){
Expand Down Expand Up @@ -183,6 +185,15 @@ Class.create("AjxpMqObserver", {
}.bind(this);
conn.sendAsync();

if(this._consumeTriggerObs) {
pydio.stopObserving("response.xml", this._consumeTriggerObs);
this._consumeTriggerObs = null;
}
if(this._pollingFreqObs){
pydio.stopObserving("poller.frequency", this._pollingFreqObs);
this._pollingFreqObs = null;
}

},

registerChannel : function(repoId){
Expand All @@ -197,14 +208,32 @@ Class.create("AjxpMqObserver", {
conn.discrete = true;
conn.sendAsync();

this.pe = new PeriodicalExecuter(this.consumeChannel.bind(this), this.configs.get('POLLER_FREQUENCY') || 5);

this.pe = new PeriodicalExecuter(this.consumeChannel.bind(this), this.pollingFrequency);

this._consumeTriggerObs = function(responseXML){
if(XMLUtils.XPathSelectSingleNode(responseXML, "//consume_channel")){
this.consumeChannel();
}
}.bind(this);
pydio.observe("response.xml", this._consumeTriggerObs);

this._pollingFreqObs = function(freq){
var value = freq.value ? freq.value : this.defaultPollerFreq;
if(value == this.pollingFrequency) return;
this.pollingFrequency = value;
this.pe.stop();
this.pe = new PeriodicalExecuter(this.consumeChannel.bind(this), value);
}.bind(this);
pydio.observe("poller.frequency", this._pollingFreqObs);

},

consumeChannel : function(){
if(this.channel_pending) {
return;
}
pydio.notify("poller.event");
var conn = new Connexion();
conn.setParameters($H({
get_action:'client_consume_channel',
Expand All @@ -216,7 +245,7 @@ Class.create("AjxpMqObserver", {
this.channel_pending = false;
if(transport.responseXML){
PydioApi.getClient().parseXmlMessage(transport.responseXML);
ajaxplorer.notify("server_message", transport.responseXML);
pydio.notify("server_message", transport.responseXML);
}
}.bind(this);
this.channel_pending = true;
Expand Down
30 changes: 15 additions & 15 deletions core/src/plugins/core.mq/class.MqManager.php
Expand Up @@ -20,6 +20,7 @@
*/

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Pydio\Access\Core\Model\AJXP_Node;
use Pydio\Access\Core\Filter\AJXP_Permission;
use Pydio\Core\Controller\Controller;
Expand Down Expand Up @@ -217,21 +218,25 @@ public function appendRefreshInstruction(ResponseInterface &$responseInterface){
return;
}
$respType = &$responseInterface->getBody();
if(!$respType instanceof \Pydio\Core\Http\Response\SerializableResponseStream && !$respType->getSize()){
$respType = new \Pydio\Core\Http\Response\SerializableResponseStream();
$responseInterface = $responseInterface->withBody($respType);
}
if($respType instanceof \Pydio\Core\Http\Response\SerializableResponseStream){
require_once("ConsumeChannelMessage.php");
$respType->addChunk(new ConsumeChannelMessage());
}
}

/**
* @param $action
* @param $httpVars
* @param $fileVars
*
* @param $request
* @param $response
*/
public function clientChannelMethod($action, $httpVars, $fileVars)
public function clientChannelMethod(ServerRequestInterface $request, ResponseInterface &$response)
{
if(!$this->msgExchanger) return;
$action = $request->getAttribute("action");
$httpVars = $request->getParsedBody();
switch ($action) {
case "client_register_channel":
$this->msgExchanger->suscribeToChannel($httpVars["channel"], $httpVars["client_id"]);
Expand All @@ -243,10 +248,7 @@ public function clientChannelMethod($action, $httpVars, $fileVars)
if (AuthService::usersEnabled()) {
$user = AuthService::getLoggedUser();
if ($user == null) {
XMLWriter::header();
XMLWriter::requireAuth();
XMLWriter::close();
return;
throw new \Pydio\Core\Exception\AuthRequiredException();
}
$GROUP_PATH = $user->getGroupPath();
if($GROUP_PATH == null) $GROUP_PATH = false;
Expand All @@ -266,16 +268,15 @@ public function clientChannelMethod($action, $httpVars, $fileVars)
$regexp = '/'.implode("|", $regexps).'/';
}
$channelRepository = str_replace("nodes:", "", $httpVars["channel"]);
$serialBody = new \Pydio\Core\Http\Response\SerializableResponseStream();
$response = $response->withBody($serialBody);
if($channelRepository != $currentRepository){
XMLWriter::header();
echo "<require_registry_reload repositoryId=\"$currentRepository\"/>";
XMLWriter::close();
$serialBody->addChunk(new \Pydio\Core\Http\Message\XMLMessage("<require_registry_reload repositoryId=\"$currentRepository\"/>"));
return;
}

$data = $this->msgExchanger->consumeInstantChannel($httpVars["channel"], $httpVars["client_id"], $uId, $GROUP_PATH);
if (count($data)) {
XMLWriter::header();
ksort($data);
foreach ($data as $messageObject) {
if(isSet($regexp) && isSet($messageObject->nodePathes)){
Expand All @@ -288,9 +289,8 @@ public function clientChannelMethod($action, $httpVars, $fileVars)
}
if(!$pathIncluded) continue;
}
echo $messageObject->content;
$serialBody->addChunk(new \Pydio\Core\Http\Message\XMLMessage($messageObject->content));
}
XMLWriter::close();
}

break;
Expand Down
25 changes: 25 additions & 0 deletions core/src/plugins/core.tasks/js/react/PydioTasks.js
Expand Up @@ -117,6 +117,26 @@
let taskObject = new Task(t);
this._tasksList.set(t.id, taskObject);
this.notify("tasks_updated", taskObject);
global.pydio.notify("poller.frequency", {value:2});
}
}
var taskList = XMLUtils.XPathSelectSingleNode(xml, 'tree/taskList');
if(taskList){
let jsonData = taskList.firstChild.nodeValue; // CDATA
let tasks = JSON.parse(jsonData);
if(tasks instanceof Object){
let taskMap = new Map();
tasks.map(function(t){
let task = new Task(t);
taskMap.set(task.getId(), task);
});
this._tasksList = taskMap;
this.notify("tasks_updated");
if(tasks.length){
global.pydio.notify("poller.frequency", {value:2});
}else{
global.pydio.notify("poller.frequency", {});
}
}
}
}.bind(this));
Expand Down Expand Up @@ -150,6 +170,11 @@
tasks.map(function(t){taskMap.set(t.getId(), t)});
this._tasksList = taskMap;
this.notify("tasks_updated");
if(tasks.length){
global.pydio.notify("poller.frequency", {value:2});
}else{
global.pydio.notify("poller.frequency", {});
}
}.bind(this));
}
return this._tasksList;
Expand Down
5 changes: 5 additions & 0 deletions core/src/plugins/core.tasks/manifest.xml
Expand Up @@ -39,6 +39,11 @@
<serverCallback methodName="route"/>
</processing>
</action>
<action name="client_consume_channel">
<post_processing>
<serverCallback methodName="enrichConsumeChannel"/>
</post_processing>
</action>
</actions>
<hooks>
<serverCallback methodName="attachTasksToNode" hookName="node.info"/>
Expand Down
6 changes: 6 additions & 0 deletions core/src/plugins/core.tasks/src/ITasksProvider.php
Expand Up @@ -60,6 +60,12 @@ public function deleteTask($taskId);
*/
public function getPendingTasks();

/**
* @param AbstractAjxpUser $user
* @param Repository $repository
* @return Task[]
*/
public function getCurrentRunningTasks($user, $repository);

/**
* @param AJXP_Node $node
Expand Down
12 changes: 12 additions & 0 deletions core/src/plugins/core.tasks/src/Providers/MockTasksProvider.php
Expand Up @@ -22,6 +22,8 @@
namespace Pydio\Tasks\Providers;

use Pydio\Access\Core\Model\AJXP_Node;
use Pydio\Access\Core\Model\Repository;
use Pydio\Conf\Core\AbstractAjxpUser;
use Pydio\Tasks\Task;
use Pydio\Tasks\Schedule;

Expand Down Expand Up @@ -109,4 +111,14 @@ public function getTasks($user = null, $repository = null, $status = -1)
$t1->setId("fake-task-id");
return [$t1];
}

/**
* @param AbstractAjxpUser $user
* @param Repository $repository
* @return Task[]
*/
public function getCurrentRunningTasks($user, $repository)
{
// TODO: Implement getCurrentRunningTasks() method.
}
}
20 changes: 20 additions & 0 deletions core/src/plugins/core.tasks/src/Providers/SqlTasksProvider.php
Expand Up @@ -23,6 +23,7 @@
use Pydio\Access\Core\Model\AJXP_Node;
use Pydio\Access\Core\Model\Repository;
use Pydio\Conf\Core\AbstractAjxpUser;
use Pydio\Core\Services\AuthService;
use Pydio\Tasks\ITasksProvider;
use Pydio\Tasks\Schedule;
use Pydio\Tasks\Task;
Expand Down Expand Up @@ -129,6 +130,25 @@ public function getPendingTasks()
return $this->getTasks(null, null, Task::STATUS_PENDING);
}

/**
* @param AbstractAjxpUser $user
* @param Repository $repository
* @return Task[]
*/
public function getCurrentRunningTasks($user, $repository)
{
$tasks = [];
$where = [];
$where[] = array("[userId] = %s", $user->getId());
$where[] = array("[wsId] = %s", $repository->getId());
$where[] = array("[status] IN (1,2,8,16)");
$res = \dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
foreach ($res->fetchAll() as $row) {
$tasks[] = $this->taskFromDBValues($row);
}
return $tasks;
}

/**
* @param AJXP_Node $node
* @param $active
Expand Down
17 changes: 16 additions & 1 deletion core/src/plugins/core.tasks/src/TaskController.php
Expand Up @@ -68,7 +68,7 @@ public function route(ServerRequestInterface &$request, ResponseInterface &$resp
$taskService = TaskService::getInstance();
switch ($action){
case "tasks_list":
$tasks = $taskService->getPendingTasks();
$tasks = $taskService->getCurrentRunningTasks(AuthService::getLoggedUser(), ConfService::getRepository());
$response = new JsonResponse($tasks);
break;
case "task_info":
Expand Down Expand Up @@ -127,5 +127,20 @@ public function attachTasksToNode(AJXP_Node &$node, $isContextNode = false, $det
}
}
}

public function enrichConsumeChannel(ServerRequestInterface &$requestInterface, ResponseInterface &$responseInterface){

$respType = &$responseInterface->getBody();
if(!$respType instanceof \Pydio\Core\Http\Response\SerializableResponseStream && !$respType->getSize()){
$respType = new \Pydio\Core\Http\Response\SerializableResponseStream();
$responseInterface = $responseInterface->withBody($respType);
}
if($respType instanceof \Pydio\Core\Http\Response\SerializableResponseStream){
$taskList = TaskService::getInstance()->getCurrentRunningTasks(AuthService::getLoggedUser(), ConfService::getRepository());
$respType->addChunk(new TaskListMessage($taskList));
}

}


}

0 comments on commit 928bb4e

Please sign in to comment.