In [None]:
%%flink_gateway_sql_prepare
-- Name the pipeline; presumably applies to the property-check query in the next cell (confirm gateway session scope).
SET 'pipeline.name' = 'SQL properties test job';

In [None]:
%%flink_gateway_sql_prepare

--INSERT  INTO alerts_bulk
-- A1: every entity paired with each property check whose targetClass equals
-- the entity type. Attributes are left-joined by name/entityId, and an rdf
-- "rdf:type" triple is left-joined to tell whether the attribute value is a
-- known instance of the expected propertyClass (foundVal is NULL otherwise).
WITH A1 AS (
    SELECT
        ent.id                       AS this,
        ent.`type`                   AS typ,
        IFNULL(ent.`deleted`, false) AS edeleted,
        attr.`attributeValue`        AS val,
        attr.`nodeType`              AS nodeType,
        attr.`type`                  AS attr_typ,
        attr.`deleted`               AS `adeleted`,
        cls.subject                  AS foundVal,
        cls.object                   AS foundClass,
        attr.`datasetId`             AS `index`,
        chk.propertyPath             AS propertyPath,
        chk.propertyClass            AS propertyClass,
        chk.propertyNodetype         AS propertyNodetype,
        chk.maxCount                 AS maxCount,
        chk.minCount                 AS minCount,
        chk.severity                 AS severity,
        chk.minExclusive             AS minExclusive,
        chk.maxExclusive             AS maxExclusive,
        chk.minInclusive             AS minInclusive,
        chk.maxInclusive             AS maxInclusive,
        chk.minLength                AS minLength,
        chk.maxLength                AS maxLength,
        chk.`pattern`                AS `pattern`,
        chk.ins                      AS ins
    FROM `entity_view` AS ent
    INNER JOIN `propertyChecksTable` AS chk
        ON ent.`type` = chk.targetClass
    LEFT JOIN attributes_view AS attr
        ON chk.propertyPath = attr.name
        AND attr.entityId = ent.id
    -- The type triple is only looked up for NGSI-LD Property attributes.
    LEFT JOIN rdf AS cls
        ON cls.subject = '<' || attr.`attributeValue` || '>'
        AND attr.`type` = 'https://uri.etsi.org/ngsi-ld/Property'
        AND cls.predicate = '<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>'
        AND cls.object = '<' || chk.propertyClass || '>'
)
-- NodeKind check: raises the check's severity when a live (non-deleted)
-- attribute of a live entity has a nodeType different from the expected
-- propertyNodetype; otherwise emits an 'ok' record for the same event key.
SELECT
    this AS resource,
    'NodeKindConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND NOT IFNULL(adeleted, false) AND (nodeType <> `propertyNodetype`)
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN NOT edeleted AND NOT IFNULL(adeleted, false) AND (nodeType <> `propertyNodetype`)
            -- 'an IRI' has no leading blank: the prefix already ends with one,
            -- so the message no longer contains a double space.
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Node is not ' ||
                CASE WHEN `propertyNodetype` = '@id' THEN 'an IRI' ELSE 'a Literal' END
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE propertyNodetype IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- Datatype/class check for IRI-valued properties: a present attribute fails
-- when it has no value or when no rdf type triple matched the expected
-- propertyClass (foundVal is NULL).
SELECT
    this AS resource,
    'DatatypeConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND (val IS NULL OR foundVal IS NULL)
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND (val IS NULL OR foundVal IS NULL)
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Invalid value ' || IFNULL(val, 'NULL')  || ' not type of ' || `propertyClass` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE propertyNodetype = '@id'
  AND propertyClass IS NOT NULL
  AND NOT IFNULL(adeleted, false)
  AND `index` IS NOT NULL
