Skip to content

Commit

Permalink
Parse body
Browse files Browse the repository at this point in the history
  • Loading branch information
erimatnor committed Aug 30, 2023
1 parent 11cfba7 commit 6fce237
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 10 deletions.
25 changes: 16 additions & 9 deletions .github/workflows/issue-labeler.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
name: Label issues

"on":
issues:
types:
Expand All @@ -10,21 +9,29 @@ name: Label issues
# filter out pull requests
jobs:
label_issues:
name: Add labels from template response
if: ${{ github.event.issue.action == 'opened' }}
runs-on: ubuntu-latest
permissions:
issues: write
issues: write
steps:
- uses: actions/setup-node@v3
with:
node-version: 16
- run: npm install marked
- uses: actions/github-script@v6
env:
ISSUE_BODY: ${{ github.event.issue.body }}
with:
script: |
const { ISSUE_BODY } = process.env
console.log(`BODY: ${ISSUE_BODY}`)
//github.rest.issues.addLabels({
// issue_number: context.issue.number,
// owner: context.repo.owner,
// repo: context.repo.repo,
// labels: ["triage"]
//})
const re = /[#]{1,3} .*\n\n/;
const labels = body.split(re);
github.rest.issues.addLabels({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
labels: [ labels[1], labels[2], "installation:" + labels[7], "pgversion:" + labels[5], "tsversion:" + labels[4] ]
})
210 changes: 209 additions & 1 deletion tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include "ts_catalog/catalog.h"
#include <postgres.h>
#include <executor/spi.h>
#include <fmgr.h>
#include <lib/stringinfo.h>
#include <storage/lockdefs.h>
#include <utils/builtins.h>
#include <utils/rel.h>
#include <utils/relcache.h>
#include <utils/date.h>
#include <utils/snapmgr.h>
#include <storage/lmgr.h>
#include <commands/matview.h>
#include <tcop/tcopprot.h>
#include <access/multixact.h>

#include <scanner.h>
#include <compat/compat.h>
Expand All @@ -20,7 +26,8 @@
#include "ts_catalog/continuous_aggs_watermark.h"
#include <time_utils.h>
#include "debug_assert.h"

#include <hypercube.h>
#include <tcop/tcopprot.h>
#include "materialize.h"

#define CHUNKIDFROMRELID "chunk_id_from_relid"
Expand Down Expand Up @@ -51,6 +58,207 @@ static void spi_insert_materializations(Hypertable *mat_ht, SchemaAndName partia
TimeRange materialization_range,
const char *const chunk_condition);

static uint64
refresh_matview_datafill(DestReceiver *dest, Query *query,
const char *queryString)
{
List *rewritten;
PlannedStmt *plan;
QueryDesc *queryDesc;
Query *copied_query;
uint64 processed;

/* Lock and rewrite, using a copy to preserve the original query. */
copied_query = copyObject(query);
AcquireRewriteLocks(copied_query, true, false);
rewritten = QueryRewrite(copied_query);

/* SELECT should never rewrite to more or less than one SELECT query */
if (list_length(rewritten) != 1)
elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
query = (Query *) linitial(rewritten);

/* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS();

/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);

/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
* the planner executed an allegedly-stable function that changed the
* database contents, but let's do it anyway to be safe.)
*/
PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();

/* Create a QueryDesc, redirecting output to our tuple receiver */
queryDesc = CreateQueryDesc(plan, queryString,
GetActiveSnapshot(), InvalidSnapshot,
dest, NULL, NULL, 0);

/* call ExecutorStart to prepare the plan for execution */
ExecutorStart(queryDesc, 0);

/* run the plan */
ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);

processed = queryDesc->estate->es_processed;

/* and clean up */
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);

FreeQueryDesc(queryDesc);

PopActiveSnapshot();

return processed;
}

