-
Notifications
You must be signed in to change notification settings - Fork 92
/
Copy pathpull_data_by_use.R
77 lines (75 loc) · 1.56 KB
/
pull_data_by_use.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
pull_data_by_usi_query <- function(target_usi) {
return (gsub("{target_usi}", target_usi, "
-- Pull what was known at sequence
-- target_usi = {target_usi}
WITH
-- find data rows with freshest
-- update sequence index per fact id
-- with _usi no larger than target_usi
data_scan AS (
SELECT
_fi,
MAX(_usi) AS _usi
FROM
d_data_log
WHERE
_usi <= {target_usi}
GROUP BY
_fi
),
-- find any relevant row deletions
-- with _usi no larger than target_usi
deletion_scan AS (
SELECT
_fi,
MAX(_usi) AS _usi
FROM
d_row_deletions
WHERE
_usi <= {target_usi}
GROUP BY
_fi
),
-- collect state of each row
-- including possibly relevant deletions
chosen_marks AS (
SELECT
data_scan._fi AS _fi,
data_scan._usi AS _usi,
deletion_scan._fi AS _deleted_fi,
deletion_scan._usi AS _deleted_usi
FROM
data_scan
LEFT JOIN
deletion_scan
ON
data_scan._fi = deletion_scan._fi
)
-- Use chosen ids to pull correct rows
-- target_usi = {target_usi}
SELECT
d_data_log.*
FROM
d_data_log
INNER JOIN
chosen_marks
ON
d_data_log._fi = chosen_marks._fi
AND d_data_log._usi = chosen_marks._usi
WHERE
(chosen_marks._deleted_fi is NULL)
OR (chosen_marks._deleted_usi < chosen_marks._usi)
ORDER BY
d_data_log._fi,
d_data_log._usi
", fixed = TRUE))
}
pull_data_by_usi <- function(con, target_usi, return_intenal_keys = FALSE) {
q <- pull_data_by_usi_query(target_usi)
res <- dbGetQuery(con, q)
if (!return_intenal_keys) {
res["_fi"] <- NULL
res["_usi"] <- NULL
}
return(res)
}