UNION ALL
-- MinExclusive check: a present attribute of a live entity fails when its
-- value is not numeric or is not strictly greater than minExclusive.
SELECT
    this AS resource,
    'MinExclusiveConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND (TRY_CAST(val AS DOUBLE) IS NULL
                  OR NOT (TRY_CAST(val AS DOUBLE) > TRY_CAST(`minExclusive` AS DOUBLE)))
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    -- The two failure texts together cover exactly the severity condition:
    -- "not comparable" only for non-numeric values, "is not >" for numeric
    -- values out of range. Deleted entities always report 'All ok'.
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND TRY_CAST(val AS DOUBLE) IS NULL
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' not comparable with ' || `minExclusive` || '.'
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND NOT (TRY_CAST(val AS DOUBLE) > TRY_CAST(`minExclusive` AS DOUBLE))
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' is not > ' || `minExclusive` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `minExclusive` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- MaxExclusive check: a present attribute of a live entity fails when its
-- value is not numeric or is not strictly less than maxExclusive.
SELECT
    this AS resource,
    'MaxExclusiveConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND (TRY_CAST(val AS DOUBLE) IS NULL
                  OR NOT (TRY_CAST(val AS DOUBLE) < TRY_CAST(`maxExclusive` AS DOUBLE)))
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    -- The two failure texts together cover exactly the severity condition:
    -- "not comparable" only for non-numeric values, "is not <" for numeric
    -- values out of range. Deleted entities always report 'All ok'.
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND TRY_CAST(val AS DOUBLE) IS NULL
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' not comparable with ' || `maxExclusive` || '.'
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND NOT (TRY_CAST(val AS DOUBLE) < TRY_CAST(`maxExclusive` AS DOUBLE))
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' is not < ' || `maxExclusive` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `maxExclusive` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- MaxInclusive check: a present attribute of a live entity fails when its
-- value is not numeric or is greater than maxInclusive.
SELECT
    this AS resource,
    'MaxInclusiveConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND (TRY_CAST(val AS DOUBLE) IS NULL
                  OR NOT (TRY_CAST(val AS DOUBLE) <= TRY_CAST(`maxInclusive` AS DOUBLE)))
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    -- The two failure texts together cover exactly the severity condition:
    -- "not comparable" only for non-numeric values, "is not <=" for numeric
    -- values out of range. Deleted entities always report 'All ok'.
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND TRY_CAST(val AS DOUBLE) IS NULL
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' not comparable with ' || `maxInclusive` || '.'
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND NOT (TRY_CAST(val AS DOUBLE) <= TRY_CAST(`maxInclusive` AS DOUBLE))
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' is not <= ' || `maxInclusive` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `maxInclusive` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- MinInclusive check: a present attribute of a live entity fails when its
-- value is not numeric or is less than minInclusive.
SELECT
    this AS resource,
    'MinInclusiveConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND (TRY_CAST(val AS DOUBLE) IS NULL
                  OR NOT (TRY_CAST(val AS DOUBLE) >= TRY_CAST(`minInclusive` AS DOUBLE)))
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    -- The two failure texts together cover exactly the severity condition:
    -- "not comparable" only for non-numeric values, "is not >=" for numeric
    -- values out of range. Deleted entities always report 'All ok'.
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND TRY_CAST(val AS DOUBLE) IS NULL
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' not comparable with ' || `minInclusive` || '.'
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND NOT (TRY_CAST(val AS DOUBLE) >= TRY_CAST(`minInclusive` AS DOUBLE))
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' is not >= ' || `minInclusive` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `minInclusive` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- In check: `ins` is matched as a comma-separated list of double-quoted
-- values; the attribute fails when its quoted value is not found in it.
-- NOTE(review): val is used inside a LIKE pattern, so '%' or '_' in val act
-- as wildcards - confirm this is acceptable for the expected values.
SELECT
    this AS resource,
    'InConstraintComponent('|| `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND NOT ((',' || `ins` || ',') LIKE ('%,"' || replace(val, '"', '\"') || '",%'))
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL
             AND NOT ((',' || `ins` || ',') LIKE ('%,"' || replace(val, '"', '\"') || '",%'))
            -- The property path is concatenated in; previously the literal
            -- word "propertyPath" was part of the message.
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' is not allowed.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `ins` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- Pattern check: a present attribute of a live entity fails when its value
-- does not match the regular expression stored in `pattern`.
SELECT
    this AS resource,
    'PatternConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND NOT REGEXP(val, `pattern`)
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND NOT REGEXP(val, `pattern`)
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Value ' || IFNULL(val, 'NULL') || ' does not match pattern ' || `pattern`
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `pattern` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- Count check: per entity and property path, counts the non-deleted
-- attribute instances (attr_typ is NULL when no attribute was joined, so
-- such rows do not count) and compares the count with minCount / maxCount.
-- A missing bound yields a NULL comparison, i.e. it is not enforced.
SELECT
    this AS resource,
    'CountConstraintComponent(' || `propertyPath` || ')' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted
             AND (count(CASE WHEN NOT IFNULL(adeleted, false) THEN attr_typ ELSE NULL END) > TRY_CAST(`maxCount` AS INTEGER)
                  OR count(CASE WHEN NOT IFNULL(adeleted, false) THEN attr_typ ELSE NULL END) < TRY_CAST(`minCount` AS INTEGER))
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        -- Same condition as for severity: minCount is cast to INTEGER here
        -- too (it was cast to STRING before, so text and severity could
        -- disagree).
        WHEN NOT edeleted
             AND (count(CASE WHEN NOT IFNULL(adeleted, false) THEN attr_typ ELSE NULL END) > TRY_CAST(`maxCount` AS INTEGER)
                  OR count(CASE WHEN NOT IFNULL(adeleted, false) THEN attr_typ ELSE NULL END) < TRY_CAST(`minCount` AS INTEGER))
            -- Expected range is rendered on one line as "[min, max]"; a
            -- missing minCount is shown as 0 and a missing maxCount as "*".
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '.  Found '
                || TRY_CAST(count(CASE WHEN NOT IFNULL(adeleted, false) THEN attr_typ ELSE NULL END) AS STRING)
                || ' relationships instead of [' || IFNULL(`minCount`, '0') || ', ' || IFNULL(`maxCount`, '*') || ']!'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `minCount` IS NOT NULL
   OR `maxCount` IS NOT NULL
GROUP BY this, typ, propertyPath, minCount, maxCount, severity, edeleted
UNION ALL
-- MinLength check: a present attribute of a live entity fails when the
-- character length of its value is below minLength.
SELECT
    this AS resource,
    'MinLengthConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND CHAR_LENGTH(val) < TRY_CAST(`minLength` AS INTEGER)
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND CHAR_LENGTH(val) < TRY_CAST(`minLength` AS INTEGER)
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Length of ' || IFNULL(val, 'NULL') || ' is < ' || `minLength` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `minLength` IS NOT NULL
  AND `index` IS NOT NULL
UNION ALL
-- MaxLength check: a present attribute of a live entity fails when the
-- character length of its value exceeds maxLength.
SELECT
    this AS resource,
    'MaxLengthConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND CHAR_LENGTH(val) > TRY_CAST(`maxLength` AS INTEGER)
            THEN `severity`
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN NOT edeleted AND attr_typ IS NOT NULL AND CHAR_LENGTH(val) > TRY_CAST(`maxLength` AS INTEGER)
            THEN 'Model validation for Property ' || `propertyPath` || ' failed for ' || this || '. Length of ' || IFNULL(val, 'NULL') || ' is > ' || `maxLength` || '.'
        ELSE 'All ok'
    END AS `text`
FROM A1
WHERE `maxLength` IS NOT NULL
  AND `index` IS NOT NULL;


In [None]:
%%flink_gateway_sql_prepare
-- Name the pipeline; presumably applies to the relationship-check query in the next cell (confirm gateway session scope).
SET 'pipeline.name' = 'SQL relationships test job';

In [None]:
%%flink_gateway_sql_query
    --INSERT  INTO alerts_bulk
            -- A1: every entity paired with each relationship check whose
            -- targetClass equals the entity type. The matching attribute is
            -- left-joined by name/entityId, and the entity it points to is
            -- left-joined by id (only for NGSI-LD Relationship attributes), so
            -- `entity` is NULL when the target does not exist.
            WITH A1 AS (
                    SELECT
                        ent.id                       AS this,
                        ent.`type`                   AS typ,
                        IFNULL(ent.`deleted`, false) AS edeleted,
                        tgt.`type`                   AS entity,
                        attr.`type`                  AS link,
                        attr.`nodeType`              AS nodeType,
                        attr.`deleted`               AS `adeleted`,
                        attr.`datasetId`             AS `index`,
                        chk.targetClass              AS targetClass,
                        chk.propertyPath             AS propertyPath,
                        chk.propertyClass            AS propertyClass,
                        chk.maxCount                 AS maxCount,
                        chk.minCount                 AS minCount,
                        chk.severity                 AS severity
                    FROM entity_view AS ent
                    INNER JOIN `relationshipChecksTable` AS chk
                        ON ent.`type` = chk.targetClass
                    LEFT JOIN attributes_view AS attr
                        ON attr.name = chk.propertyPath
                        AND attr.entityId = ent.id
                    LEFT JOIN entity_view AS tgt
                        ON attr.`attributeValue` = tgt.id
                        AND attr.`type` = 'https://uri.etsi.org/ngsi-ld/Relationship'
            )
            -- Currently returns every column of A1 unfiltered. The alert
            -- projection (ClassConstraintComponent) and its WHERE filter are
            -- kept commented out inside this statement.
            -- NOTE(review): looks like a debugging state - confirm before using
            -- this cell to feed alerts_bulk.
            SELECT * /*this AS resource,
                'ClassConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
                'Development' AS environment,
                
                ARRAY ['SHACL Validator'] AS service,
                
                CASE WHEN NOT edeleted AND NOT IFNULL(adeleted, false) AND link IS NOT NULL AND entity IS NULL THEN `severity`
                    ELSE 'ok' END AS severity,
                'customer'  customer,

                CASE WHEN NOT edeleted AND NOT IFNULL(adeleted, false) AND link IS NOT NULL AND entity IS NULL
                        THEN 'Model validation for relationship' || `propertyPath` || 'failed for '|| this || '. Relationship not linked to existing entity of type ' ||  `propertyClass` || '.'
                    ELSE 'All ok' END as `text`*/
            FROM A1 --WHERE A1.propertyClass IS NOT NULL and `index` IS NOT NULL
/*UNION ALL
            SELECT this AS resource,
                'CountConstraintComponent(' || `propertyPath` || ')' AS event,
                'Development' AS environment,
                
                ARRAY ['SHACL Validator'] AS service,
                
                CASE WHEN NOT edeleted AND (count(CASE WHEN NOT IFNULL(adeleted, false) THEN link ELSE NULL END) > TRY_CAST(`maxCount` AS INTEGER)
                                            OR count(CASE WHEN NOT IFNULL(adeleted, false) THEN link ELSE NULL END) < TRY_CAST(`minCount` AS INTEGER))
                    THEN `severity`
                    ELSE 'ok' END AS severity,
                'customer'  customer,
                CASE WHEN NOT edeleted AND (count(CASE WHEN NOT IFNULL(adeleted, false) THEN link ELSE NULL END) > TRY_CAST(`maxCount` AS INTEGER)
                                            OR count(CASE WHEN NOT IFNULL(adeleted, false) THEN link ELSE NULL END) < TRY_CAST(`minCount` AS INTEGER))
                    THEN
                        'Model validation for relationship ' || `propertyPath` || 'failed for ' || this || ' . Found ' ||
                            TRY_CAST(count(CASE WHEN NOT IFNULL(adeleted, false) THEN link ELSE NULL END) AS STRING) || ' relationships instead of
                            [' || `minCount` || ', ' || `maxCount` || ']!'
                    ELSE 'All ok' END as `text`
            FROM A1 WHERE `minCount` is NOT NULL or `maxCount` is NOT NULL
            group by this, edeleted, propertyPath, maxCount, minCount, severity
UNION ALL
            SELECT this AS resource,
                'NodeKindConstraintComponent(' || `propertyPath` || '[' || CASE WHEN `index` = '@none' THEN '0' ELSE `index` END || '])' AS event,
                'Development' AS environment,
                
                ARRAY ['SHACL Validator'] AS service,
                
                CASE WHEN NOT edeleted AND NOT IFNULL(`adeleted`, false) AND (nodeType <> '@id')
                    THEN `severity`
                    ELSE 'ok' END AS severity,
                'customer'  customer,
                CASE WHEN NOT edeleted AND NOT IFNULL(`adeleted`, false) AND (nodeType <> '@id')
                    THEN
                        'Model validation for relationship ' || `propertyPath` || ' failed for ' || this || ' . NodeType is '|| nodeType || ' but must be an IRI.'
                    ELSE 'All ok' END as `text`
            FROM A1 WHERE `index` IS NOT NULL*/;

In [None]:
%%flink_gateway_sql_prepare
-- Name the pipeline; presumably applies to the SPARQL-derived query in the next cell (confirm gateway session scope).
SET 'pipeline.name' = 'SPARQL SQL job';

In [None]:
%%flink_gateway_sql_query
--INSERT INTO
 --   alerts_bulk
-- StateValueShape: for every entity of one of the listed machine types, emit
-- 'warning' when a violation row exists for it and 'ok' otherwise.
-- A violation is an entity whose type is an rdfs:subClassOf Machine and whose
-- hasState Property value has no isValidFor triple pointing at one of the
-- entity type's superclasses.
-- The derived table lists its columns explicitly (instead of `*`) so that
-- `this`, `value` and `type` are unambiguous.
-- NOTE(review): the message text looks generated from the shape definition
-- and is therefore kept verbatim.
SELECT
    checked.this_left AS resource,
    'SPARQLConstraintComponent(StateValueShape)' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN checked.this IS NOT NULL THEN 'warning'
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN checked.this IS NOT NULL THEN 'State value ' || IFNULL(checked.`value`, 'NULL') || ' are not a valid machineState for machine ' || IFNULL(checked.this, 'NULL') || ' of type ' || IFNULL(checked.`type`, 'NULL')
        ELSE 'All ok'
    END AS `text`
FROM
    (
        SELECT
            targets.this       AS this_left,
            violations.this    AS this,
            violations.`value` AS `value`,
            targets.`type`     AS `type`
        FROM
            (
                -- Entities the shape applies to.
                SELECT
                    id AS this,
                    `type`
                FROM
                    entity_view
                WHERE
                    `type` IN (
                        'https://industryfusion.github.io/contexts/example/v0/base_entities/Machine',
                        'https://industryfusion.github.io/contexts/example/v0/base_entities/Filter',
                        'https://industryfusion.github.io/contexts/example/v0/base_entities/Cutter',
                        'https://industryfusion.github.io/contexts/example/v0/base_entities/Plasmacutter',
                        'https://industryfusion.github.io/contexts/example/v0/base_entities/Lasercutter'
                    )
            ) AS targets
            LEFT JOIN (
                -- Entities whose current state value is not valid for their type.
                SELECT
                    ent.`id`               AS `this`,
                    state.`attributeValue` AS `value`,
                    ent.type               AS `type`,
                    ent.`deleted`          AS `deleted`
                FROM
                    rdf AS machine_sub
                    JOIN entity_view AS ent
                        ON '<' || ent.`type` || '>' = machine_sub.`subject`
                    JOIN attributes_view AS state
                        ON state.name = 'https://industryfusion.github.io/contexts/example/v0/base_entities/hasState'
                        AND ent.id = state.entityId
                        AND state.type = 'https://uri.etsi.org/ngsi-ld/Property'
                WHERE
                    NOT EXISTS (
                        SELECT 1
                        FROM
                            rdf AS type_sub
                            JOIN rdf AS valid_for
                                ON valid_for.predicate = '<https://industryfusion.github.io/contexts/example/v0/base_knowledge/isValidFor>'
                        WHERE
                            type_sub.subject = '<' || ent.type || '>'
                            AND type_sub.predicate = '<http://www.w3.org/2000/01/rdf-schema#subClassOf>'
                            AND valid_for.subject = '<' || state.`attributeValue` || '>'
                            AND valid_for.object = type_sub.object
                    )
                    AND machine_sub.predicate = '<http://www.w3.org/2000/01/rdf-schema#subClassOf>'
                    AND machine_sub.object = '<https://industryfusion.github.io/contexts/example/v0/base_entities/Machine>'
            ) AS violations
                ON targets.this = violations.this
                AND IFNULL(violations.`deleted`, FALSE) IS FALSE
    ) AS checked;

In [None]:
%flink_sql_replace -s '{{{{.Values.flink.alertWindow | squote}}}}' -r "'0.001'"

In [None]:
%%flink_gateway_sql_query

-- insert into alerts select  resource, event, environment, service, severity, customer, `text` from
         --   (
                -- Two-step reduction of alerts_bulk: the inner query keeps the
                -- last severity/text per alert key and tumbling window (window
                -- size comes from the alertWindow template value, in seconds);
                -- the outer query keeps the last of those per alert key.
                SELECT
                    windowed.resource,
                    windowed.event,
                    windowed.environment,
                    windowed.service,
                    windowed.customer,
                    LAST_VALUE(windowed.`text`) AS `text`,
                    LAST_VALUE(windowed.severity) AS severity
                FROM (
                    SELECT
                        `resource`,
                        `event`,
                        `environment`,
                        service,
                        LAST_VALUE(severity) AS `severity`,
                        `customer`,
                        LAST_VALUE(`text`) AS `text`
                    FROM alerts_bulk
                    GROUP BY
                        `resource`,
                        `event`,
                        service,
                        customer,
                        environment,
                        TUMBLE(ts, INTERVAL {{.Values.flink.alertWindow | squote}} SECOND)
                ) AS windowed
                GROUP BY
                    windowed.resource,
                    windowed.event,
                    windowed.service,
                    windowed.customer,
                    windowed.environment
           -- );
;

In [None]:
%%flink_gateway_sql_query

--INSERT INTO  alerts_bulk
-- Class check for the cutter's hasFilter relationship: each cutter is joined
-- to the attribute referenced by its hasFilter column and, through that
-- attribute's hasObject, to a filter. A link without a matching filter
-- produces a 'warning', everything else 'ok'.
WITH A1 AS (
    SELECT
        cut.id                      AS this,
        cut.`type`                  AS typ,
        flt.`type`                  AS entity,
        attr.`type`                 AS link,
        attr.`nodeType`             AS nodeType,
        CAST(attr.`index` AS INTEGER) AS `index`
    FROM cutter_view AS cut
    LEFT JOIN attributes_view AS attr
        ON attr.id = cut.`https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter`
    LEFT JOIN filter_view AS flt
        ON attr.`https://uri.etsi.org/ngsi-ld/hasObject` = flt.id
    -- Rows without a joined attribute (NULL entityId / name) are kept on purpose.
    WHERE (attr.entityId = cut.id OR attr.entityId IS NULL)
      AND (
            attr.name = 'https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter'
            OR attr.name IS NULL
          )
)
SELECT
    this AS resource,
    'ClassConstraintComponent(https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter[' || TRY_CAST(`index` AS STRING) || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN typ IS NOT NULL AND link IS NOT NULL AND entity IS NULL THEN 'warning'
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN typ IS NOT NULL AND link IS NOT NULL AND entity IS NULL
            THEN 'Model validation for relationship https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter failed for ' || this || '. Relationship not linked to existing entity of type https://industryfusion.github.io/contexts/example/v0/base_entities/Filter.'
        ELSE 'All ok'
    END AS `text`
