Skip to content

Commit

Permalink
Add possibility to schedule jobs by interval in seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoslot committed Feb 5, 2023
1 parent d39e7f5 commit cd6af1b
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 12 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## What is pg_cron?

pg_cron is a simple cron-based job scheduler for PostgreSQL (10 or higher) that runs inside the database as an extension. It uses the same syntax as regular cron, but it allows you to schedule PostgreSQL commands directly from the database:
pg_cron is a simple cron-based job scheduler for PostgreSQL (10 or higher) that runs inside the database as an extension. It uses the same syntax as regular cron, but it allows you to schedule PostgreSQL commands directly from the database. You can also use 'N seconds' to schedule a job every N seconds.

```sql
-- Delete old data on Saturday at 3:30am (GMT)
Expand Down Expand Up @@ -41,6 +41,9 @@ SELECT cron.schedule_in_database('weekly-vacuum', '0 4 * * 0', 'VACUUM', 'some_o
schedule
----------
44

-- Call a stored procedure every 5 seconds
SELECT cron.schedule('process-updates', '5 seconds', 'CALL process_updates()');
```

pg_cron can run multiple jobs in parallel, but it runs at most one instance of a job at a time. If a second run is supposed to start before the first one finishes, then the second run is queued and started as soon as the first run completes.
Expand Down
47 changes: 46 additions & 1 deletion expected/pg_cron-test.out
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ SELECT cron.unschedule(1);
-- Invalid input: input too long
SELECT cron.schedule(repeat('a', 1000), '');
ERROR: invalid schedule: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');
ERROR: invalid schedule: * * * *
-- Invalid input: trailing characters
SELECT cron.schedule('60 secondc', 'SELECT 1');
ERROR: invalid schedule: 60 secondc
SELECT cron.schedule('60 seconds c', 'SELECT 1');
ERROR: invalid schedule: 60 seconds c
-- Invalid input: 0 seconds
SELECT cron.schedule('0 seconds', 'SELECT 1');
ERROR: invalid schedule: 0 seconds
-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
ERROR: invalid schedule: @restar
Expand Down Expand Up @@ -200,12 +211,46 @@ CREATE OR REPLACE FUNCTION public.func1(text, current_setting) RETURNS text
CREATE OR REPLACE FUNCTION public.func1(current_setting) RETURNS text
LANGUAGE sql volatile AS 'INSERT INTO test(data) VALUES (current_user); SELECT current_database()::text;';
CREATE CAST (current_setting AS text) WITH FUNCTION public.func1(current_setting) AS IMPLICIT;
CREATE EXTENSION pg_cron VERSION '1.4';
CREATE EXTENSION pg_cron;
select * from public.test;
data
------
(0 rows)

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
schedule
----------
1
(1 row)

SELECT cron.schedule(' 300000 sEcOnDs ', 'SELECT 1');
schedule
----------
2
(1 row)

SELECT cron.schedule('60 seconds', 'SELECT 1');
schedule
----------
3
(1 row)

SELECT cron.schedule('600 seconds ', 'SELECT 1');
schedule
----------
4
(1 row)

SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;
jobid | jobname | schedule | command
-------+---------+------------------+----------
1 | | 1 second | SELECT 1
2 | | 300000 sEcOnDs | SELECT 1
3 | | 60 seconds | SELECT 1
4 | | 600 seconds | SELECT 1
(4 rows)

