Skip to content

Commit

Permalink
Add policy_recompression procedure
Browse files Browse the repository at this point in the history
This patch adds a recompress procedure that may be used as a custom
job when compression and recompression should run as separate
background jobs.
  • Loading branch information
svenklemm authored and gayyappan committed May 24, 2021
1 parent 426918c commit fe872cb
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 12 deletions.
4 changes: 4 additions & 0 deletions sql/policy_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTE
AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc'
LANGUAGE C;

-- Background-job entry point for the recompression policy; the actual
-- implementation is the C function ts_policy_recompression_proc in the
-- loadable module.
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_recompression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_recompression_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_refresh_continuous_aggregate(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc'
LANGUAGE C;
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
CROSSMODULE_WRAPPER(policy_compression_add);
CROSSMODULE_WRAPPER(policy_compression_proc);
CROSSMODULE_WRAPPER(policy_compression_remove);
CROSSMODULE_WRAPPER(policy_recompression_proc);
CROSSMODULE_WRAPPER(policy_refresh_cagg_add);
CROSSMODULE_WRAPPER(policy_refresh_cagg_proc);
CROSSMODULE_WRAPPER(policy_refresh_cagg_remove);
Expand Down Expand Up @@ -309,6 +310,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.policy_compression_add = error_no_default_fn_pg_community,
.policy_compression_proc = error_no_default_fn_pg_community,
.policy_compression_remove = error_no_default_fn_pg_community,
.policy_recompression_proc = error_no_default_fn_pg_community,
.policy_refresh_cagg_add = error_no_default_fn_pg_community,
.policy_refresh_cagg_proc = error_no_default_fn_pg_community,
.policy_refresh_cagg_remove = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef struct CrossModuleFunctions
PGFunction policy_compression_add;
PGFunction policy_compression_proc;
PGFunction policy_compression_remove;
PGFunction policy_recompression_proc;
PGFunction policy_refresh_cagg_add;
PGFunction policy_refresh_cagg_proc;
PGFunction policy_refresh_cagg_remove;
Expand Down
23 changes: 17 additions & 6 deletions src/dimension_slice.c
Original file line number Diff line number Diff line change
Expand Up @@ -931,25 +931,34 @@ ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_
return info.chunk_id;
}

/*
 * Scan state used when searching for a chunk to (re)compress.
 *
 * The two flags select which chunk compression states qualify as a match;
 * the id of the first matching chunk is written to chunk_id by the scanner
 * callback.
 */
typedef struct CompressChunkSearch
{
	int32 chunk_id;  /* id of matching chunk, or INVALID_CHUNK_ID if none */
	bool compress;   /* match chunks that are not compressed at all */
	bool recompress; /* match chunks compressed with unordered data */
} CompressChunkSearch;

static ScanTupleResult
dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *data)
{
ListCell *lc;
DimensionSlice *slice = dimension_slice_from_slot(ti->slot);
List *chunk_ids = NIL;
CompressChunkSearch *d = data;

ts_chunk_constraint_scan_by_dimension_slice_to_list(slice, &chunk_ids, CurrentMemoryContext);

foreach (lc, chunk_ids)
{
int32 chunk_id = lfirst_int(lc);
ChunkCompressionStatus st = ts_chunk_get_compression_status(chunk_id);
if (st == CHUNK_COMPRESS_NONE || st == CHUNK_COMPRESS_UNORDERED)
if ((d->compress && st == CHUNK_COMPRESS_NONE) ||
(d->recompress && st == CHUNK_COMPRESS_UNORDERED))
{
/* found a chunk that is not compressed or needs recompress
* caller needs to check the correct chunk status
*/
*((int32 *) data) = chunk_id;
d->chunk_id = chunk_id;
return SCAN_DONE;
}
}
Expand All @@ -960,18 +969,20 @@ dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *dat
int32
ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id, StrategyNumber start_strategy,
int64 start_value, StrategyNumber end_strategy,
int64 end_value)
int64 end_value, bool compress, bool recompress)
{
int32 chunk_id_ret = INVALID_CHUNK_ID;
CompressChunkSearch data = { .compress = compress,
.recompress = recompress,
.chunk_id = INVALID_CHUNK_ID };
dimension_slice_scan_with_strategies(dimension_id,
start_strategy,
start_value,
end_strategy,
end_value,
&chunk_id_ret,
&data,
dimension_slice_check_is_chunk_uncompressed_tuple_found,
-1,
NULL);

return chunk_id_ret;
return data.chunk_id;
}
8 changes: 3 additions & 5 deletions src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,9 @@ extern TSDLLEXPORT int
ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
StrategyNumber start_strategy, int64 start_value,
StrategyNumber end_strategy, int64 end_value);
extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id,
StrategyNumber start_strategy,
int64 start_value,
StrategyNumber end_strategy,
int64 end_value);
extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(
int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
StrategyNumber end_strategy, int64 end_value, bool compress, bool recompress);
#define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1)

