Skip to content

Commit

Permalink
Merge pull request #235 from citusdata/marcocitus/jobs-in-seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
marcocitus committed Feb 7, 2023
2 parents 4b5f230 + c15004f commit 71642fd
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 15 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## What is pg_cron?

pg_cron is a simple cron-based job scheduler for PostgreSQL (10 or higher) that runs inside the database as an extension. It uses the same syntax as regular cron, but it allows you to schedule PostgreSQL commands directly from the database:
pg_cron is a simple cron-based job scheduler for PostgreSQL (10 or higher) that runs inside the database as an extension. It uses the same syntax as regular cron, but it allows you to schedule PostgreSQL commands directly from the database. You can also use '[1-59] seconds' to schedule a job based on an interval.

```sql
-- Delete old data on Saturday at 3:30am (GMT)
Expand Down Expand Up @@ -41,6 +41,9 @@ SELECT cron.schedule_in_database('weekly-vacuum', '0 4 * * 0', 'VACUUM', 'some_o
schedule
----------
44

-- Call a stored procedure every 5 seconds
SELECT cron.schedule('process-updates', '5 seconds', 'CALL process_updates()');
```

pg_cron can run multiple jobs in parallel, but it runs at most one instance of a job at a time. If a second run is supposed to start before the first one finishes, then the second run is queued and started as soon as the first run completes.
Expand Down
62 changes: 61 additions & 1 deletion expected/pg_cron-test.out
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,35 @@ SELECT cron.unschedule(1);
-- Invalid input: input too long
SELECT cron.schedule(repeat('a', 1000), '');
ERROR: invalid schedule: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');
ERROR: invalid schedule: * * * *
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
-- Invalid input: trailing characters
SELECT cron.schedule('5 secondc', 'SELECT 1');
ERROR: invalid schedule: 5 secondc
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
SELECT cron.schedule('50 seconds c', 'SELECT 1');
ERROR: invalid schedule: 50 seconds c
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
-- Invalid input: seconds out of range
SELECT cron.schedule('-1 seconds', 'SELECT 1');
ERROR: invalid schedule: -1 seconds
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
SELECT cron.schedule('0 seconds', 'SELECT 1');
ERROR: invalid schedule: 0 seconds
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
SELECT cron.schedule('60 seconds', 'SELECT 1');
ERROR: invalid schedule: 60 seconds
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
SELECT cron.schedule('10000000000 seconds', 'SELECT 1');
ERROR: invalid schedule: 10000000000 seconds
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
ERROR: invalid schedule: @restar
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
SELECT cron.schedule('@restart', 'ALTER EXTENSION pg_cron UPDATE');
schedule
----------
Expand Down Expand Up @@ -200,12 +226,46 @@ CREATE OR REPLACE FUNCTION public.func1(text, current_setting) RETURNS text
CREATE OR REPLACE FUNCTION public.func1(current_setting) RETURNS text
LANGUAGE sql volatile AS 'INSERT INTO test(data) VALUES (current_user); SELECT current_database()::text;';
CREATE CAST (current_setting AS text) WITH FUNCTION public.func1(current_setting) AS IMPLICIT;
CREATE EXTENSION pg_cron VERSION '1.4';
CREATE EXTENSION pg_cron;
select * from public.test;
data
------
(0 rows)

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
schedule
----------
1
(1 row)

SELECT cron.schedule(' 30 sEcOnDs ', 'SELECT 1');
schedule
----------
2
(1 row)

SELECT cron.schedule('59 seconds', 'SELECT 1');
schedule
----------
3
(1 row)

SELECT cron.schedule('17 seconds ', 'SELECT 1');
schedule
----------
4
(1 row)

SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;
jobid | jobname | schedule | command
-------+---------+--------------+----------
1 | | 1 second | SELECT 1
2 | | 30 sEcOnDs | SELECT 1
3 | | 59 seconds | SELECT 1
4 | | 17 seconds | SELECT 1
(4 rows)