FROM A1;

In [None]:
%load_ext jupyflink.magics

In [None]:
%flink_gateway_init --debug

In [None]:
%%flink_gateway_sql_prepare
-- Load the Kafka SQL connector jar; the 'kafka' / 'upsert-kafka' tables created in this notebook presumably depend on it.
ADD JAR '/opt/gateway/jars/flink-sql-connector-kafka-1.16.2.jar';

In [None]:
%%flink_gateway_sql_prepare

-- Session configuration for the 'iff/shacl-validation' pipeline.
SET 'pipeline.name' =  'iff/shacl-validation';
-- Disable the upsert-materialize operator in front of sinks.
SET 'table.exec.sink.upsert-materialize' =  'none';
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
SET 'pipeline.object-reuse' = 'true';
SET 'parallelism.default' = '1';
-- RocksDB state backend and its tuning options.
SET 'state.backend.rocksdb.writebuffer.size' = '64 kb';
SET 'state.backend.rocksdb.use-bloom-filter' = 'true';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.rocksdb.predefined-options' = 'SPINNING_DISK_OPTIMIZED_HIGH_MEM';
-- 86400000 ms = 24 hours.
SET 'table.exec.state.ttl' = '86400000 ms';

-- Kafka source table: JSON attribute records from topic 'iff.ngsild.attributes'.
-- `ts` is the Kafka record timestamp and also the watermark.
CREATE TABLE `attributes` (
    `id` STRING,
    `entityId` STRING,
    `name` STRING,
    `nodeType` STRING,
    `valueType` STRING,
    `index` INTEGER,
    `type` STRING,
    `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
    `https://uri.etsi.org/ngsi-ld/hasValue` STRING,
    `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'iff.ngsild.attributes'
);
-- Upsert-Kafka table on topic 'iff.ngsild.attributes_insert', keyed by (id, index).
CREATE TABLE `attributes_insert` (
    `id` STRING,
    `entityId` STRING,
    `name` STRING,
    `nodeType` STRING,
    `valueType` STRING,
    `index` INTEGER,
    `type` STRING,
    `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
    `https://uri.etsi.org/ngsi-ld/hasValue` STRING,
    `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    WATERMARK FOR `ts` AS `ts`,
    PRIMARY KEY (id, index) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'False',
    'value.json.ignore-parse-errors' = 'True',
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'topic' = 'iff.ngsild.attributes_insert'
);
-- Upsert-Kafka table on topic 'iff.alerts.bulk', keyed by (resource, event).
-- The watermark is the record timestamp minus a zero-length interval.
CREATE TABLE `alerts_bulk` (
    `resource` STRING,
    `event` STRING,
    `environment` STRING,
    `service` ARRAY<STRING>,
    `severity` STRING,
    `customer` STRING,
    `text` STRING,
    WATERMARK FOR ts AS ts - INTERVAL '0.0' SECONDS,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    PRIMARY KEY (resource, event) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'False',
    'value.json.ignore-parse-errors' = 'True',
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'topic' = 'iff.alerts.bulk'
);
-- Kafka source table: JSON workpiece entities from topic 'iff.ngsild.entities.workpiece'.
CREATE TABLE `workpiece` (
    `id` STRING,
    `type` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `ts` AS `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasHeight` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasLength` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasMaterial` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasWidth` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'iff.ngsild.entities.workpiece'
);
-- Kafka source table: JSON cutter entities from topic 'iff.ngsild.entities.cutter'.
CREATE TABLE `cutter` (
    `id` STRING,
    `type` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `ts` AS `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasInWorkpiece` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasOutWorkpiece` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasState` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'iff.ngsild.entities.cutter'
);
-- Kafka source table: JSON machine entities from topic 'iff.ngsild.entities.machine'.
CREATE TABLE `machine` (
    `id` STRING,
    `type` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `ts` AS `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasState` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'iff.ngsild.entities.machine'
);
-- Upsert-Kafka table of RDF triples on topic 'iff.rdf', keyed by (subject, predicate, index).
CREATE TABLE `rdf` (
    `subject` STRING,
    `predicate` STRING,
    `object` STRING,
    `index` INTEGER,
    PRIMARY KEY (subject, predicate, index) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'False',
    'value.json.ignore-parse-errors' = 'True',
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'topic' = 'iff.rdf'
);
-- Kafka source table: JSON filter entities from topic 'iff.ngsild.entities.filter'.
CREATE TABLE `filter` (
    `id` STRING,
    `type` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `ts` AS `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasCartridge` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasState` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasStrength` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'iff.ngsild.entities.filter'
);
-- Kafka source table: JSON filter cartridge entities from topic 'iff.ngsild.entities.filter_cartridge'.
CREATE TABLE `filter_cartridge` (
    `id` STRING,
    `type` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `ts` AS `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/isUsedFrom` STRING,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/isUsedUntil` STRING,
    `https://industryfusion.github.io/contexts/example/v0/filter_entities/hasWasteclass` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'iff.ngsild.entities.filter_cartridge'
);
-- Latest record per (id, index) from `attributes`, ordered by ts;
-- records without an entityId are dropped.
CREATE VIEW `attributes_view` AS
SELECT
    id,
    entityId,
    name,
    nodeType,
    valueType,
    index,
    `type`,
    `https://uri.etsi.org/ngsi-ld/datasetId`,
    `https://uri.etsi.org/ngsi-ld/hasValue`,
    `https://uri.etsi.org/ngsi-ld/hasObject`,
    `ts`
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY `id`, `index` ORDER BY ts DESC) AS rn
    FROM `attributes`
)
WHERE rn = 1
  AND entityId IS NOT NULL;
