Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Commit

Permalink
Some tests with workers / nsq
Browse files Browse the repository at this point in the history
  • Loading branch information
cdujeu committed May 5, 2016
1 parent 13a60f0 commit 84cb6c2
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/src/core/classes/class.AJXP_Controller.php
Expand Up @@ -299,6 +299,26 @@ public static function applyActionInBackground($currentRepositoryId, $actionName
$logDir = AJXP_CACHE_DIR."/cmd_outputs";
if(!is_dir($logDir)) mkdir($logDir, 0755);
$logFile = $logDir."/".$token.".out";
if (empty($user)) {
if(AuthService::usersEnabled() && AuthService::getLoggedUser() !== null) $user = AuthService::getLoggedUser()->getId();
else $user = "shared";
}
/*
require_once(AJXP_INSTALL_PATH."/".AJXP_PLUGINS_FOLDER."/core.mq/vendor/autoload.php");
$nsq = new nsqphp\nsqphp;
$nsq->publishTo("localhost", 1);
$payload = array(
'msg' => 'bg',
'data' => [
'repository_id' => $currentRepositoryId,
'user_id' => $user,
'action' => $actionName,
'parameters' => $parameters
]);
$nsq->publish('pydio', new nsqphp\Message\Message(json_encode($payload)));
return;
*/
if (AuthService::usersEnabled()) {
$cKey = ConfService::getCoreConf("AJXP_CLI_SECRET_KEY", "conf");
if(empty($cKey)){
Expand Down
30 changes: 30 additions & 0 deletions core/src/index.php
Expand Up @@ -154,6 +154,36 @@
}
AJXP_PluginsService::getInstance()->initActivePlugins();
require_once(AJXP_BIN_FOLDER."/class.AJXP_Controller.php");

/*
require_once (AJXP_BIN_FOLDER."/silex/vendor/autoload.php");
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
$silex = new Silex\Application();
$silex->match("/", function(Silex\Application $silex, Request $request){
$stream = function () use ($request) {
$xmlResult = AJXP_Controller::findActionAndApply($request->get('get_action'), array_merge($request->query->all(), $request->request->all()), $_FILES);
if ($xmlResult !== false && $xmlResult != "") {
AJXP_XMLWriter::header();
print($xmlResult);
AJXP_XMLWriter::close();
} else if (isset($requireAuth) && AJXP_Controller::$lastActionNeedsAuth) {
AJXP_XMLWriter::header();
AJXP_XMLWriter::requireAuth();
AJXP_XMLWriter::close();
}
};
return new \Symfony\Component\HttpFoundation\StreamedResponse($stream, 200);
})->method("POST|GET");
$silex->run();
*/

$xmlResult = AJXP_Controller::findActionAndApply($action, array_merge($_GET, $_POST), $_FILES);
if ($xmlResult !== false && $xmlResult != "") {
AJXP_XMLWriter::header();
Expand Down
8 changes: 8 additions & 0 deletions core/src/plugins/core.mq/class.MqManager.php
Expand Up @@ -163,6 +163,14 @@ public function sendInstantMessage($xmlContent, $repositoryId, $targetUserId = n
$this->msgExchanger->publishInstantMessage("nodes:$repositoryId", $message);
}

/*
// Publish on NSQ
require_once(AJXP_INSTALL_PATH."core/classes/vendor/autoload.php");
$nsq = new nsqphp\nsqphp;
$nsq->publishTo("localhost", 1);
$nsq->publish('pydio', new nsqphp\Message\Message(json_encode(array('msg' => 'im', 'run' => $message))));
*/

// Publish for WebSockets
$configs = $this->getConfigs();
if ($configs["WS_SERVER_ACTIVE"]) {
Expand Down
153 changes: 153 additions & 0 deletions core/src/worker.php
@@ -0,0 +1,153 @@
<?php
/*
* Copyright 2007-2015 Abstrium <contact (at) pydio.com>
* This file is part of Pydio.
*
* Pydio is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Pydio is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Pydio. If not, see <http://www.gnu.org/licenses/>.
*
* The latest code can be found at <http://pyd.io/>.
*/

include_once("base.conf.php");

//set_error_handler(array("AJXP_XMLWriter", "catchError"), E_ALL & ~E_NOTICE & ~E_STRICT );
//set_exception_handler(array("AJXP_XMLWriter", "catchException"));

$pServ = AJXP_PluginsService::getInstance();
ConfService::$useSession = false;
AuthService::$useSession = false;

ConfService::init();
ConfService::start();

$confStorageDriver = ConfService::getConfStorageImpl();
require_once($confStorageDriver->getUserClassFileName());
$authDriver = ConfService::getAuthDriverImpl();
ConfService::currentContextIsRestAPI("api");
AJXP_PluginsService::getInstance()->initActivePlugins();

function applyTask($userId, $repoId, $actionName, $parameters){

print($userId." - ".$repoId." - ".$actionName." - ");
print("Log User\n");
AuthService::logUser($userId, "", true);
print("Find Repo\n");
if($repoId == 'pydio'){
ConfService::switchRootDir();
$repo = ConfService::getRepository();
}else{
$repo = ConfService::findRepositoryByIdOrAlias($repoId);
if ($repo == null) {
throw new Exception("Cannot find repository with ID ".$repoId);
}
ConfService::switchRootDir($repo->getId());
}
// DRIVERS BELOW NEED IDENTIFICATION CHECK
print("Load Driver\n");
if (!AuthService::usersEnabled() || ConfService::getCoreConf("ALLOW_GUEST_BROWSING", "auth") || AuthService::getLoggedUser()!=null) {
ConfService::getConfStorageImpl();
ConfService::loadDriverForRepository($repo);
}
print("Init plugins\n");
AJXP_PluginsService::getInstance()->initActivePlugins();

//print "Current repos are" . implode(", ", array_keys(ConfService::getAccessibleRepositories()))."\n";

print("Apply Action\n");
$xmlResult = AJXP_Controller::findActionAndApply($actionName, $parameters, []);
if (!empty($xmlResult) && !headers_sent()) {
AJXP_XMLWriter::header();
print($xmlResult);
AJXP_XMLWriter::close();
}

print("Empty ShutdownScheduler!\n");
AJXP_ShutdownScheduler::getInstance()->callRegisteredShutdown();

print("Invalidate\n");
ConfService::getInstance()->invalidateLoadedRepositories();
print("Disconnect\n");
AuthService::disconnect();
AJXP_PluginsService::updateXmlRegistry(null, true);

}

function deQueue(){
$fName = AJXP_DATA_PATH."/plugins/mq.serial/worker-queue";
if(file_exists($fName)){
$data = file_get_contents($fName);
if(!empty($data)){
$decoded = json_decode($data, true);
if(!empty($decoded) && is_array($decoded) && count($decoded)){
$task = array_pop($decoded);
file_put_contents($fName, json_encode($decoded));
return $task;
}
}
}
return false;
}

$method = isset($argv[1]) ? $argv[1] : 'nsq';

if($method == "nsq"){

include("plugins/core.mq/vendor/autoload.php");
$logger = new nsqphp\Logger\Stderr;
$dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
$lookup = new nsqphp\Lookup\FixedHosts('localhost:4150');
$requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
$nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy, $logger);
$channel = isset($argv[1]) ? $argv[1] : 'foo';
$nsq->subscribe('pydio', $channel, function($msg) {
echo "READ\t" . $msg->getId() . "\t" . $msg->getPayload() . "\n";
$payload = json_decode($msg->getPayload(), true);
$data = $payload["data"];
applyTask($data["user_id"], $data["repository_id"], $data["action"], $data["parameters"]);
});
$nsq->run();


}else{

while(true){

$task = deQueue();
if($task !== false){
try{
print "--------------------------------------\n";
print "Applying task ".$task["actionName"]."\n";
print_r($task);
applyTask($task["userId"], $task["repoId"], $task["actionName"], $task["parameters"]);
}catch (Exception $e){
print "Error : ".$e->getMessage()."\n";
}
flush();
}else{
print "--------- nothing to do \n";
}
print("5\r");
sleep(1);
print("4\r");
sleep(1);
print("3\r");
sleep(1);
print("2\r");
sleep(1);
print("1\r");
sleep(1);

}
}

0 comments on commit 84cb6c2

Please sign in to comment.