From 70e1b709123db38e693968504c9e83466babe5e5 Mon Sep 17 00:00:00 2001 From: Andrea Tarocchi Date: Fri, 16 Dec 2022 10:00:23 +0100 Subject: [PATCH] fixed Azure Storage Blob changefeed source connector the connector produce one record per event the connector produced data are in json format the connector add additional info as record headers --- ...torage-blob-changefeed-source.kamelet.yaml | 68 +++++++++++++++++-- ...torage-blob-changefeed-source.kamelet.yaml | 68 +++++++++++++++++-- 2 files changed, 122 insertions(+), 14 deletions(-) diff --git a/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml b/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml index a4afb7c4f..aa511d849 100644 --- a/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml +++ b/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml @@ -66,18 +66,72 @@ spec: - "camel:azure-storage-blob" - "camel:kamelet" - "camel:core" + - 'camel:jackson' - "camel:jsonpath" - "camel:timer" + - "mvn:com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.4" + - "mvn:com.azure:azure-storage-blob-changefeed:12.0.0-beta.17" template: from: uri: "timer:azure-storage-blob-stream" parameters: period: "{{period}}" steps: - - to: - uri: "azure-storage-blob:{{accountName}}" - parameters: - operation: "getChangeFeed" - accessKey: "{{accessKey}}" - credentialType: "{{credentialType}}" - - to: "kamelet:sink" + - to: + uri: "azure-storage-blob:{{accountName}}" + parameters: + operation: "getChangeFeed" + accessKey: "{{accessKey}}" + credentialType: "{{credentialType}}" + - split: + expression: + simple: "${body}" + - marshal: + json: + library: Jackson + module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule + unmarshalType: com.fasterxml.jackson.databind.JsonNode + - set-header: + name: "azure-storage-blob-changefeed-topic" + jsonpath: + suppress-exceptions: true + expression: $.topic + - set-header: + name: "azure-storage-blob-changefeed-subject" + jsonpath: + suppress-exceptions: true + expression: $.subject + - set-header: + name: "azure-storage-blob-changefeed-eventType" + jsonpath: + suppress-exceptions: true + expression: $.eventType + - set-header: + name: "azure-storage-blob-changefeed-eventTime" + jsonpath: + suppress-exceptions: true + expression: $.eventTime + - set-header: + name: "azure-storage-blob-changefeed-id" + jsonpath: + suppress-exceptions: true + expression: $.id + - set-header: + name: "azure-storage-blob-changefeed-dataVersion" + jsonpath: + suppress-exceptions: true + expression: $.dataVersion + - set-header: + name: "azure-storage-blob-changefeed-metadataVersion" + jsonpath: + suppress-exceptions: true + expression: $.metadataVersion + - set-body: + jsonpath: + expression: $.data + - marshal: + json: + library: Jackson + module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule + - to: "kamelet:sink" + diff --git a/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml index a4afb7c4f..aa511d849 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml @@ -66,18 +66,72 @@ spec: - "camel:azure-storage-blob" - "camel:kamelet" - "camel:core" + - 'camel:jackson' - "camel:jsonpath" - "camel:timer" + - "mvn:com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.4" + - "mvn:com.azure:azure-storage-blob-changefeed:12.0.0-beta.17" template: from: uri: "timer:azure-storage-blob-stream" parameters: period: "{{period}}" steps: - - to: - uri: "azure-storage-blob:{{accountName}}" - parameters: - operation: "getChangeFeed" - accessKey: "{{accessKey}}" - credentialType: "{{credentialType}}" - - to: "kamelet:sink" + - to: + uri: "azure-storage-blob:{{accountName}}" + parameters: + operation: "getChangeFeed" + accessKey: "{{accessKey}}" + credentialType: "{{credentialType}}" + - split: + expression: + simple: "${body}" + - marshal: + json: + library: Jackson + module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule + unmarshalType: com.fasterxml.jackson.databind.JsonNode + - set-header: + name: "azure-storage-blob-changefeed-topic" + jsonpath: + suppress-exceptions: true + expression: $.topic + - set-header: + name: "azure-storage-blob-changefeed-subject" + jsonpath: + suppress-exceptions: true + expression: $.subject + - set-header: + name: "azure-storage-blob-changefeed-eventType" + jsonpath: + suppress-exceptions: true + expression: $.eventType + - set-header: + name: "azure-storage-blob-changefeed-eventTime" + jsonpath: + suppress-exceptions: true + expression: $.eventTime + - set-header: + name: "azure-storage-blob-changefeed-id" + jsonpath: + suppress-exceptions: true + expression: $.id + - set-header: + name: "azure-storage-blob-changefeed-dataVersion" + jsonpath: + suppress-exceptions: true + expression: $.dataVersion + - set-header: + name: "azure-storage-blob-changefeed-metadataVersion" + jsonpath: + suppress-exceptions: true + expression: $.metadataVersion + - set-body: + jsonpath: + expression: $.data + - marshal: + json: + library: Jackson + module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule + - to: "kamelet:sink" +