-- Latest record per id from `machine`, ordered by ts.
CREATE VIEW `machine_view` AS
SELECT
    `id`,
    `type`,
    `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasState`
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY ts DESC) AS rn
    FROM `machine`
)
WHERE rn = 1;
-- Latest record per id from `filter_cartridge`, ordered by ts.
CREATE VIEW `filter_cartridge_view` AS
SELECT
    `id`,
    `type`,
    `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/isUsedFrom`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/isUsedUntil`,
    `https://industryfusion.github.io/contexts/example/v0/filter_entities/hasWasteclass`
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY ts DESC) AS rn
    FROM `filter_cartridge`
)
WHERE rn = 1;
-- Latest record per id from `cutter`, ordered by ts.
CREATE VIEW `cutter_view` AS
SELECT
    `id`,
    `type`,
    `ts`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasInWorkpiece`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasOutWorkpiece`,
    `https://industryfusion.github.io/contexts/example/v0/base_entities/hasState`
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY ts DESC) AS rn
    FROM `cutter`
)
WHERE rn = 1;
-- Latest state per workpiece: keep only the row of `workpiece` with the newest ts for each id.
CREATE VIEW `workpiece_view` AS
SELECT
  `id`,
  `type`,
  `ts`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasHeight`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasLength`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasMaterial`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasWidth`