-- cleaning
DROP EXTENSION pg_cron;
drop user pgcron_cront;
Expand Down
2 changes: 1 addition & 1 deletion include/cron.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ typedef struct _entry {
uid_t uid;
gid_t gid;
char **envp;
char *cmd;
int secondsInterval;
bitstr_t bit_decl(minute, MINUTE_COUNT);
bitstr_t bit_decl(hour, HOUR_COUNT);
bitstr_t bit_decl(dom, DOM_COUNT);
Expand Down
2 changes: 2 additions & 0 deletions include/task_states.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ typedef struct CronTask
PGconn *connection;
PostgresPollingStatusType pollingStatus;
TimestampTz startDeadline;
TimestampTz lastStartTime;
uint32 secondsInterval;
bool isSocketReady;
bool isActive;
char *errorMessage;
Expand Down
22 changes: 21 additions & 1 deletion sql/pg_cron-test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ SELECT cron.unschedule(1);
-- Invalid input: input too long
SELECT cron.schedule(repeat('a', 1000), '');

-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');

-- Invalid input: trailing characters
SELECT cron.schedule('5 secondc', 'SELECT 1');
SELECT cron.schedule('50 seconds c', 'SELECT 1');

-- Invalid input: seconds out of range
SELECT cron.schedule('-1 seconds', 'SELECT 1');
SELECT cron.schedule('0 seconds', 'SELECT 1');
SELECT cron.schedule('60 seconds', 'SELECT 1');
SELECT cron.schedule('10000000000 seconds', 'SELECT 1');

-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
SELECT cron.schedule('@restart', 'ALTER EXTENSION pg_cron UPDATE');
Expand Down Expand Up @@ -117,9 +130,16 @@ CREATE OR REPLACE FUNCTION public.func1(current_setting) RETURNS text

CREATE CAST (current_setting AS text) WITH FUNCTION public.func1(current_setting) AS IMPLICIT;

CREATE EXTENSION pg_cron VERSION '1.4';
CREATE EXTENSION pg_cron;
select * from public.test;

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
SELECT cron.schedule(' 30 sEcOnDs ', 'SELECT 1');
SELECT cron.schedule('59 seconds', 'SELECT 1');
SELECT cron.schedule('17 seconds ', 'SELECT 1');
SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;

-- cleaning
DROP EXTENSION pg_cron;
drop user pgcron_cront;
Expand Down
2 changes: 0 additions & 2 deletions src/entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ static int set_element(bitstr_t *, int, int, int);
void
free_entry(entry *e)
{
if (e->cmd)
free(e->cmd);
free(e);
}

Expand Down
71 changes: 66 additions & 5 deletions src/job_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -88,6 +89,9 @@ static void AlterJob(int64 jobId, text *scheduleText, text *commandText,
text *databaseText, text *usernameText, bool *active);

static Oid GetRoleOidIfCanLogin(char *username);
static entry * ParseSchedule(char *scheduleText);
static bool TryParseInterval(char *scheduleText, uint32 *secondsInterval);


/* SQL-callable functions */
PG_FUNCTION_INFO_V1(cron_schedule);
Expand Down Expand Up @@ -215,12 +219,14 @@ ScheduleCronJob(text *scheduleText, text *commandText, text *databaseText,

/* check schedule is valid */
schedule = text_to_cstring(scheduleText);
parsedSchedule = parse_cron_entry(schedule);
parsedSchedule = ParseSchedule(schedule);

if (parsedSchedule == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid schedule: %s", schedule)));
errmsg("invalid schedule: %s", schedule),
errhint("Use cron format (e.g. 5 4 * * *), or interval "
"format '[1-59] seconds'")));
}

free_entry(parsedSchedule);
Expand Down Expand Up @@ -975,7 +981,7 @@ TupleToCronJob(TupleDesc tupleDescriptor, HeapTuple heapTuple)
}
}

parsedSchedule = parse_cron_entry(job->scheduleText);
parsedSchedule = ParseSchedule(job->scheduleText);
if (parsedSchedule != NULL)
{
/* copy the schedule and free the allocated memory immediately */
Expand Down Expand Up @@ -1282,12 +1288,14 @@ AlterJob(int64 jobId, text *scheduleText, text *commandText, text *databaseText,
if (scheduleText != NULL)
{
schedule = text_to_cstring(scheduleText);
parsedSchedule = parse_cron_entry(schedule);
parsedSchedule = ParseSchedule(schedule);

if (parsedSchedule == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid schedule: %s", schedule)));
errmsg("invalid schedule: %s", schedule),
errhint("Use cron format (e.g. 5 4 * * *), or interval "
"format '[1-59] seconds'")));
}

free_entry(parsedSchedule);
Expand Down Expand Up @@ -1464,3 +1472,56 @@ JobTableExists(void)

return jobTableOid != InvalidOid;
}


