-
Notifications
You must be signed in to change notification settings - Fork 14
/
schema.sql
132 lines (120 loc) · 3.03 KB
/
schema.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
create schema if not exists public;
create schema if not exists shovel;
create table if not exists shovel.integrations (
name text,
conf jsonb
);
create table if not exists shovel.ig_updates (
name text not null,
src_name text not null,
backfill boolean default false,
num numeric not null,
latency interval,
nrows numeric,
stop numeric
);
create table if not exists shovel.sources (
name text,
chain_id integer,
url text
);
create table if not exists shovel.task_updates (
num numeric,
hash bytea,
insert_at timestamptz default now(),
src_hash bytea,
src_num numeric,
nblocks numeric,
nrows numeric,
latency interval,
backfill boolean default false,
src_name text,
stop numeric
);
create unique index
if not exists intg_name_src_name_backfill_num_idx
on shovel.ig_updates
using btree (name, src_name, backfill, num desc);
create unique index
if not exists sources_name_chain_id_idx
on shovel.sources
using btree (name, chain_id);
create unique index
if not exists sources_name_idx
on shovel.sources
using btree (name);
-- Changes:
alter table shovel.task_updates
add column if not exists chain_id int;
alter table shovel.task_updates
add column if not exists ig_name text;
drop index if exists shovel.task_src_name_num_idx;
drop index if exists shovel.task_src_name_num_idx1;
do $$
begin
if exists (
select 1
from information_schema.columns
where table_schema = 'shovel'
and table_name = 'task_updates'
and column_name = 'backfill'
) then
with tasks as (
select distinct on (src_name)
src_name, num, hash, chain_id
from shovel.task_updates
where backfill = false
order by src_name, num desc
), igs as (
select
distinct on (i.src_name, i.name)
t.chain_id,
i.src_name,
i.name,
i.num,
t.hash
from shovel.ig_updates i
left outer join tasks t
on t.src_name = i.src_name
and t.num = i.num
where i.backfill = false
order by i.src_name, i.name, i.num desc
) insert into shovel.task_updates (
chain_id,
src_name,
ig_name,
num,
hash
) select chain_id, src_name, name, num, hash from igs;
delete from shovel.task_updates where ig_name is null;
end if;
end
$$;
alter table shovel.task_updates
drop column if exists backfill;
create unique index
if not exists task_src_name_num_idx
on shovel.task_updates
using btree (ig_name, src_name, num DESC);
drop view if exists shovel.source_updates;
create or replace view shovel.source_updates as
select distinct on (src_name)
src_name, num, hash, src_num, src_hash, nblocks, nrows, latency
from shovel.task_updates
order by src_name, num desc;
drop view if exists shovel.latest;
create or replace view shovel.latest as
with abs_latest as (
select src_name, max(num) num
from shovel.task_updates
group by src_name
), src_latest as (
select
shovel.task_updates.src_name,
max(shovel.task_updates.num) num
from shovel.task_updates, abs_latest
where shovel.task_updates.src_name = abs_latest.src_name
and abs_latest.num - shovel.task_updates.num <= 10
group by shovel.task_updates.src_name, shovel.task_updates.ig_name
)
select src_name, min(num) num from src_latest group by 1;