FROM (
  SELECT
    *,
    -- rownum 1 is the most recent row of its id partition
    ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `ts` DESC) AS rownum
  FROM `workpiece`
)
WHERE rownum = 1;
-- Latest state per filter: keep only the row of `filter` with the newest ts for each id.
CREATE VIEW `filter_view` AS
SELECT
  `id`,
  `type`,
  `ts`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasCartridge`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasState`,
  `https://industryfusion.github.io/contexts/example/v0/base_entities/hasStrength`
FROM (
  SELECT
    *,
    -- rownum 1 is the most recent row of its id partition
    ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `ts` DESC) AS rownum
  FROM `filter`
)
WHERE rownum = 1;

In [None]:
%%flink_gateway_sql_query

-- Class-constraint check for the hasFilter relationship of cutters, written to alerts_bulk.
-- Each attribute row joined to a cutter through its hasFilter value yields one alert row:
-- severity is warning when the attribute has a type but its hasObject does not resolve to
-- a row in filter_view, otherwise ok.
INSERT INTO alerts_bulk
WITH A1 AS (
    SELECT
        A.`id`       AS this,
        A.`type`     AS typ,
        C.`type`     AS entity,
        B.`type`     AS link,
        B.`nodeType` AS nodeType,
        B.`index`    AS `index`
    FROM cutter_view AS A
    LEFT JOIN attributes_view AS B
        ON B.`id` = A.`https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter`
    LEFT JOIN filter_view AS C
        ON B.`https://uri.etsi.org/ngsi-ld/hasObject` = C.`id`
    WHERE
        -- only attributes_view exposes an index column, so this drops cutters without a matching attribute row
        B.`index` IS NOT NULL --AND
        --(B.entityId = A.id OR B.entityId IS NULL)
        --AND (B.name = 'https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter' OR B.name IS NULL)
)
SELECT
    this AS resource,
    'ClassConstraintComponent(https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter[' || TRY_CAST(`index` AS STRING) || '])' AS event,
    'Development' AS environment,
    ARRAY ['SHACL Validator'] AS service,
    CASE
        WHEN typ IS NOT NULL AND link IS NOT NULL AND entity IS NULL THEN 'warning'
        ELSE 'ok'
    END AS severity,
    'customer' AS customer,
    CASE
        WHEN typ IS NOT NULL AND link IS NOT NULL AND entity IS NULL
            THEN 'Model validation for relationship https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter failed for '|| this || '. Relationship not linked to existing entity of type https://industryfusion.github.io/contexts/example/v0/base_entities/Filter.'
        ELSE 'All ok'
    END AS `text`
