Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Commit

Permalink
Fix consume_mail_queue when some resources do not exist. Handle pydio…
Browse files Browse the repository at this point in the history
…-task-id statuses.
  • Loading branch information
cdujeu committed Aug 24, 2016
1 parent 3386868 commit d0fe03c
Showing 1 changed file with 134 additions and 113 deletions.
247 changes: 134 additions & 113 deletions core/src/plugins/core.mailer/Mailer.php
Expand Up @@ -25,6 +25,8 @@
use dibi;
use DibiException;
use Exception;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Pydio\Access\Core\Model\AJXP_Node;
use Pydio\Core\Model\ContextInterface;
use Pydio\Conf\Core\AbstractUser;
Expand All @@ -41,6 +43,9 @@
use Pydio\Core\PluginFramework\PluginsService;
use Pydio\Core\PluginFramework\SqlTableProvider;
use Pydio\Notification\Core\Notification;
use Pydio\Tasks\Task;
use Pydio\Tasks\TaskService;
use Zend\Diactoros\Response\JsonResponse;

defined('AJXP_EXEC') or die('Access not allowed');

Expand Down Expand Up @@ -136,155 +141,171 @@ public function getConsumerLock() {


/**
* @param $action
* @param $httpVars
* @param $fileVars
* @param ContextInterface $ctx
* @param ServerRequestInterface $requestInterface
* @param ResponseInterface $responseInterface
* @throws PydioException
*/
public function mailConsumeQueue($action, $httpVars, $fileVars, ContextInterface $ctx)
public function mailConsumeQueue(ServerRequestInterface $requestInterface, ResponseInterface &$responseInterface)
{

if ($action === "consume_mail_queue") {

$verbose = $httpVars["verbose"];

$logInfo = function () {};
if ($verbose) {
$logInfo = function ($str) {
fwrite(STDOUT, $str . "\n");
};
}
/** @var ContextInterface $ctx */
$ctx = $requestInterface->getAttribute("ctx");
$httpVars = $requestInterface->getParsedBody();
$verbose = $httpVars["log-output"];
$taskUid = $requestInterface->getAttribute("pydio-task-id");

$logInfo = function () {};
if ($verbose) {
$logInfo = function ($str) {
fwrite(STDOUT, $str . "\n");
};
}else if(!empty($taskUid)){
$logInfo = function ($str) use ($taskUid) {
TaskService::getInstance()->updateTaskStatus($taskUid, Task::STATUS_RUNNING, $str);
};
}


$mailer = PluginsService::getInstance($ctx)->getActivePluginsForType("mailer", true);
if (!$mailer instanceof Mailer) {
throw new PydioException("Cannot find active mailer!");
}
if (!dibi::isConnected()) {
dibi::connect($this->getDibiDriver());
}
if ($this->_dibiDriver["driver"] == "postgre") {
dibi::query("SET bytea_output=escape");
}
$mailer = PluginsService::getInstance($ctx)->getActivePluginsForType("mailer", true);
if (!$mailer instanceof Mailer) {
throw new PydioException("Cannot find active mailer!");
}
if (!dibi::isConnected()) {
dibi::connect($this->getDibiDriver());
}
if ($this->_dibiDriver["driver"] == "postgre") {
dibi::query("SET bytea_output=escape");
}

// Get the queue consumer lock and the time it was given
$time = $this->getConsumerLock();
// Get the queue consumer lock and the time it was given
$time = $this->getConsumerLock();

try {
$querySQL = dibi::query("SELECT * FROM [ajxp_mail_queue] WHERE [date_event] <= %s", $time);
} catch (DibiException $e) {
throw new PydioException($e->getMessage());
}
//$querySQL->fetch();
//$resultsSQL = $querySQL->fetchAll();
$numRows = $querySQL->count();
try {
$querySQL = dibi::query("SELECT * FROM [ajxp_mail_queue] WHERE [date_event] <= %s", $time);
} catch (DibiException $e) {
throw new PydioException($e->getMessage());
}
//$querySQL->fetch();
//$resultsSQL = $querySQL->fetchAll();
$numRows = $querySQL->count();

$results = [];
$results = [];

HTMLWriter::charsetHeader("text/json");
HTMLWriter::charsetHeader("text/json");

if ($numRows == 0) {
$logInfo("Nothing to process");
$output = array("report" => "Sent 0 emails", "detail" => "");
echo json_encode($output);
return;
if ($numRows == 0) {
$logInfo("Nothing to process");
$output = array("report" => "Sent 0 emails", "detail" => "");
$responseInterface = new JsonResponse($output);
if(!empty($taskUid)){
TaskService::getInstance()->updateTaskStatus($taskUid, Task::STATUS_COMPLETE, "Sent 0 emails");
}
return;
}

$logInfo("Processing " . $numRows . " rows.");
$logInfo("Processing " . $numRows . " rows.");

// We need to send one email :
// - per user
// - per email type (HTML or PLAIN)
while($value = $querySQL->fetch()) {
$i = 0;
// We need to send one email :
// - per user
// - per email type (HTML or PLAIN)
while($value = $querySQL->fetch()) {

// Retrieving user information
$recipient = $value['recipient'];
// Retrieving user information
$recipient = $value['recipient'];
$logInfo("Processing notification ".($i+1)."/$numRows");

// Retrieving Email type information
$emailType = ($value["html"] == 1) ? "html" : "plain";
// Retrieving Email type information
$emailType = ($value["html"] == 1) ? "html" : "plain";

// Retrieving notification information
/** @var Notification $notification */
$notification = unserialize($value["notification_object"]);
// Retrieving notification information
/** @var Notification $notification */
$notification = unserialize($value["notification_object"]);

$action = $notification->getAction();
$author = $notification->getAuthor();
$node = $notification->getNode();
$action = $notification->getAction();
$author = $notification->getAuthor();
$node = $notification->getNode();
if(!$node instanceof AJXP_Node){
continue;
}
try {
@$node->loadNodeInfo();
} catch(Exception $e){
continue;
}

try {
@$node->loadNodeInfo();
} catch(Exception $e){
if ($node->isLeaf() && !$node->isRoot()) {
$dirName = $node->getParent()->getPath();
} else {
$dirName = $node->getPath();
if ($dirName === null) {
$dirName = '/';
}
}
$key = sprintf("%s|%s|%s", $action, $author, $dirName);

if ($node->isLeaf() && !$node->isRoot()) {
$dirName = $node->getParent()->getPath();
} else {
$dirName = $node->getPath();
if ($dirName === null) {
$dirName = '/';
}
}
$key = sprintf("%s|%s|%s", $action, $author, $dirName);
// Retrieving workspace information
if($node->getRepository() != null) {
$workspace = $node->getRepository()->getDisplay();
} else {
$workspace = "Deleted Workspace";
}

// Retrieving workspace information
if($node->getRepository() != null) {
$workspace = $node->getRepository()->getDisplay();
} else {
$workspace = "Deleted Workspace";
}
$results[$emailType][$recipient][$workspace][$key][] = $notification;

$results[$emailType][$recipient][$workspace][$key][] = $notification;
}
$logInfo("Processed recipient ".($i+1)."/$numRows (" . $recipient.")");

$logInfo("Created digest array.");
$i ++;

$subject = LocaleService::getMessages()["core.mailer.9"];
}

$success = 0;
$errors = [];
foreach ($results as $emailType => $recipients) {
$logInfo("Created digest array.");

$isHTML = $emailType == "html";
$subject = LocaleService::getMessages()["core.mailer.9"];

$i = 0;
foreach ($recipients as $recipient => $workspaces) {
$success = 0;
$errors = [];
foreach ($results as $emailType => $recipients) {

$logInfo("Processed " . ++$i . " out of " . count($recipients) . " " . $emailType . " emails " . $recipient);
$isHTML = $emailType == "html";

$body = $this->_buildDigest($workspaces, $emailType);
$i = 0;
foreach ($recipients as $recipient => $workspaces) {
$logInfo("Processing " . ++$i . " out of " . count($recipients) . " " . $emailType . " emails " . $recipient);
$body = $this->_buildDigest($workspaces, $emailType);
$success++;
try {
$mailer->sendMail(
$ctx,
[$recipient],
$subject,
$body,
null,
null,
$isHTML
);

$success++;
try {
$mailer->sendMail(
$ctx,
[$recipient],
$subject,
$body,
null,
null,
$isHTML
);

$success++;
} catch (\Exception $e) {
$errors[] = "Failed to send email to " . $recipient . ": " . $e->getMessage();
}
} catch (\Exception $e) {
$errors[] = "Failed to send email to " . $recipient . ": " . $e->getMessage();
}
}
}

// Clearing memory
unset($results);
// Clearing memory
unset($results);

try {
dibi::query('DELETE FROM [ajxp_mail_queue] WHERE [date_event] <= %s', $time);
} catch (DibiException $e) {
throw new PydioException($e->getMessage());
}
try {
dibi::query('DELETE FROM [ajxp_mail_queue] WHERE [date_event] <= %s', $time);
} catch (DibiException $e) {
throw new PydioException($e->getMessage());
}

$output = array("report" => "Sent ".$success." emails", "errors" => $errors);
echo json_encode($output);
$output = array("report" => "Sent ".$success." emails", "errors" => $errors);
$responseInterface = new JsonResponse($output);
if(!empty($taskUid)){
TaskService::getInstance()->updateTaskStatus($taskUid, Task::STATUS_COMPLETE, "Sent $success emails");
}

}

/**
Expand All @@ -308,7 +329,7 @@ private function _buildDigest($workspaces, $emailType) {

/** @var Notification $notification */
foreach ($notifications as $notification) {
if (empty($current)) {
if (empty($current) && $notification->getNode()->getRepository() !== null) {
$title = sprintf($template["title"], $notification->getDescriptionLocation());
}

Expand Down

0 comments on commit d0fe03c

Please sign in to comment.