void
continuous_agg_materialize(const Hypertable *mat_ht,
const Chunk *chunk,
SchemaAndName direct_view,
const NameData *time_column_name)
{
Oid OIDNewHeap;
uint64 processed = 0;
Oid tablespace = get_rel_tablespace(chunk->table_id);
Relation chunkrel;
Oid relowner;
char relpersistence;
Oid save_userid;
int save_sec_context;
DestReceiver *dest;
RewriteRule *rule;
List *actions;
Query *dataQuery;
Oid query_view_namespaceid = get_namespace_oid(NameStr(*direct_view.schema), false);
Oid query_view_relid = get_relname_relid(NameStr(*direct_view.name), query_view_namespaceid);
Relation query_view_rel;
int save_nestlevel;
StringInfo command = makeStringInfo();
Oid out_fn;
bool type_is_varlena;
const Dimension *dim = hyperspace_get_open_dimension(mat_ht->space, 0);
const DimensionSlice *slice = ts_hypercube_get_slice_by_dimension_id(chunk->cube, dim->fd.id);
char *materialization_start;
char *materialization_end;

getTypeOutputInfo(dim->fd.column_type, &out_fn, &type_is_varlena);
materialization_start = OidOutputFunctionCall(out_fn, slice->fd.range_start);
materialization_end = OidOutputFunctionCall(out_fn, slice->fd.range_end);

appendStringInfo(command,
"SELECT * FROM %s.%s AS I "
"WHERE I.%s >= %s AND I.%s < %s;",
quote_identifier(NameStr(*direct_view.schema)),
quote_identifier(NameStr(*direct_view.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end));


List *parsetree_list = pg_parse_query(command->data);
RawStmt *parsetree = linitial(parsetree_list);

List *raw_parsetree_list = raw_parser(command->data, 0);

List *querytree_list = pg_analyze_and_rewrite_fixedparams(parsetree, command->data,
NULL, 0, NULL);

query_view_rel = relation_open(query_view_relid, AccessShareLock);

chunkrel = table_open(chunk->table_id, NoLock);
relowner = chunkrel->rd_rel->relowner;
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(relowner,
save_sec_context | SECURITY_RESTRICTED_OPERATION);

relpersistence = chunkrel->rd_rel->relpersistence;
save_nestlevel = NewGUCNestLevel();

/*
* Create the transient table that will receive the regenerated data. Lock
* it against access by any other process until commit (by which time it
* will be gone).
*/
OIDNewHeap = make_new_heap(chunk->table_id, tablespace,
relpersistence, ExclusiveLock);

LockRelationOid(OIDNewHeap, AccessExclusiveLock);

dest = CreateTransientRelDestReceiver(OIDNewHeap);

rule = query_view_rel->rd_rules->rules[0];

if (rule->event != CMD_SELECT || !(rule->isInstead))
elog(ERROR,
"the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
RelationGetRelationName(query_view_rel));

actions = rule->actions;
if (list_length(actions) != 1)
elog(ERROR,
"the rule for materialized view \"%s\" is not a single action",
RelationGetRelationName(query_view_rel));

/*
* The stored query was rewritten at the time of the MV definition, but
* has not been scribbled on by the planner.
*/
dataQuery = linitial_node(Query, actions);

/* Generate the data, if wanted. */


processed = refresh_matview_datafill(dest, dataQuery, command->data);

/* Make the matview match the newly generated data. */

{
finish_heap_swap(chunk_relid, OIDNewHeap, false, false, true, true,
RecentXmin, ReadNextMultiXactId(), relpersistence);


/*
* Inform cumulative stats system about our activity: basically, we
* truncated the matview and inserted some new data. (The concurrent
* code path above doesn't need to worry about this because the
* inserts and deletes it issues get counted by lower-level code.)
*/
pgstat_count_truncate(query_view_rel);

pgstat_count_heap_insert(chunkrel, processed);
}

table_close(chunkrel, NoLock);
relation_close(query_view_rel, NoLock);

/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);

/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);

//ObjectAddressSet(address, RelationRelationId, matviewOid);

/*
* Save the rowcount so that pg_stat_statements can track the total number
* of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
* still don't display the rowcount in the command completion tag output,
* i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
* command tag is left false in cmdtaglist.h. Otherwise, the change of
* completion tag output might break applications using it.
*/
//if (qc)
// SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);

}

void
continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName partial_view,
SchemaAndName materialization_table,
Expand Down

0 comments on commit 6fce237

Please sign in to comment.