FROM A1;
/*UNION ALL
            SELECT this AS resource,
                'CountConstraintComponent(https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter)' AS event,
                'Development' AS environment,
                
                ARRAY ['SHACL Validator'] AS service,
                
                CASE WHEN typ IS NOT NULL AND ( count(link) > 1 OR count(link) < 1)
                    THEN 'warning'
                    ELSE 'ok' END AS severity,
                'customer'  customer,
                CASE WHEN typ IS NOT NULL AND ( count(link) > 1 OR count(link) < 1)
                    THEN
                        'Model validation for relationship https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter failed for ' || this || ' . Found ' || TRY_CAST(count(link) AS STRING) || ' relationships instead of
                            [1,1]!'
                    ELSE 'All ok' END as `text`
            FROM A1
            group by this, typ
UNION ALL
            SELECT this AS resource,
                'NodeKindConstraintComponent(https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter)' AS event,
                'Development' AS environment,
                
                ARRAY ['SHACL Validator'] AS service,
                
                CASE WHEN typ IS NOT NULL AND link IS NOT NULL AND (nodeType is NULL OR nodeType <> '@id')
                    THEN 'warning'
                    ELSE 'ok' END AS severity,
                'customer'  customer,
                CASE WHEN typ IS NOT NULL AND  link IS NOT NULL AND (nodeType is NULL OR nodeType <> '@id')
                    THEN
                        'Model validation for relationship https://industryfusion.github.io/contexts/example/v0/base_entities/hasFilter failed for ' || this || ' . NodeType is '|| nodeType || ' but must be an IRI.'
                    ELSE 'All ok' END as `text`
            FROM A1;*/



In [None]:
%flink_gateway_init --debug

In [None]:
%%flink_gateway_sql_prepare
-- Add the Kafka SQL connector jar (version 1.16.2 per its file name) to the session.
-- The tables in this notebook declare the kafka and upsert-kafka connectors.
ADD JAR '/opt/gateway/jars/flink-sql-connector-kafka-1.16.2.jar';

