Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
278 lines (264 sloc) 15.7 KB
{# --============================================================================== -- #}
{# -- -- #}
{# -- DBMS Name : SQL Server -- #}
{# -- Template Name : nu3_sqlserver_procedure_stage -- #}
{# -- Description : This template loads data into a stage table to be used -- #}
{# -- by many destination table types (multi-purpose template) -- #}
{# -- -- #}
{# --============================================================================== -- #}
{# -- -- #}
{# -- History -- #}
{# -- 20170628 | Eckhard Zemp | Initial Creation -- #}
{# -- 20170711 | Eckhard Zemp | Redesign -- #}
{# -- 20170717 | Eckhard Zemp | Rename -- #}
{# -- 20170814 | Eckhard Zemp | Hash Update -- #}
{# -- 20171102 | Eckhard Zemp | Adding Clean HashBK subquery -- #}
{# -- 20171102 | Eckhard Zemp | Adding Dimension Lookup -- #}
{# -- 20171108 | Eckhard Zemp | Beautify multiple lines with getLines -- #}
{# -- 20171212 | Eckhard Zemp | Finally found a way to reuse the stage block -- #}
{# -- 20171219 | Eckhard Zemp | Added delta loading option -- #}
{# -- 20180102 | Eckhard Zemp | Merge stage fact features into here -- #}
{# -- 20180105 | Eckhard Zemp | Verify Meta Data -- #}
{# -- 20180207 | Eckhard Zemp | Dim Lookup WITH(NOLOCK) and update with BK -- #}
{# -- -- #}
{# --============================================================================== -- #}
{#- Load utility macros #}
{%- import "nu3_sqlserver_utility_configuration" -%}
{%- import "nu3_sqlserver_utility_elements" -%}
{%- import "nu3_sqlserver_utility_snippets" -%}
{%- import "nu3_sqlserver_utility_procedure" -%}
{%- import "nu3_sqlserver_utility_metadata" -%}
{%- import "nu3_utility_metadata" -%}
{#- Variables #}
{%- set templateVersion = "20180105" -%}
{%- set sysObjectType = "TABLE" -%}
{%- set empty = "" -%}
{%- set TempTable = "#" + table.name -%}
{%- set dssLoadDate = dssLoadDate() | trim -%}
{%- set settingStageCondense = settingStageCondense() | trim %}
{%- set settingStageCheckMetaData = settingStageCheckMetaData() | trim %}
{%- set targetObjectName = "" -%}
{%- if table.extendedPropertyValuesByName.target_object_name is defined -%}
{%- set list = table.extendedPropertyValuesByName.target_object_name -%}
{%- for item in (list | lines) -%}
{%- if item | trim != "" -%}{%- set targetObjectName = item | trim -%}{%- endif %}
{%- endfor %}
{%- endif -%}
{%- set hasBK = false -%}
{%- from table.columns as column where column.businessKey -%}
{%- set hasBK = true -%}
{%- endfrom -%}
{%- set hasHashKey = false -%}
{%- from table.columns as column where (column.hubHashKey or column.linkHashKey or column.changeHashKey) and column.hashKeySources is defined and (table.sourceJoinDetails.where | trim != "" or targetObjectName != "") -%}
{%- set hasHashKey = true -%}
{%- endfrom -%}
{#- Properties #}
{%- if table.sourceJoinDetails.distinct is defined %}{%- endif %}
{%- if table.sourceJoinDetails.join is defined %}{%- endif %}
{%- if table.sourceJoinDetails.where is defined %}{%- endif %}
{%- if table.sourceJoinDetails.groupBy is defined %}{%- endif %}
{#- Procedure Header #}
{{- addProcedureDocumentation(templateVersion) }}
{{- addProcedureHeader() }}
{#- Metadata #}
{%- if settingStageCheckMetaData == "true" -%}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Metadata Check") }}
{{- checkMetadata(indent = " ") }}
{{- checkMetadataSqlServer(indent = " ") }}
{% br %}
{%- endif %}
{#- Main #}
{{- addProcedureCommentBlock(commentMessage = "MAIN") }}
SET @v_step = {% counter %}00{% br %}{% br %}
SET @v_insert_count = 0{% br %}
SET @v_current_datetime = {{ currentDateTimeFormula() }}{% br %}{% br %}
BEGIN TRY{% br %}{% br %}
{#- Source #}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Source") }}
{{- addDetailMessage(indent = " ",message = "Load data from " + getSourceTableString(settings, length = 65)) }}
{% br %}
{#- Delete old records #}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Delete existing records") }}
SET @v_step = {% counter %}00{% br %}{% br %}
SET @v_sql = N'TRUNCATE TABLE [TABLEOWNER].[{{ table.name }}];'{% br %}
EXEC @v_return_status = sp_executesql @v_sql{% br %}{% br %}
{#- Create temporary table only if we have to calculate a hash key #}
{%- if hasHashKey == true -%}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Temporary Table for Hash Calculation") }}
SET @v_step = {% counter %}00{% br %}{% br %}
SET @v_row_count = 0{% br %}{% br %}
IF OBJECT_ID('tempdb..{{ TempTable }}') IS NOT NULL
DROP TABLE {{ TempTable }}{% br %}{% br %}
SELECT *{% br %}
INTO {{ TempTable }}{% br %}
FROM ({% br %}
SELECT *{% br %}
FROM ({% br %}
SELECT {% br %}
{{- addSelectColumnsExtract(indent = " ") }}
{%- if table.sourceJoinDetails.join | trim != "" %} {{ getLines(indent=" ",object = table.sourceJoinDetails.join) }}{% br %}{%- endif %}
{%- if table.sourceJoinDetails.where | trim != "" %} {{ getLines(indent=" ",object = table.sourceJoinDetails.where) }}{% br %}{%- endif %}
{%- if table.sourceJoinDetails.groupBy | trim != "" %} {{ getLines(indent=" ",object = table.sourceJoinDetails.groupBy) }}{% br %}{%- endif %}
) AS {{ table.name }}{% br %}
{{- addWhereDssLoadDateMax(indent = " ")}}
) AS {{ table.name }}{% br %}
;{% br %}{% br %}
SELECT @v_row_count = @@ROWCOUNT{% br %}
{{- addDetailMessage(indent = " ",message = TempTable + " updated. ' + CONVERT(VARCHAR,@v_row_count) + ' records added.") }}
{% br %}
{%- endif %}
{#- Insert new records #}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Insert new records") }}
SET @v_step = {% counter %}00{% br %}{% br %}
BEGIN TRANSACTION{% br %}{% br %}
INSERT INTO [TABLEOWNER].[{{ table.name }}] WITH ( TABLOCKX ){% br %}
({% br %}
{{- addInsertColumns(indent = " ") }}
){% br %}
{%- include "nu3_sqlserver_block_select_stage" with {"indent":" ","mergeTable":""} %}
;{% br %}{% br %}
SELECT @v_row_count = @@ROWCOUNT{% br %}{% br %}
COMMIT{% br %}{% br %}
SET @v_insert_count = @v_insert_count + @v_row_count{% br %}
{{- addDetailMessage(indent = " ",message = table.name + " updated. ' + CONVERT(VARCHAR,@v_row_count) + ' records added.") }}
{% br %}
{#- Insert Dimension Keys #}
{%- from table.relations as rel %}
{%- if loop.first %}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Insert & Update dimension keys for failed lookup") }}
{%- endif %}
{%- for fk in rel.fks %}
{%- if loop.first %}
{%- set insert_missing_bk = true -%}
{%- set newLine = "
" %}
{%- fetch fk.toTable %}
{%- if fk.toTable.extendedPropertyValuesByName.insert_missing_bk is defined -%}
{%- set list = fk.toTable.extendedPropertyValuesByName.insert_missing_bk -%}
{%- for item in (list | lines ) -%}
{%- if item | trim | lower == "false" -%}{%- set insert_missing_bk = false -%}{%- endif %}
{%- endfor %}
{%- endif %}
{%- set zeroKeyId = zeroKeyId() | trim -%}
{%- if fk.toTable.extendedPropertyValuesByName.zero_key_id is defined -%}
{%- set list = fk.toTable.extendedPropertyValuesByName.zero_key_id -%}
{%- for item in (list | lines) -%}
{%- if item | trim != "" -%}{%- set zeroKeyId = item | trim -%}{%- endif %}
{%- endfor %}
{%- endif -%}
{%- if insert_missing_bk == true %}
{%- for fk in rel.fks %}
{%- if loop.first %}
{%- fetch fk.toTable %}
-- {{ fk.toTable.name }}{% br %}
BEGIN TRANSACTION{% br %}{% br %}
INSERT INTO [TABLEOWNER].[{{ fk.toTable.name }}] WITH ( TABLOCKX ){% br %}
{%- endif %}
{%- endfor %}
({% br %}
{%- for fk in rel.fks %}
{% if loop.first %} {% else %},{% endif %}
{%- fetch fk.toColumn %}
{{- fk.toColumn.name }}{% br %}
{%- endfor %}
{%- for fk in rel.fks %}
{%- fetch fk.toTable %}
{%- if loop.first %}
{%- from fk.toTable.columns as column where column.createTime or column.updateTime or column.name == dssLoadDate or column.changeHashKey %}
,{{ column.name }}{% br %}
{%- endfrom %}
{%- endif %}
{%- endfor %}
){% br %}
SELECT DISTINCT{% br %}
{%- for fk in rel.fks %}
{% if loop.first %} {% else %},{% endif %}
{%- fetch fk.fromColumn %}
{{- fk.fromColumn.name }}{% br %}
{%- endfor %}
{%- for fk in rel.fks %}
{%- fetch fk.toTable %}
{%- if loop.first %}
{%- from fk.toTable.columns as column where column.createTime or column.updateTime or column.name == dssLoadDate or column.changeHashKey %}
,{%- if column.updateTime or column.createTime -%}
@v_current_datetime
{%- elseif column.name == dssLoadDate -%}
'0001-01-01'
{%- elseif column.changeHashKey -%}
{{ dssChangeHashDefaultValue() }}
{%- else -%}
{{ column.name }}
{%- endif %}{% br %}
{%- endfrom %}
{%- endif %}
{%- endfor %}
FROM [TABLEOWNER].[{{ table.name }}] {{ table.name }}{% br %}
{%- for fk in rel.fks %}
{%- fetch fk.toTable %}
{%- if loop.first %}
{%- from fk.toTable.columns as column where column.artificial %}
WHERE {{ column.name }} = {{ zeroKeyId }}{% br %}
AND NOT EXISTS ( SELECT 1{% br %}
FROM [TABLEOWNER].[{{ fk.toTable.name }}] {{ fk.toTable.name }}{% br %}
{%- for fk in rel.fks %}
{% if loop.first %}WHERE {% else %} AND {% endif %}
{%- fetch fk.fromColumn %}
{{- table.name }}.{{ fk.fromColumn.name }} = {{ fk.toTable.name }}.{{ fk.fromColumn.name }}{% br %}
{%- endfor %}
)
;{% br %}{% br %}
SELECT @v_row_count = @@ROWCOUNT{% br %}{% br %}
COMMIT{% br %}{% br %}
SET @v_insert_count = @v_insert_count + @v_row_count{% br %}
{{- addDetailMessage(indent = " ",message = fk.toTable.name + " updated. ' + CONVERT(VARCHAR,@v_row_count) + ' records added.") }}
{%- endfrom %}
{%- endif %}
{%- endfor %}
{% br %}
BEGIN TRANSACTION{% br %}{% br %}
UPDATE [TABLEOWNER].[{{ table.name }}] WITH ( TABLOCKX ){% br %}
{%- for fk in rel.fks %}
{%- fetch fk.toTable %}
{%- if loop.first %}
{%- from fk.toTable.columns as column where column.artificial %}
SET {{ column.name }} = {{ fk.toTable.name }}.{{ column.name }}{% br %}
{%- from table.columns as column2 where column2.updateTime %}
,{{ column2.name }} = @v_current_datetime{% br %}
{%- endfrom %}
FROM [TABLEOWNER].[{{ fk.toTable.name }}]{% br %}
WHERE {{ table.name }}.{{ column.name }} = {{ zeroKeyId }}{% br %}
{%- for fkBK in rel.fks %}
{%- fetch fkBK.fromColumn %}
{%- fetch fkBK.toColumn %}
AND {{ fk.toTable.name }}.{{ fkBK.toColumn.name }} = {{ table.name }}.{{ fkBK.fromColumn.name }}{% br %}
{%- endfor %}
{%- endfrom %}
{%- endif %}
{%- endfor %}
;{% br %}{% br %}
SELECT @v_row_count = @@ROWCOUNT{% br %}{% br %}
COMMIT{% br %}{% br %}
SET @v_update_count = @v_update_count + @v_row_count{% br %}
{{- addDetailMessage(indent = " ",message = table.name + " updated. ' + CONVERT(VARCHAR,@v_row_count) + ' records updated.") }}
{% br %}
{%- else %}
-- {{ fk.toTable.name }}{% br %}
-- No insert of missing business keys for {{ fk.toTable.name }} allowed{% br %}{% br %}
{%- endif %}
{%- endif %}
{%- endfor %}
{%- endfrom %}
{#- CleanUp #}
{%- if hasHashKey == true -%}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Clean Up") }}
SET @v_step = {% counter %}00{% br %}{% br %}
IF OBJECT_ID('tempdb..{{ TempTable }}') IS NOT NULL{% br %}
DROP TABLE {{ TempTable }}{% br %}{% br %}
{%- endif %}
{#- Finish Up #}
{{- addProcedureCommentBlock(indent = " ", commentMessage = "Finish Up") }}
SET @v_step = {% counter %}00{% br %}{% br %}
{{- addFinishTask() }}
{{- addReturnMessage() }}
END TRY{% br %}
{{- addProcedureException() -}}