/*
* ParseSchedule attempts to parse a cron schedule or an interval in seconds.
* The returned pointer is allocated using malloc and should be freed by the
* caller.
*/
static entry *
ParseSchedule(char *scheduleText)
{
uint32 secondsInterval = 0;

/*
* Parse as interval on seconds or fall back to trying cron schedule.
*/
if (TryParseInterval(scheduleText, &secondsInterval))
{
entry *schedule = calloc(sizeof(entry), sizeof(char));
schedule->secondsInterval = secondsInterval;
return schedule;
}

return parse_cron_entry(scheduleText);
}


/*
* TryParseInterval returns whether scheduleText is of the form
* <positive number> second[s].
*/
static bool
TryParseInterval(char *scheduleText, uint32 *secondsInterval)
{
char plural = '\0';
char extra = '\0';
char *lowercaseSchedule = asc_tolower(scheduleText, strlen(scheduleText));

int numParts = sscanf(lowercaseSchedule, " %u second%c %c", secondsInterval,
&plural, &extra);

if (numParts == 1)
{
/* <number> second (allow "2 second") */
return 0 < *secondsInterval && *secondsInterval < 60;
}
else if (numParts == 2 && plural == 's')
{
/* <number> seconds (allow "1 seconds") */
return 0 < *secondsInterval && *secondsInterval < 60;
}

return false;
}
50 changes: 46 additions & 4 deletions src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,8 @@ StartAllPendingRuns(List *taskList, TimestampTz currentTime)
CronJob *cronJob = GetCronJob(task->jobId);
entry *schedule = &cronJob->schedule;

if (schedule->flags & WHEN_REBOOT)
if (schedule->flags & WHEN_REBOOT &&
task->isActive)
{
task->pendingRunCount += 1;
}
Expand All @@ -682,6 +683,27 @@ StartAllPendingRuns(List *taskList, TimestampTz currentTime)
RebootJobsScheduled = true;
}

foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);

if (task->secondsInterval > 0 && task->isActive)
{
/*
* For interval jobs, if a task takes longer than the interval,
* we only queue up once. So if a task that is supposed to run
* every 30 seconds takes 5 minutes, we start another run
* immediately after 5 minutes, but then return to regular cadence.
*/
if (task->pendingRunCount == 0 &&
TimestampDifferenceExceeds(task->lastStartTime, currentTime,
task->secondsInterval * 1000))
{
task->pendingRunCount += 1;
}
}
}

if (lastMinute == 0)
{
lastMinute = TimestampMinuteStart(currentTime);
Expand Down Expand Up @@ -1051,6 +1073,22 @@ PollForTasks(List *taskList)

if (task->state == CRON_TASK_WAITING && task->pendingRunCount == 0)
{
/*
* Make sure we do not wait past the next run time of an interval
* job.
*/
if (task->secondsInterval > 0)
{
TimestampTz nextRunTime =
TimestampTzPlusMilliseconds(task->lastStartTime,
task->secondsInterval * 1000);

if (TimestampDifferenceExceeds(nextRunTime, nextEventTime, 0))
{
nextEventTime = nextRunTime;
}
}

/* don't poll idle tasks */
continue;
}
Expand Down Expand Up @@ -1121,9 +1159,11 @@ PollForTasks(List *taskList)
pollTimeout = waitSeconds * 1000 + waitMicros / 1000;
if (pollTimeout <= 0)
{
pfree(polledTasks);
pfree(pollFDs);
return;
/*
* Interval jobs might frequently be overdue, inject a small
* 1ms wait to avoid getting into a tight loop.
*/
pollTimeout = 1;
}
else if (pollTimeout > MaxWait)
{
Expand Down Expand Up @@ -1237,6 +1277,8 @@ ManageCronTask(CronTask *task, TimestampTz currentTime)
else
task->state = CRON_TASK_START;

task->lastStartTime = currentTime;

RunningTaskCount++;

/* Add new entry to audit table. */
Expand Down
8 changes: 8 additions & 0 deletions src/task_states.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ RefreshTaskHash(void)

task = GetCronTask(job->jobId);
task->isActive = job->active;
task->secondsInterval = job->schedule.secondsInterval;
}

CronJobCacheValid = true;
Expand All @@ -122,6 +123,13 @@ GetCronTask(int64 jobId)
if (!isPresent)
{
InitializeCronTask(task, jobId);

/*
* We only initialize last run when entering into the hash.
* The net effect is that the timer for the first run of an
* interval job starts when pg_cron first learns about the job.
*/
task->lastStartTime = GetCurrentTimestamp();
}

return task;
Expand Down

0 comments on commit 71642fd

Please sign in to comment.