In [None]:
%%flink_gateway_sql_prepare
-- Alerts table on topic iff.alerts, keyed by (resource, event) through upsert-kafka.
-- NOTE(review): the statements that follow in this cell drop `alerts` again and recreate it
-- with the plain kafka connector, so this definition does not survive the cell.
-- Confirm which of the two definitions is intended.
DROP TABLE IF EXISTS `alerts`;
CREATE TABLE `alerts` (
  `resource`    STRING,
  `event`       STRING,
  `environment` STRING,
  `service`     ARRAY<STRING>,
  `severity`    STRING,
  `customer`    STRING,
  `text`        STRING,
  PRIMARY KEY (`resource`, `event`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.alerts'
);
-- Alerts table on topic iff.alerts using the plain kafka connector with JSON records.
-- No primary key is declared and reading starts at the latest offset.
DROP TABLE IF EXISTS `alerts`;
CREATE TABLE `alerts` (
  `resource`    STRING,
  `event`       STRING,
  `environment` STRING,
  `service`     ARRAY<STRING>,
  `severity`    STRING,
  `customer`    STRING,
  `text`        STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.alerts'
);
-- Raw alert table on topic iff.alerts.bulk, keyed by (resource, event) through upsert-kafka.
-- ts is a read-only (VIRTUAL) column filled from the Kafka record timestamp.
-- The watermark trails ts by the interval 0.001 seconds.
DROP TABLE IF EXISTS `alerts_bulk`;
CREATE TABLE `alerts_bulk` (
  `resource`    STRING,
  `event`       STRING,
  `environment` STRING,
  `service`     ARRAY<STRING>,
  `severity`    STRING,
  `customer`    STRING,
  `text`        STRING,
  `ts`          TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR `ts` AS `ts` - INTERVAL '0.001' SECONDS,
  PRIMARY KEY (`resource`, `event`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.alerts.bulk'
);
-- Attribute records on topic iff.ngsild.attributes, read with the plain kafka connector.
-- ts is a read-only (VIRTUAL) column filled from the Kafka record timestamp and is the watermark.
DROP TABLE IF EXISTS `attributes`;
CREATE TABLE `attributes` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR `ts` AS `ts`
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.ngsild.attributes'
);
-- Upsert-kafka table keyed by (id, index) on topic iff.ngsild.attributes,
-- the same topic the `attributes` table is declared on.
DROP TABLE IF EXISTS `attributes_writeback`;
CREATE TABLE `attributes_writeback` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  PRIMARY KEY (`id`, `index`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.ngsild.attributes'
);
-- Upsert-kafka table keyed by (id, index) on topic iff.ngsild.attributes_insert.
-- ts is a read-only (VIRTUAL) column filled from the Kafka record timestamp and is the watermark.
DROP TABLE IF EXISTS `attributes_insert`;
CREATE TABLE `attributes_insert` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR `ts` AS `ts`,
  PRIMARY KEY (`id`, `index`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.ngsild.attributes_insert'
);
-- Plain kafka table on topic iff.ngsild.attributes_insert, the topic that
-- `attributes_insert` addresses through upsert-kafka. No primary key is declared here.
-- ts is filled from the Kafka record timestamp and is the watermark.
DROP TABLE IF EXISTS `attributes_insert_filter`;
CREATE TABLE `attributes_insert_filter` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `ts` AS `ts`
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.ngsild.attributes_insert'
);
-- Plain kafka table on topic iff.ngsild-updates with JSON records.
-- Each record carries an op string, two boolean flags and an entities string.
DROP TABLE IF EXISTS `ngsild_updates`;
CREATE TABLE `ngsild_updates` (
  `op`                 STRING,
  `overwriteOrReplace` BOOLEAN,
  `noForward`          BOOLEAN,
  `entities`           STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.ngsild-updates'
);
-- Latest state per inserted attribute: for each (id, index) pair keep only the row of
-- `attributes_insert_filter` with the newest ts, and discard rows that carry no entityId.
DROP VIEW IF EXISTS `attributes_insert_view`;
CREATE VIEW `attributes_insert_view` AS
SELECT
    `id`,
    `entityId`,
    `name`,
    `nodeType`,
    `valueType`,
    `index`,
    `type`,
    -- Backticks are required here: with single quotes this entry is a string literal,
    -- so the view would return the constant URL text instead of the datasetId column.
    `https://uri.etsi.org/ngsi-ld/datasetId`,
    `https://uri.etsi.org/ngsi-ld/hasValue`,
    `https://uri.etsi.org/ngsi-ld/hasObject`,
    `ts`
FROM (
    SELECT
        *,
        -- rownum 1 is the most recent row of its (id, index) partition
        ROW_NUMBER() OVER (
            PARTITION BY `id`, `index`
            ORDER BY `ts` DESC
        ) AS rownum
    FROM `attributes_insert_filter`
)
WHERE
    rownum = 1
    AND `entityId` IS NOT NULL;