#define dimension_slice_scan(dimension_id, coordinate, tuplock) \
Expand Down
18 changes: 18 additions & 0 deletions src/jsonb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,24 @@ ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key, bool *field_found)
return DatumGetTimestampTz(time_datum);
}

/*
 * Look up a boolean field in a JSONB object by key.
 *
 * Sets *field_found to true when the key is present, false otherwise.
 * When the key is absent, false is returned. The value is parsed with the
 * SQL boolean input function (boolin), so an unparsable value raises an
 * error.
 */
bool
ts_jsonb_get_bool_field(const Jsonb *json, const char *key, bool *field_found)
{
	char *value = ts_jsonb_get_str_field(json, key);

	*field_found = (value != NULL);

	if (value == NULL)
		return false;

	return DatumGetBool(DirectFunctionCall1(boolin, CStringGetDatum(value)));
}

int32
ts_jsonb_get_int32_field(const Jsonb *json, const char *key, bool *field_found)
{
Expand Down
2 changes: 2 additions & 0 deletions src/jsonb_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extern TSDLLEXPORT char *ts_jsonb_get_str_field(const Jsonb *jsonb, const char *
extern TSDLLEXPORT Interval *ts_jsonb_get_interval_field(const Jsonb *jsonb, const char *key);
extern TSDLLEXPORT TimestampTz ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key,
bool *field_found);
extern TSDLLEXPORT bool ts_jsonb_get_bool_field(const Jsonb *json, const char *key,
bool *field_found);
extern TSDLLEXPORT int32 ts_jsonb_get_int32_field(const Jsonb *json, const char *key,
bool *field_found);
extern TSDLLEXPORT int64 ts_jsonb_get_int64_field(const Jsonb *json, const char *key,
Expand Down
52 changes: 52 additions & 0 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,20 @@
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 hour"), InvalidOid, -1))

#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
#define CONFIG_KEY_HYPERTABLE_ID "hypertable_id"
#define CONFIG_KEY_COMPRESS_AFTER "compress_after"
#define CONFIG_KEY_RECOMPRESS_AFTER "recompress_after"
#define CONFIG_KEY_RECOMPRESS "recompress"

/*
 * Read the "recompress" flag from a compression policy config.
 * Defaults to true when the key is absent.
 */
bool
policy_compression_get_recompress(const Jsonb *config)
{
	bool found;
	bool value = ts_jsonb_get_bool_field(config, CONFIG_KEY_RECOMPRESS, &found);

	if (!found)
		return true;

	return value;
}

int32
policy_compression_get_hypertable_id(const Jsonb *config)
Expand Down Expand Up @@ -81,6 +93,33 @@ policy_compression_get_compress_after_interval(const Jsonb *config)
return interval;
}

/*
 * Read the integer "recompress_after" threshold from a recompression
 * policy config. Errors out when the key is missing.
 */
int64
policy_recompression_get_recompress_after_int(const Jsonb *config)
{
	bool found;
	int64 recompress_after = ts_jsonb_get_int64_field(config, CONFIG_KEY_RECOMPRESS_AFTER, &found);

	if (!found)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("could not find %s in config for job", CONFIG_KEY_RECOMPRESS_AFTER)));

	return recompress_after;
}

