Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Commit

Permalink
Implement proper interruption for indexation task, remove "stoppable"…
Browse files Browse the repository at this point in the history
… flag for other tasks as long as they don't implement it as well.

Re-plug ShutdownScheduler on shutdown hook for alternative accesses (DAVServer, Wopi).
Open task panel more largely on mouseOver to see the full texts.
  • Loading branch information
cdujeu committed Oct 6, 2016
1 parent 35b87fd commit 1233d69
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 38 deletions.
10 changes: 9 additions & 1 deletion core/src/core/src/pydio/Core/Controller/ShutdownScheduler.php
Expand Up @@ -56,7 +56,7 @@ public static function getInstance()
public function __construct()
{
$this->callbacks = array();
// register_shutdown_function(array($this, 'callRegisteredShutdown'));
register_shutdown_function(array($this, 'callRegisteredShutdown'));
// ob_start();
}

Expand Down Expand Up @@ -101,6 +101,13 @@ public function registerShutdownEvent()
return true;
}

public function closeAndCallRegisteredShutdown(){
if(!headers_sent()){
header("Connection: close\r\n");
}
$this->callRegisteredShutdown();
}

/**
* Trigger the schedulers
* @param OutputInterface $cliOutput
Expand All @@ -115,6 +122,7 @@ public function callRegisteredShutdown($cliOutput = null)
$arguments = array_shift($this->callbacks);
$callback = array_shift($arguments);
try {
error_log("<comment>--> Applying Shutdown Hook: ". get_class($callback[0]) ."::".$callback[1]."</comment>");
if($cliOutput !== null){
$cliOutput->writeln("<comment>--> Applying Shutdown Hook: ". get_class($callback[0]) ."::".$callback[1]."</comment>");
}
Expand Down
38 changes: 38 additions & 0 deletions core/src/core/src/pydio/Core/Exception/UserInterruptException.php
@@ -0,0 +1,38 @@
<?php
/*
* Copyright 2007-2016 Abstrium <contact (at) pydio.com>
* This file is part of Pydio.
*
* Pydio is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Pydio is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Pydio. If not, see <http://www.gnu.org/licenses/>.
*
* The latest code can be found at <https://pydio.com/>.
*/
namespace Pydio\Core\Exception;

defined('AJXP_EXEC') or die('Access not allowed');

/**
* Class UserInterruptException
* @package Pydio\Core\Exception
*/
class UserInterruptException extends PydioException
{
/**
* UserInterruptException constructor.
*/
public function __construct()
{
parent::__construct("User interruption");
}
}
4 changes: 2 additions & 2 deletions core/src/core/src/pydio/Core/Http/Dav/DAVServer.php
Expand Up @@ -55,7 +55,7 @@ public static function handleRoute($baseURI, $davRoute){
self::$context = Context::emptyContext();

if (!ConfService::getGlobalConf("WEBDAV_ENABLE")) {
throw new Forbidden('You are not allowed to access this service');
die('You are not allowed to access this service');
}

PluginsService::getInstance(self::$context)->initActivePlugins();
Expand All @@ -82,7 +82,7 @@ public static function handleRoute($baseURI, $davRoute){
}
}
if ($repository == null) {
throw new Forbidden('You are not allowed to access this service');
die('You are not allowed to access this service '.$repositoryId);
}