-- cleaning
DROP EXTENSION pg_cron;
drop user pgcron_cront;
Expand Down
2 changes: 1 addition & 1 deletion include/cron.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ typedef struct _entry {
uid_t uid;
gid_t gid;
char **envp;
char *cmd;
int secondsInterval;
bitstr_t bit_decl(minute, MINUTE_COUNT);
bitstr_t bit_decl(hour, HOUR_COUNT);
bitstr_t bit_decl(dom, DOM_COUNT);
Expand Down
2 changes: 2 additions & 0 deletions include/task_states.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef struct CronTask
PGconn *connection;
PostgresPollingStatusType pollingStatus;
TimestampTz startDeadline;
TimestampTz lastStartTime;
uint32 secondsInterval;
bool isSocketReady;
bool isActive;
char *errorMessage;
Expand Down
19 changes: 18 additions & 1 deletion sql/pg_cron-test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ SELECT cron.unschedule(1);
-- Invalid input: input too long
SELECT cron.schedule(repeat('a', 1000), '');

-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');

-- Invalid input: trailing characters
SELECT cron.schedule('60 secondc', 'SELECT 1');
SELECT cron.schedule('60 seconds c', 'SELECT 1');

-- Invalid input: 0 seconds
SELECT cron.schedule('0 seconds', 'SELECT 1');

-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
SELECT cron.schedule('@restart', 'ALTER EXTENSION pg_cron UPDATE');
Expand Down Expand Up @@ -117,9 +127,16 @@ CREATE OR REPLACE FUNCTION public.func1(current_setting) RETURNS text

CREATE CAST (current_setting AS text) WITH FUNCTION public.func1(current_setting) AS IMPLICIT;

CREATE EXTENSION pg_cron VERSION '1.4';
CREATE EXTENSION pg_cron;
select * from public.test;

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
SELECT cron.schedule(' 300000 sEcOnDs ', 'SELECT 1');
SELECT cron.schedule('60 seconds', 'SELECT 1');
SELECT cron.schedule('600 seconds ', 'SELECT 1');
SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;

-- cleaning
DROP EXTENSION pg_cron;
drop user pgcron_cront;
Expand Down
2 changes: 0 additions & 2 deletions src/entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ static int set_element(bitstr_t *, int, int, int);
/*
 * free_entry releases the memory of an entry, including the cmd string
 * it points to.
 *
 * e: entry to release; passing NULL is a no-op.
 */
void
free_entry(entry *e)
{
	if (e == NULL)
	{
		return;
	}

	/* free(NULL) is defined as a no-op, so an unset cmd needs no guard */
	free(e->cmd);
	free(e);
}

Expand Down
63 changes: 60 additions & 3 deletions src/job_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -88,6 +89,9 @@ static void AlterJob(int64 jobId, text *scheduleText, text *commandText,
text *databaseText, text *usernameText, bool *active);

static Oid GetRoleOidIfCanLogin(char *username);
static entry * ParseSchedule(char *scheduleText);
static bool TryParseInterval(char *scheduleText, uint32 *secondsInterval);


/* SQL-callable functions */
PG_FUNCTION_INFO_V1(cron_schedule);
Expand Down Expand Up @@ -215,7 +219,7 @@ ScheduleCronJob(text *scheduleText, text *commandText, text *databaseText,

/* check schedule is valid */
schedule = text_to_cstring(scheduleText);
parsedSchedule = parse_cron_entry(schedule);
parsedSchedule = ParseSchedule(schedule);

if (parsedSchedule == NULL)
{
Expand Down Expand Up @@ -977,7 +981,7 @@ TupleToCronJob(TupleDesc tupleDescriptor, HeapTuple heapTuple)
}
}

parsedSchedule = parse_cron_entry(job->scheduleText);
parsedSchedule = ParseSchedule(job->scheduleText);
if (parsedSchedule != NULL)
{
/* copy the schedule and free the allocated memory immediately */
Expand Down Expand Up @@ -1290,7 +1294,7 @@ AlterJob(int64 jobId, text *scheduleText, text *commandText, text *databaseText,
if (scheduleText != NULL)
{
schedule = text_to_cstring(scheduleText);
parsedSchedule = parse_cron_entry(schedule);
parsedSchedule = ParseSchedule(schedule);

if (parsedSchedule == NULL)
{
Expand Down Expand Up @@ -1475,3 +1479,56 @@ JobTableExists(void)

return jobTableOid != InvalidOid;
}


/*
 * ParseSchedule attempts to parse scheduleText as a cron schedule or as an
 * interval in seconds.
 *
 * Cron syntax takes precedence: a string that is a valid cron schedule is
 * always interpreted as one, and the interval form is only considered for
 * strings that the cron parser rejects.
 *
 * Returns NULL if scheduleText matches neither form, or if memory for an
 * interval entry cannot be allocated. Otherwise, the returned pointer is
 * allocated using malloc and should be freed by the caller.
 */
static entry *
ParseSchedule(char *scheduleText)
{
	uint32 secondsInterval = 0;
	entry *schedule = parse_cron_entry(scheduleText);

	if (schedule != NULL)
	{
		/* valid cron schedule */
		return schedule;
	}

	if (!TryParseInterval(scheduleText, &secondsInterval))
	{
		/* neither a cron schedule nor an interval */
		return NULL;
	}

	/*
	 * Interval entries only carry secondsInterval; every other field is
	 * left zeroed by calloc.
	 */
	schedule = calloc(1, sizeof(entry));
	if (schedule == NULL)
	{
		/* out of memory: report failure instead of dereferencing NULL */
		return NULL;
	}

	schedule->secondsInterval = secondsInterval;

	return schedule;
}


/*
 * TryParseInterval returns whether scheduleText is of the form
 * <positive number> second[s], compared case-insensitively and allowing
 * leading whitespace.
 *
 * scheduleText: NUL-terminated schedule string to examine.
 * secondsInterval: receives the number of seconds; it is only written when
 * the function returns true.
 *
 * The number must consist of plain decimal digits (no sign) and must be
 * between 1 and 2147483. The whole word "second" has to be present: a
 * string that merely starts with a number, such as the cron schedule
 * "5 4 * * *", is not an interval.
 */
static bool
TryParseInterval(char *scheduleText, uint32 *secondsInterval)
{
	/*
	 * Intervals are later converted to milliseconds (seconds * 1000) by the
	 * scheduler. Cap the value such that the product still fits in a signed
	 * 32-bit integer: 2147483 * 1000 = 2147483000 < 2^31.
	 */
	const uint32 maxSecondsInterval = 2147483;

	char digits[10] = "";
	char lastChar = '\0';
	char plural = '\0';
	char extra = '\0';
	char *digit = NULL;
	uint32 seconds = 0;
	char *lowercaseSchedule = asc_tolower(scheduleText, strlen(scheduleText));

	/*
	 * sscanf stops at the first literal that does not match, but still
	 * counts the conversions done before it. We therefore capture the final
	 * 'd' of "second" with %c, such that the return value tells us whether
	 * the word was actually matched.
	 *
	 * The number is read as at most 9 digits through a scanset rather than
	 * %u, which rules out signs and keeps the value within uint32 range.
	 */
	int numParts = sscanf(lowercaseSchedule, " %9[0-9] secon%c%c %c", digits,
						  &lastChar, &plural, &extra);

	if (numParts < 2 || lastChar != 'd')
	{
		/* no number, or the number is not followed by "second" */
		return false;
	}

	if (numParts == 3 && plural != 's')
	{
		/* "second" is followed by something other than 's' */
		return false;
	}

	if (numParts > 3)
	{
		/* trailing characters after "seconds" */
		return false;
	}

	/* numParts == 2: "<number> second", numParts == 3: "<number> seconds" */
	for (digit = digits; *digit != '\0'; digit++)
	{
		seconds = seconds * 10 + (uint32) (*digit - '0');
	}

	if (seconds == 0 || seconds > maxSecondsInterval)
	{
		return false;
	}

	*secondsInterval = seconds;

	return true;
}
47 changes: 44 additions & 3 deletions src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,27 @@ StartAllPendingRuns(List *taskList, TimestampTz currentTime)
RebootJobsScheduled = true;
}

foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);

if (task->secondsInterval > 0 && task->isActive)
{
/*
* For interval jobs, if a task takes longer than the interval,
* we only queue up once. So if a task that is supposed to run
* every 30 seconds takes 5 minutes, we start another run
* immediately after 5 minutes, but then return to regular cadence.
*/
if (task->pendingRunCount == 0 &&
TimestampDifferenceExceeds(task->lastStartTime, currentTime,
task->secondsInterval * 1000))
{
task->pendingRunCount += 1;
}
}
}

if (lastMinute == 0)
{
lastMinute = TimestampMinuteStart(currentTime);
Expand Down Expand Up @@ -1052,6 +1073,22 @@ PollForTasks(List *taskList)

if (task->state == CRON_TASK_WAITING && task->pendingRunCount == 0)
{
/*
* Make sure we do not wait past the next run time of an interval
* job.
*/
if (task->secondsInterval > 0)
{
TimestampTz nextRunTime =
TimestampTzPlusMilliseconds(task->lastStartTime,
task->secondsInterval * 1000);

if (TimestampDifferenceExceeds(nextRunTime, nextEventTime, 0))
{
nextEventTime = nextRunTime;
}
}

/* don't poll idle tasks */
continue;
}
Expand Down Expand Up @@ -1122,9 +1159,11 @@ PollForTasks(List *taskList)
pollTimeout = waitSeconds * 1000 + waitMicros / 1000;
if (pollTimeout <= 0)
{
pfree(polledTasks);
pfree(pollFDs);
return;
/*
* Interval jobs might frequently be overdue, inject a small
* 1ms wait to avoid getting into a tight loop.
*/
pollTimeout = 1;
}
else if (pollTimeout > MaxWait)
{
Expand Down Expand Up @@ -1238,6 +1277,8 @@ ManageCronTask(CronTask *task, TimestampTz currentTime)
else
task->state = CRON_TASK_START;

task->lastStartTime = currentTime;

RunningTaskCount++;

/* Add new entry to audit table. */
Expand Down
8 changes: 8 additions & 0 deletions src/task_states.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ RefreshTaskHash(void)

task = GetCronTask(job->jobId);
task->isActive = job->active;
task->secondsInterval = job->schedule.secondsInterval;
}

CronJobCacheValid = true;
Expand All @@ -122,6 +123,13 @@ GetCronTask(int64 jobId)
if (!isPresent)
{
InitializeCronTask(task, jobId);

/*
* We only initialize last run when entering into the hash.
* The net effect is that the timer for the first run of an
* interval job starts when pg_cron first learns about the job.
*/
task->lastStartTime = GetCurrentTimestamp();
}

return task;
Expand Down

0 comments on commit cd6af1b

Please sign in to comment.