/*
 * Read the interval-typed "recompress_after" threshold from a
 * recompression policy config. Errors out when the key is missing.
 */
Interval *
policy_recompression_get_recompress_after_interval(const Jsonb *config)
{
	Interval *recompress_after = ts_jsonb_get_interval_field(config, CONFIG_KEY_RECOMPRESS_AFTER);

	if (recompress_after == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("could not find %s in config for job", CONFIG_KEY_RECOMPRESS_AFTER)));

	return recompress_after;
}

Datum
policy_compression_proc(PG_FUNCTION_ARGS)
{
Expand All @@ -94,6 +133,19 @@ policy_compression_proc(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}

Datum
policy_recompression_proc(PG_FUNCTION_ARGS)
{
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
PG_RETURN_VOID();

TS_PREVENT_FUNC_IF_READ_ONLY();

policy_recompression_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));

PG_RETURN_VOID();
}

Datum
policy_compression_add(PG_FUNCTION_ARGS)
{
Expand Down
5 changes: 5 additions & 0 deletions tsl/src/bgw_policy/compression_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ extern Datum policy_compression_add(PG_FUNCTION_ARGS);
extern Datum policy_compression_remove(PG_FUNCTION_ARGS);
extern Datum policy_compression_proc(PG_FUNCTION_ARGS);

extern Datum policy_recompression_proc(PG_FUNCTION_ARGS);

int32 policy_compression_get_hypertable_id(const Jsonb *config);
int64 policy_compression_get_compress_after_int(const Jsonb *config);
Interval *policy_compression_get_compress_after_interval(const Jsonb *config);
bool policy_compression_get_recompress(const Jsonb *config);
int64 policy_recompression_get_recompress_after_int(const Jsonb *config);
Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config);

#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H */
81 changes: 80 additions & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ get_chunk_to_compress(const Dimension *dim, const Jsonb *config)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
StrategyNumber end_strategy = BTLessStrategyNumber;
bool recompress = policy_compression_get_recompress(config);

Datum boundary = get_window_boundary(dim,
config,
Expand All @@ -143,7 +144,30 @@ get_chunk_to_compress(const Dimension *dim, const Jsonb *config)
-1, /*start_value*/
end_strategy,
ts_time_value_to_internal(boundary,
partitioning_type));
partitioning_type),
true,
recompress);
}

static int32
get_chunk_to_recompress(const Dimension *dim, const Jsonb *config)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
StrategyNumber end_strategy = BTLessStrategyNumber;

Datum boundary = get_window_boundary(dim,
config,
policy_recompression_get_recompress_after_int,
policy_recompression_get_recompress_after_interval);

return ts_dimension_slice_get_chunkid_to_compress(dim->fd.id,
InvalidStrategy, /*start_strategy*/
-1, /*start_value*/
end_strategy,
ts_time_value_to_internal(boundary,
partitioning_type),
false,
true);
}

static void
Expand Down Expand Up @@ -551,6 +575,61 @@ policy_compression_read_and_validate_config(Jsonb *config, PolicyCompressionData
}
}

/*
 * Resolve the hypertable referenced by the job config and pin it in the
 * hypertable cache.
 *
 * When policy_data is non-NULL it receives the hypertable and the cache
 * pin; the caller is then responsible for releasing the cache.
 */
void
policy_recompression_read_and_validate_config(Jsonb *config, PolicyCompressionData *policy_data)
{
	Cache *hcache;
	int32 htid = policy_compression_get_hypertable_id(config);
	Oid table_relid = ts_hypertable_id_to_relid(htid);
	Hypertable *hypertable =
		ts_hypertable_cache_get_cache_and_entry(table_relid, CACHE_FLAG_NONE, &hcache);

	if (policy_data != NULL)
	{
		policy_data->hypertable = hypertable;
		policy_data->hcache = hcache;
	}
}