self::$context->setRepositoryId($repositoryId);
Expand Down
2 changes: 2 additions & 0 deletions core/src/core/src/pydio/Core/Http/Wopi/Middleware.php
Expand Up @@ -126,7 +126,9 @@ public function emitResponse(ServerRequestInterface $request, ResponseInterface

if($response !== false && ($response->getBody()->getSize() || $response instanceof EmptyResponse) || $response->getStatusCode() != 200) {
$emitter = new SapiEmitter();
$response = $response->withHeader("Connection", "close");
$emitter->emit($response);
ShutdownScheduler::getInstance()->callRegisteredShutdown();
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/plugins/access.fs/FsAccessDriver.php
Expand Up @@ -786,7 +786,7 @@ public function switchAction(ServerRequestInterface &$request, ResponseInterface
if($request->getAttribute("pydio-task-id") === null){
$task = TaskService::actionAsTask($ctx, $action, $httpVars);
$task->setActionLabel($mess, '313');
$task->setFlags(Task::FLAG_STOPPABLE);
//$task->setFlags(Task::FLAG_STOPPABLE);
$response = TaskService::getInstance()->enqueueTask($task, $request, $response);
break;
}
Expand Down Expand Up @@ -995,7 +995,7 @@ public function switchAction(ServerRequestInterface &$request, ResponseInterface
if($taskId === null && ($size === -1 || $size > $bgSizeThreshold)){
$task = TaskService::actionAsTask($ctx, $action, $httpVars);
$task->setActionLabel($mess, $action === 'copy' ? '66' : '70');
$task->setFlags(Task::FLAG_STOPPABLE);
//$task->setFlags(Task::FLAG_STOPPABLE);
if($size === -1 || $size > $bgWorkerThreshold){
$task->setSchedule(new Schedule(Schedule::TYPE_ONCE_DEFER));
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/plugins/action.compression/PluginCompression.php
Expand Up @@ -98,7 +98,7 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
$taskId = $requestInterface->getAttribute("pydio-task-id");
// LAUNCH IN BACKGROUND AND EXIT
if (empty($taskId)) {
$task = TaskService::actionAsTask($ctx, "compression", $httpVars, [], Task::FLAG_STOPPABLE | Task::FLAG_HAS_PROGRESS);
$task = TaskService::actionAsTask($ctx, "compression", $httpVars, [], Task::FLAG_HAS_PROGRESS);
$task->setLabel($messages["compression.5"]);
$responseInterface = TaskService::getInstance()->enqueueTask($task, $requestInterface, $responseInterface);
break;
Expand Down Expand Up @@ -226,7 +226,7 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
$taskId = $requestInterface->getAttribute("pydio-task-id");
// LAUNCH IN BACKGROUND AND EXIT
if (empty($taskId)) {
$task = TaskService::actionAsTask($ctx, "extraction", $httpVars, [], Task::FLAG_STOPPABLE | Task::FLAG_HAS_PROGRESS);
$task = TaskService::actionAsTask($ctx, "extraction", $httpVars, [], Task::FLAG_HAS_PROGRESS);
$task->setLabel($messages["compression.12"]);
$responseInterface = TaskService::getInstance()->enqueueTask($task, $requestInterface, $responseInterface);
break;
Expand Down
36 changes: 25 additions & 11 deletions core/src/plugins/core.index/CoreIndexer.php
Expand Up @@ -23,6 +23,8 @@

use Pydio\Access\Core\Model\AJXP_Node;
use Pydio\Access\Core\Model\UserSelection;
use Pydio\Core\Exception\PydioException;
use Pydio\Core\Exception\UserInterruptException;
use Pydio\Core\Model\ContextInterface;
use Pydio\Core\Model\UserInterface;
use Pydio\Core\Controller\Controller;
Expand Down Expand Up @@ -109,10 +111,16 @@ public function applyAction(\Psr\Http\Message\ServerRequestInterface $requestInt
$this->debug("Error Indexing Node ".$node->getUrl()." (".$e->getMessage().")");
}
}else{
try{
try {
$this->recursiveIndexation($ctx, $node);
}catch(UserInterruptException $uIE){
$this->debug("Interrupting indexation! - node.index.recursive.end - ". $node->getUrl());
$this->releaseStatus($node->getRepository(), $ctx->getUser(), "Interrupted by user", true);
Controller::applyHook("node.index.recursive.end", array($node, false));
}catch (\Exception $e){
$this->debug("Indexation of ".$node->getUrl()." interrupted by error: (".$e->getMessage().")");
$this->releaseStatus($node->getRepository(), $ctx->getUser(), "Error", true);
Controller::applyHook("node.index.recursive.end", array($node, false));
}
}

Expand All @@ -138,10 +146,7 @@ public function recursiveIndexation(ContextInterface $ctx, $node, $depth = 0)
Controller::applyHook("node.index.recursive.start", array($node));
}else{
if($this->isInterruptRequired($repository, $user)){
$this->debug("Interrupting indexation! - node.index.recursive.end - ". $node->getUrl());
Controller::applyHook("node.index.recursive.end", array($node));
$this->releaseStatus($repository, $user);
throw new \Exception("User interrupted");
throw new UserInterruptException("User interrupted");
}
}

Expand All @@ -163,6 +168,11 @@ public function recursiveIndexation(ContextInterface $ctx, $node, $depth = 0)
if($child[0] == ".") continue;
$childNode = new AJXP_Node(rtrim($url, "/")."/".$child);
$childUrl = $childNode->getUrl();

if($this->isInterruptRequired($repository, $user)){
throw new UserInterruptException("User interrupted");
}

if(is_dir($childUrl)){
$this->debug("Entering recursive indexation for ".$childUrl);
$this->recursiveIndexation($ctx, $childNode, $depth + 1);
Expand All @@ -182,8 +192,8 @@ public function recursiveIndexation(ContextInterface $ctx, $node, $depth = 0)
if($depth == 0){
$this->debug("End indexation - node.index.recursive.end - ". memory_get_usage(true) ." - ". $node->getUrl());
$this->setIndexStatus("RUNNING", "Indexation finished, cleaning...", $repository, $user, false);
Controller::applyHook("node.index.recursive.end", array($node));
$this->releaseStatus($repository, $user);
Controller::applyHook("node.index.recursive.end", array($node, true));
$this->releaseStatus($repository, $user, "Indexation finished");
$this->debug("End indexation - After node.index.recursive.end - ". memory_get_usage(true) ." - ". $node->getUrl());
}
}
Expand Down Expand Up @@ -216,7 +226,9 @@ protected function buildIndexLockKey($repository, $user){
protected function setIndexStatus($status, $message, $repository, $user, $stoppable = true)
{
if(isSet($this->currentTaskId)){
TaskService::getInstance()->updateTaskStatus($this->currentTaskId, Task::STATUS_RUNNING, $message, $stoppable);
if(!$this->isInterruptRequired($repository, $user)){
TaskService::getInstance()->updateTaskStatus($this->currentTaskId, Task::STATUS_RUNNING, $message, $stoppable);
}
}
$iPath = (defined('AJXP_SHARED_CACHE_DIR')?AJXP_SHARED_CACHE_DIR:AJXP_CACHE_DIR)."/indexes";
if(!is_dir($iPath)) mkdir($iPath,0755, true);
Expand All @@ -243,11 +255,13 @@ protected function getIndexStatus($repository, $user)
/**
* @param \Pydio\Core\Model\RepositoryInterface $repository
* @param UserInterface $user
* @param string $label
* @param bool $errorStatus
*/
protected function releaseStatus($repository, $user)
protected function releaseStatus($repository, $user, $label, $errorStatus = false)
{
if(isSet($this->currentTaskId)){
TaskService::getInstance()->updateTaskStatus($this->currentTaskId, Task::STATUS_COMPLETE, "Done");
TaskService::getInstance()->updateTaskStatus($this->currentTaskId, $errorStatus ? Task::STATUS_FAILED : Task::STATUS_COMPLETE, $label);
}
$f = (defined('AJXP_SHARED_CACHE_DIR')?AJXP_SHARED_CACHE_DIR:AJXP_CACHE_DIR)."/indexes/.indexation_status-".$this->buildIndexLockKey($repository, $user);
$this->debug("Removing file ".$f);
Expand All @@ -273,7 +287,7 @@ protected function isInterruptRequired($repository, $user)
{
if(isSet($this->currentTaskId)){
$task = TaskService::getInstance()->getTaskById($this->currentTaskId);
return ($task->getStatus() == Task::STATUS_PAUSED);
return ($task->getStatus() === Task::STATUS_INTERRUPT);
}
list($status, $message) = $this->getIndexStatus($repository, $user);
return ($status == "INTERRUPT");
Expand Down
30 changes: 22 additions & 8 deletions core/src/plugins/core.tasks/js/react/PydioTasks.js
Expand Up @@ -55,7 +55,7 @@
}

pause(){
TaskAPI.updateTaskStatus(this, Task.STATUS_PAUSED);
TaskAPI.updateTaskStatus(this, Task.STATUS_INTERRUPT);
}

stop(){
Expand All @@ -74,6 +74,7 @@
Task.STATUS_COMPLETE = 4;
Task.STATUS_FAILED = 8;
Task.STATUS_PAUSED = 16;
Task.STATUS_INTERRUPT = 64;

Task.FLAG_STOPPABLE = 1;
Task.FLAG_RESUMABLE = 2;
Expand Down Expand Up @@ -286,7 +287,8 @@

propTypes: {
task: React.PropTypes.instanceOf(Task),
adminDisplayScope: React.PropTypes.bool
adminDisplayScope: React.PropTypes.bool,
showFull: React.PropTypes.bool
},

render: function(){
Expand All @@ -301,6 +303,9 @@
clickStyle = {cursor:'pointer'};
}
let customClassName = this.props.task.getClassName() || '';
if(this.props.showFull){
customClassName += ' show-full';
}
return (
<div className={"task " + "task-status-" + this.props.task.getStatus() + " " + customClassName}>
<div className="task_texts" onClick={click} style={clickStyle}>
Expand All @@ -326,7 +331,8 @@

getInitialState(){
return {
tasks: TaskStore.getInstance().getTasks()
tasks: TaskStore.getInstance().getTasks(),
mouseOver: false
};
},

Expand All @@ -338,21 +344,29 @@
TaskStore.getInstance().stopObserving("tasks_updated");
},

onMouseOver: function(){
this.setState({mouseOver: true});
},

onMouseOut: function(){
this.setState({mouseOver: false});
},

render: function(){
let tasks = [];
this.state.tasks.forEach(function(t){
if(t.getStatus() == Task.STATUS_COMPLETE) return;
tasks.push(<TaskEntry task={t}/>);
});
if(t.getStatus() === Task.STATUS_COMPLETE) return;
tasks.push(<TaskEntry task={t} showFull={this.state.mouseOver}/>);
}.bind(this));
let className = "pydio-tasks-panel";
let heightStyle;
if(!tasks.length){
className += " invisible";
}else{
heightStyle = {height: Math.min(tasks.length * 60, 180)};
heightStyle = {height: this.state.mouseOver ? 'auto' : Math.min(tasks.length * 60, 180)};
}
return (
<div className={className} style={heightStyle}>
<div onMouseOver={this.onMouseOver} onMouseOut={this.onMouseOut} className={className} style={heightStyle}>
{tasks}
</div>
);
Expand Down
1 change: 1 addition & 0 deletions core/src/plugins/core.tasks/src/Task.php
Expand Up @@ -41,6 +41,7 @@ class Task
const STATUS_FAILED = 8;
const STATUS_PAUSED = 16;
const STATUS_TEMPLATE = 32;
const STATUS_INTERRUPT = 64;

const FLAG_STOPPABLE = 1;
const FLAG_RESUMABLE = 2;
Expand Down
Expand Up @@ -30,6 +30,14 @@ div.pydio-tasks-panel{
overflow: hidden;
white-space: nowrap;
}
div.task_label{
font-weight: 500;
}
}
&.show-full .task_texts div{
text-overflow: inherit;
overflow: visible;
white-space: normal;
}
.task_actions{
cursor: pointer;
Expand Down
8 changes: 8 additions & 0 deletions core/src/plugins/gui.ajax/res/themes/orbit/css/pydio.css
Expand Up @@ -5905,6 +5905,14 @@ div.pydio-tasks-panel div.task .task_texts div {
overflow: hidden;
white-space: nowrap;
}
div.pydio-tasks-panel div.task .task_texts div.task_label {
font-weight: 500;
}
div.pydio-tasks-panel div.task.show-full .task_texts div {
text-overflow: inherit;
overflow: visible;
white-space: normal;
}
div.pydio-tasks-panel div.task .task_actions {
cursor: pointer;
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/plugins/index.elasticsearch/ElasticSearchIndexer.php
Expand Up @@ -124,9 +124,10 @@ public function indexationStarts($parentNode){

/**
* @param \Pydio\Access\Core\Model\AJXP_Node $parentNode
* @param bool $success
*/
public function indexationEnds($parentNode){
if($this->currentIndex) {
public function indexationEnds($parentNode, $success){
if($success && $this->currentIndex) {
$this->currentIndex->optimize();
}
}
Expand Down

0 comments on commit 1233d69

Please sign in to comment.