-
Notifications
You must be signed in to change notification settings - Fork 3
/
{omop_table}_merge.sql.jinja
174 lines (173 loc) · 7.21 KB
/
{omop_table}_merge.sql.jinja
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
{#- Copyright 2024 RADar-AZDelta -#}
{#- SPDX-License-Identifier: gpl3+ -#}
{#- Target dataset: the work dataset when `events` has at least one entry, otherwise the OMOP dataset.
    The table is recreated with its own schema (LIKE) and filled from the query that follows.
    (This statement replaced an earlier `MERGE INTO ... USING (...)` form.) -#}
{%- set target_dataset = dataset_work if events.keys()|length > 0 else dataset_omop -%}
CREATE OR REPLACE TABLE `{{ target_dataset }}.{{ omop_table }}`
LIKE `{{ target_dataset }}.{{ omop_table }}`
AS (
WITH cte_uploaded_tables AS (
{%- for upload_table in upload_tables -%}
{%- if not loop.first %}
UNION ALL
{%- endif %}
SELECT *
FROM `{{dataset_work}}.{{omop_table}}__upload__{{upload_table}}`
{%- endfor %}
), cte_keys_swapped AS (
SELECT
{%- set ns = namespace(fk_counter=0, ci_counter=0) -%}
{%- for column in columns -%}
{%- if not loop.first -%}
{{','}}
{%- endif %}
{% if column == primary_key_column -%}
{%- if pk_auto_numbering -%}
swap_pk.y as `{{column}}`
{%- else -%}
t.`{{column}}` as `{{column}}`
{%- endif -%}
{%- elif column in foreign_key_columns -%}
{%- if column in required_columns -%}
IFNULL(swap_fk{{ns.fk_counter}}.y, 0) as `{{column}}`
{%- else -%}
swap_fk{{ns.fk_counter}}.y as `{{column}}`
{%- endif -%}
{%- set ns.fk_counter = ns.fk_counter + 1 -%}
{%- elif column in concept_id_columns -%}
{%- if not column in events.values() -%}
IFNULL(swap_ci{{ns.ci_counter}}.conceptId, 0) as `{{column}}`
{%- set ns.ci_counter = ns.ci_counter + 1 -%}
{%- else -%}
t.`{{column}}` as `{{column}}`
{%- endif -%}
{%- elif column.endswith("_source_value") and pk_auto_numbering -%}
STRING_AGG(t.`{{column}}`) over (
partition by
swap_pk.y
{%- for column in concept_id_columns %}
{%- if not column in events.values() %}
, swap_pk.`{{column}}`
{%- endif -%}
{%- endfor %}
) as `{{column}}`
{%- else -%}
t.`{{column}}`
{%- endif %}
{%- endfor %}
FROM cte_uploaded_tables t
{%- set ns = namespace(ci_counter=0) -%}
{%- for column in concept_id_columns %}
{%- if not column in events.values() %}
LEFT OUTER JOIN `{{dataset_work}}.{{omop_table}}__{{column.lower()}}_usagi` swap_ci{{ns.ci_counter}} on swap_ci{{ns.ci_counter}}.sourceCode = t.`{{column}}`
{% if not process_semi_approved_mappings -%}
and swap_ci{{ns.ci_counter}}.mappingStatus = 'APPROVED'
{%- else -%}
and swap_ci{{ns.ci_counter}}.mappingStatus in ('APPROVED', 'SEMI-APPROVED')
{%- endif -%}
{%- set ns.ci_counter = ns.ci_counter + 1 -%}
{%- endif -%}
{%- endfor %}
{%- if pk_auto_numbering %}
INNER JOIN `{{dataset_work}}.{{primary_key_column}}_swap` swap_pk on swap_pk.x = t.{{primary_key_column}}
{%- set ns = namespace(ci_counter=0) -%}
{%- for column in concept_id_columns %}
{%- if not column in events.values() %}
and IFNULL(swap_pk.`{{column}}`, 0) = IFNULL(swap_ci{{ns.ci_counter}}.conceptId, 0)
{%- else %}
and IFNULL(swap_pk.`{{column}}`, '') = IFNULL(t.`{{column}}`, '')
{%- endif -%}
{%- set ns.ci_counter = ns.ci_counter + 1 -%}
{%- endfor %}
{%- for column in events %}
and IFNULL(swap_pk.`{{column}}`, '') = IFNULL(t.`{{column}}`, '')
{%- endfor %}
{%- endif -%}
{%- set ns = namespace(fk_counter=0) -%}
{%- for column in foreign_key_columns %}
{%- if column == 'preceding_visit_occurrence_id' %}
LEFT OUTER JOIN `{{dataset_work}}.visit_occurrence_id_swap` swap_fk{{ns.fk_counter}} on swap_fk{{ns.fk_counter}}.x = t.`{{column}}`
{%- elif column == 'preceding_visit_detail_id' %}
LEFT OUTER JOIN `{{dataset_work}}.visit_detail_id_swap` swap_fk{{ns.fk_counter}} on swap_fk{{ns.fk_counter}}.x = t.`{{column}}`
{%- elif column == 'parent_visit_detail_id' %}
LEFT OUTER JOIN `{{dataset_work}}.visit_detail_id_swap` swap_fk{{ns.fk_counter}} on swap_fk{{ns.fk_counter}}.x = t.`{{column}}`
{%- else %}
LEFT OUTER JOIN `{{dataset_work}}.{{column}}_swap` swap_fk{{ns.fk_counter}} on swap_fk{{ns.fk_counter}}.x = t.`{{column}}`
{%- endif -%}
{%- set ns.fk_counter = ns.fk_counter + 1 -%}
{%- endfor %}
), cte_duplicates AS (
SELECT *
, ROW_NUMBER() OVER(
PARTITION BY
{%- if omop_table == 'fact_relationship' %}
fact_id_1
, fact_id_2
{%- elif omop_table == 'death' %}
person_id
{%- elif omop_table == 'cdm_source' %}
cdm_source_name
{%- elif primary_key_column %}
{{primary_key_column}}
{%- endif -%}
{%- for column in concept_id_columns -%}
{% if not column in events.values() %}
, `{{column}}`
{%- endif -%}
{%- endfor -%}
{%- for column in events %}
{%- if omop_table != 'fact_relationship' %}
, `{{column}}`
{%- endif -%}
{%- endfor %}
) AS rn
FROM cte_keys_swapped
)
SELECT * EXCEPT(rn)
FROM cte_duplicates
WHERE rn = 1
{%- if omop_table == "vocabulary" %}
UNION ALL
SELECT *
FROM `{{dataset_omop}}.vocabulary`
WHERE vocabulary_concept_id < {{min_custom_concept_id}}
{%- endif %}
)
{#- Disabled code: tail of an earlier MERGE-based variant of this statement, kept for reference only.
) AS S
{% if omop_table == 'fact_relationship' -%}
ON S.fact_id_1 = T.fact_id_1 and S.fact_id_2 = T.fact_id_2
{%- elif omop_table == 'death' -%}
ON S.person_id = T.person_id
{%- elif omop_table == 'cdm_source' -%}
ON S.cdm_source_name = T.cdm_source_name
{%- elif primary_key_column -%}
ON S.{{primary_key_column}} = T.{{primary_key_column}}
{%- else -%}
ON TRUE = TRUE
{%- endif %}
{%- for column in concept_id_columns %}
{% if not column in events.values() -%}
and S.`{{column}}` = T.`{{column}}`
{%- else -%}
and coalesce(S.`{{column}}`, '') = coalesce(T.`{{column}}`, '')
{%- endif -%}
{%- endfor %}
{%- for column in events %}
{% if omop_table != 'fact_relationship' -%}
and coalesce(S.`{{column}}`, '') = coalesce(T.`{{column}}`, '')
{%- endif %}
{%- endfor %}
WHEN MATCHED THEN
UPDATE SET
{%- for column in columns if (column != primary_key_column and omop_table != "death") or (omop_table == "death" and column != "person_id") -%}
{% if not loop.first -%}
{{','}}
{%- endif %}
`{{column}}` = S.`{{column}}`
{%- endfor %}
WHEN NOT MATCHED THEN
INSERT ROW -#}