In [None]:
%%flink_gateway_sql_prepare
-- Alerts table on topic iff.alerts, keyed by (resource, event) through upsert-kafka.
-- NOTE(review): the statements that follow in this cell drop `alerts` again and recreate it
-- with the plain kafka connector, so this definition does not survive the cell.
-- Confirm which of the two definitions is intended.
DROP TABLE IF EXISTS `alerts`;
CREATE TABLE `alerts` (
  `resource`    STRING,
  `event`       STRING,
  `environment` STRING,
  `service`     ARRAY<STRING>,
  `severity`    STRING,
  `customer`    STRING,
  `text`        STRING,
  PRIMARY KEY (`resource`, `event`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.alerts'
);
-- Alerts table on topic iff.alerts using the plain kafka connector with JSON records.
-- No primary key is declared and reading starts at the latest offset.
DROP TABLE IF EXISTS `alerts`;
CREATE TABLE `alerts` (
  `resource`    STRING,
  `event`       STRING,
  `environment` STRING,
  `service`     ARRAY<STRING>,
  `severity`    STRING,
  `customer`    STRING,
  `text`        STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.alerts'
);
-- Raw alert table on topic iff.alerts.bulk, keyed by (resource, event) through upsert-kafka.
-- ts is a read-only (VIRTUAL) column filled from the Kafka record timestamp.
-- The watermark trails ts by the interval 0.001 seconds.
DROP TABLE IF EXISTS `alerts_bulk`;
CREATE TABLE `alerts_bulk` (
  `resource`    STRING,
  `event`       STRING,
  `environment` STRING,
  `service`     ARRAY<STRING>,
  `severity`    STRING,
  `customer`    STRING,
  `text`        STRING,
  `ts`          TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR `ts` AS `ts` - INTERVAL '0.001' SECONDS,
  PRIMARY KEY (`resource`, `event`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.alerts.bulk'
);
-- Attribute records on topic iff.ngsild.attributes, read with the plain kafka connector.
-- ts is a read-only (VIRTUAL) column filled from the Kafka record timestamp and is the watermark.
DROP TABLE IF EXISTS `attributes`;
CREATE TABLE `attributes` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR `ts` AS `ts`
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.ngsild.attributes'
);
-- Upsert-kafka table keyed by (id, index) on topic iff.ngsild.attributes,
-- the same topic the `attributes` table is declared on.
DROP TABLE IF EXISTS `attributes_writeback`;
CREATE TABLE `attributes_writeback` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  PRIMARY KEY (`id`, `index`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.ngsild.attributes'
);
-- Upsert-kafka table keyed by (id, index) on topic iff.ngsild.attributes_insert.
-- ts is a read-only (VIRTUAL) column filled from the Kafka record timestamp and is the watermark.
DROP TABLE IF EXISTS `attributes_insert`;
CREATE TABLE `attributes_insert` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR `ts` AS `ts`,
  PRIMARY KEY (`id`, `index`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'topic' = 'iff.ngsild.attributes_insert'
);
-- Plain kafka table on topic iff.ngsild.attributes_insert, the topic that
-- `attributes_insert` addresses through upsert-kafka. No primary key is declared here.
-- ts is filled from the Kafka record timestamp and is the watermark.
DROP TABLE IF EXISTS `attributes_insert_filter`;
CREATE TABLE `attributes_insert_filter` (
  `id`        STRING,
  `entityId`  STRING,
  `name`      STRING,
  `nodeType`  STRING,
  `valueType` STRING,
  `index`     INTEGER,
  `type`      STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue`  STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `ts` AS `ts`
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.ngsild.attributes_insert'
);
-- Plain kafka table on topic iff.ngsild-updates with JSON records.
-- Each record carries an op string, two boolean flags and an entities string.
DROP TABLE IF EXISTS `ngsild_updates`;
CREATE TABLE `ngsild_updates` (
  `op`                 STRING,
  `overwriteOrReplace` BOOLEAN,
  `noForward`          BOOLEAN,
  `entities`           STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'json.fail-on-missing-field' = 'False',
  'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'topic' = 'iff.ngsild-updates'
);
-- Latest state per inserted attribute: for each (id, index) pair keep only the row of
-- `attributes_insert_filter` with the newest ts, and discard rows that carry no entityId.
DROP VIEW IF EXISTS `attributes_insert_view`;
CREATE VIEW `attributes_insert_view` AS
SELECT
    `id`,
    `entityId`,
    `name`,
    `nodeType`,
    `valueType`,
    `index`,
    `type`,
    -- Backticks are required here: with single quotes this entry is a string literal,
    -- so the view would return the constant URL text instead of the datasetId column.
    `https://uri.etsi.org/ngsi-ld/datasetId`,
    `https://uri.etsi.org/ngsi-ld/hasValue`,
    `https://uri.etsi.org/ngsi-ld/hasObject`,
    `ts`
FROM (
    SELECT
        *,
        -- rownum 1 is the most recent row of its (id, index) partition
        ROW_NUMBER() OVER (
            PARTITION BY `id`, `index`
            ORDER BY `ts` DESC
        ) AS rownum
    FROM `attributes_insert_filter`
)
WHERE
    rownum = 1
    AND `entityId` IS NOT NULL;

In [None]:
%flink_sql_replace -s '{{{{.Values.flink.alertWindow | squote}}}}' -r "'0.001'"

In [None]:
%%flink_gateway_sql_query

--          insert into alerts select  resource, event, environment, service, severity, customer, `text` from
            /*(
                select resource, event,
                    environment,
                    service,
                    customer,
                    last_value(`text`) AS `text`,
                    last_value(severity) as severity
                from (*/
                -- Collapse alerts_bulk into one row per alert key and tumbling window on ts,
                -- carrying the last severity and the last text seen inside that window.
                -- The window length in seconds comes from the templated flink.alertWindow value.
                SELECT
                    `resource`,
                    `event`,
                    `environment`,
                    `service`,
                    LAST_VALUE(`severity`) AS `severity`,
                    `customer`,
                    LAST_VALUE(`text`) AS `text`
                FROM alerts_bulk
                GROUP BY
                    `resource`,
                    `event`,
                    `service`,
                    `customer`,
                    `environment`,
                    TUMBLE(`ts`, INTERVAL {{.Values.flink.alertWindow | squote}} SECOND)
               -- ) as A
                 --  GROUP BY A.resource, A.event, A.service, A.customer, A.environment
           -- );
;

In [None]:
%%flink_gateway_sql_query
    
-- Emit an alert only when its severity changes from one window to the next.
--   1. WindowedAlerts: one row per alert key and 0.001 second tumbling window over
--      alerts_bulk, carrying the last severity and text seen in the window.
--   2. AlertsWithPreviousSeverity: adds the severity of the preceding window of the same key.
--   3. Final select: keeps the first window of a key and every window whose severity
--      differs from the previous one.
WITH WindowedAlerts AS (
    SELECT
        `resource`,
        `event`,
        `environment`,
        `service`,
        `customer`,
        LAST_VALUE(`severity`) AS `severity`,
        LAST_VALUE(`text`) AS `text`,
        -- The window columns come from the TUMBLE table function below. The legacy
        -- TUMBLE_START / TUMBLE_END / TUMBLE_ROWTIME / TUMBLE(ts, ...) group-window functions
        -- must not be mixed with it, because ts is no longer a time attribute after the
        -- table function has been applied.
        `window_start`,
        `window_end`,
        `window_time`
    FROM TABLE(
        TUMBLE(TABLE alerts_bulk, DESCRIPTOR(ts), INTERVAL '0.001' SECOND)
    )
    GROUP BY
        `resource`,
        `event`,
        `service`,
        `customer`,
        `environment`,
        `window_start`,
        `window_end`,
        `window_time`
),
AlertsWithPreviousSeverity AS (
    SELECT
        `resource`,
        `event`,
        `environment`,
        `service`,
        `customer`,
        `severity`,
        `text`,
        `window_end`,
        `window_time`,
        -- severity of the previous window for the same alert key, NULL for the first window
        LAG(`severity`) OVER (
            PARTITION BY `resource`, `event`, `service`, `customer`, `environment`
            ORDER BY `window_time`
        ) AS prev_severity
    FROM WindowedAlerts
)
SELECT
    `resource`,
    `event`,
    `environment`,
    `service`,
    `severity`,
    `customer`,
    `text`
FROM
    AlertsWithPreviousSeverity
WHERE
    prev_severity IS NULL OR severity <> prev_severity;