/*
 * Execute one round of the recompression policy job.
 *
 * Recompresses at most one qualifying chunk per invocation; when more
 * chunks remain afterwards, the job is flagged for fast restart so the
 * scheduler reruns it promptly. Always returns true.
 */
bool
policy_recompression_execute(int32 job_id, Jsonb *config)
{
	PolicyCompressionData policy_data;
	Dimension *open_dim;
	int32 chunkid;

	policy_recompression_read_and_validate_config(config, &policy_data);
	open_dim = hyperspace_get_open_dimension(policy_data.hypertable->space, 0);
	chunkid = get_chunk_to_recompress(open_dim, config);

	if (chunkid == INVALID_CHUNK_ID)
	{
		elog(NOTICE,
			 "no chunks for hypertable \"%s.%s\" that satisfy recompress chunk policy",
			 policy_data.hypertable->fd.schema_name.data,
			 policy_data.hypertable->fd.table_name.data);
	}
	else
	{
		Chunk *chunk = ts_chunk_get_by_id(chunkid, true);

		/* For distributed hypertables the recompression is delegated via
		 * policy_invoke_recompress_chunk; otherwise it runs locally. */
		if (hypertable_is_distributed(policy_data.hypertable))
			policy_invoke_recompress_chunk(chunk);
		else
			tsl_recompress_chunk_wrapper(chunk);

		elog(LOG,
			 "completed recompressing chunk \"%s.%s\"",
			 NameStr(chunk->fd.schema_name),
			 NameStr(chunk->fd.table_name));
	}

	/* Re-scan: if more work remains, request a quick rerun of the job. */
	chunkid = get_chunk_to_recompress(open_dim, config);
	if (chunkid != INVALID_CHUNK_ID)
		enable_fast_restart(job_id, "recompression");

	ts_cache_release(policy_data.hcache);

	elog(DEBUG1, "job %d completed recompressing chunk", job_id);
	return true;
}

static void
job_execute_function(FuncExpr *funcexpr)
{
Expand Down
3 changes: 3 additions & 0 deletions tsl/src/bgw_policy/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ extern bool policy_reorder_execute(int32 job_id, Jsonb *config);
extern bool policy_retention_execute(int32 job_id, Jsonb *config);
extern bool policy_refresh_cagg_execute(int32 job_id, Jsonb *config);
extern bool policy_compression_execute(int32 job_id, Jsonb *config);
extern bool policy_recompression_execute(int32 job_id, Jsonb *config);
extern void policy_reorder_read_and_validate_config(Jsonb *config, PolicyReorderData *policy_data);
extern void policy_retention_read_and_validate_config(Jsonb *config,
PolicyRetentionData *policy_data);
extern void policy_refresh_cagg_read_and_validate_config(Jsonb *config,
PolicyContinuousAggData *policy_data);
extern void policy_compression_read_and_validate_config(Jsonb *config,
PolicyCompressionData *policy_data);
extern void policy_recompression_read_and_validate_config(Jsonb *config,
PolicyCompressionData *policy_data);
extern bool job_execute(BgwJob *job);

#endif /* TIMESCALEDB_TSL_BGW_POLICY_JOB_H */
1 change: 1 addition & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ CrossModuleFunctions tsl_cm_functions = {
.policy_compression_add = policy_compression_add,
.policy_compression_proc = policy_compression_proc,
.policy_compression_remove = policy_compression_remove,
.policy_recompression_proc = policy_recompression_proc,
.policy_refresh_cagg_add = policy_refresh_cagg_add,
.policy_refresh_cagg_proc = policy_refresh_cagg_proc,
.policy_refresh_cagg_remove = policy_refresh_cagg_remove,
Expand Down

0 comments on commit fe872cb

Please sign in to comment.