Skip to content

Commit

Permalink
external payload store
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Dec 18, 2023
1 parent ffd014c commit 1f59457
Show file tree
Hide file tree
Showing 20 changed files with 1,915 additions and 0 deletions.
44 changes: 44 additions & 0 deletions azureblob-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Azure Blob External Storage Module

This module use azure blob to store and retrieve workflows/tasks input/output payload that
went over the thresholds defined in properties named `conductor.[workflow|task].[input|output].payload.threshold.kb`.

**Warning** Azure Java SDK use libs already present inside `conductor` like `jackson` and `netty`.
You may encounter deprecated issues, or conflicts and need to adapt the code if the module is not maintained along with `conductor`.
It has only been tested with **v12.2.0**.

## Configuration

### Usage

Cf. Documentation [External Payload Storage](https://netflix.github.io/conductor/externalpayloadstorage/#azure-blob-storage)

### Example

```properties
conductor.additional.modules=com.netflix.conductor.azureblob.AzureBlobModule
es.set.netty.runtime.available.processors=false

workflow.external.payload.storage=AZURE_BLOB
workflow.external.payload.storage.azure_blob.connection_string=DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;EndpointSuffix=localhost
workflow.external.payload.storage.azure_blob.signedurlexpirationseconds=360
```

## Testing

You can use [Azurite](https://github.com/Azure/Azurite) to simulate an Azure Storage.

### Troubleshoots

* When using **es5 persistance** you will receive an `java.lang.IllegalStateException` because the Netty lib will call `setAvailableProcessors` two times. To resolve this issue you need to set the following system property

```
es.set.netty.runtime.available.processors=false
```

If you want to change the default HTTP client of azure sdk, you can use `okhttp` instead of `netty`.
For that you need to add the following [dependency](https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/storage/azure-storage-blob#default-http-client).

```
com.azure:azure-core-http-okhttp:${compatible version}
```
8 changes: 8 additions & 0 deletions azureblob-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dependencies {
compileOnly 'org.springframework.boot:spring-boot-starter'
implementation project(':conductor-common')
implementation project(':conductor-core')

implementation "com.azure:azure-storage-blob:${revAzureStorageBlobSdk}"
implementation "org.apache.commons:commons-lang3"
}
150 changes: 150 additions & 0 deletions azureblob-storage/dependencies.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
{
"annotationProcessor": {
"org.springframework.boot:spring-boot-configuration-processor": {
"locked": "2.7.16"
}
},
"compileClasspath": {
"com.azure:azure-storage-blob": {
"locked": "12.7.0"
},
"com.netflix.conductor:conductor-common": {
"locked": "3.15.0"
},
"com.netflix.conductor:conductor-core": {
"locked": "3.15.0"
},
"org.apache.commons:commons-lang3": {
"locked": "3.12.0"
},
"org.apache.logging.log4j:log4j-api": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-core": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-jul": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-slf4j-impl": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-web": {
"locked": "2.17.2"
},
"org.springframework.boot:spring-boot-starter": {
"locked": "2.7.16"
}
},
"runtimeClasspath": {
"com.azure:azure-storage-blob": {
"locked": "12.7.0"
},
"com.netflix.conductor:conductor-common": {
"locked": "3.15.0"
},
"com.netflix.conductor:conductor-core": {
"locked": "3.15.0"
},
"org.apache.commons:commons-lang3": {
"locked": "3.12.0"
},
"org.apache.logging.log4j:log4j-api": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-core": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-jul": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-slf4j-impl": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-web": {
"locked": "2.17.2"
}
},
"testCompileClasspath": {
"com.azure:azure-storage-blob": {
"locked": "12.7.0"
},
"com.netflix.conductor:conductor-common": {
"locked": "3.15.0"
},
"com.netflix.conductor:conductor-core": {
"locked": "3.15.0"
},
"junit:junit": {
"locked": "4.13.2"
},
"org.apache.commons:commons-lang3": {
"locked": "3.12.0"
},
"org.apache.logging.log4j:log4j-api": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-core": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-jul": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-slf4j-impl": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-web": {
"locked": "2.17.2"
},
"org.junit.vintage:junit-vintage-engine": {
"locked": "5.8.2"
},
"org.springframework.boot:spring-boot-starter-log4j2": {
"locked": "2.7.16"
},
"org.springframework.boot:spring-boot-starter-test": {
"locked": "2.7.16"
}
},
"testRuntimeClasspath": {
"com.azure:azure-storage-blob": {
"locked": "12.7.0"
},
"com.netflix.conductor:conductor-common": {
"locked": "3.15.0"
},
"com.netflix.conductor:conductor-core": {
"locked": "3.15.0"
},
"junit:junit": {
"locked": "4.13.2"
},
"org.apache.commons:commons-lang3": {
"locked": "3.12.0"
},
"org.apache.logging.log4j:log4j-api": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-core": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-jul": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-slf4j-impl": {
"locked": "2.17.2"
},
"org.apache.logging.log4j:log4j-web": {
"locked": "2.17.2"
},
"org.junit.vintage:junit-vintage-engine": {
"locked": "5.8.2"
},
"org.springframework.boot:spring-boot-starter-log4j2": {
"locked": "2.7.16"
},
"org.springframework.boot:spring-boot-starter-test": {
"locked": "2.7.16"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.azureblob.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.azureblob.storage.AzureBlobPayloadStorage;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.IDGenerator;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AzureBlobProperties.class)
@ConditionalOnProperty(name = "conductor.external-payload-storage.type", havingValue = "azureblob")
public class AzureBlobConfiguration {

@Bean
public ExternalPayloadStorage azureBlobExternalPayloadStorage(
IDGenerator idGenerator, AzureBlobProperties properties) {
return new AzureBlobPayloadStorage(idGenerator, properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.azureblob.config;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.convert.DurationUnit;

@ConfigurationProperties("conductor.external-payload-storage.azureblob")
public class AzureBlobProperties {

/** The connection string to be used to connect to Azure Blob storage */
private String connectionString = null;

/** The name of the container where the payloads will be stored */
private String containerName = "conductor-payloads";

/** The endpoint to be used to connect to Azure Blob storage */
private String endpoint = null;

/** The sas token to be used for authenticating requests */
private String sasToken = null;

/** The time for which the shared access signature is valid */
@DurationUnit(ChronoUnit.SECONDS)
private Duration signedUrlExpirationDuration = Duration.ofSeconds(5);

/** The path at which the workflow inputs will be stored */
private String workflowInputPath = "workflow/input/";

/** The path at which the workflow outputs will be stored */
private String workflowOutputPath = "workflow/output/";

/** The path at which the task inputs will be stored */
private String taskInputPath = "task/input/";

/** The path at which the task outputs will be stored */
private String taskOutputPath = "task/output/";

public String getConnectionString() {
return connectionString;
}

public void setConnectionString(String connectionString) {
this.connectionString = connectionString;
}

public String getContainerName() {
return containerName;
}

public void setContainerName(String containerName) {
this.containerName = containerName;
}

public String getEndpoint() {
return endpoint;
}

public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public String getSasToken() {
return sasToken;
}

public void setSasToken(String sasToken) {
this.sasToken = sasToken;
}

public Duration getSignedUrlExpirationDuration() {
return signedUrlExpirationDuration;
}

public void setSignedUrlExpirationDuration(Duration signedUrlExpirationDuration) {
this.signedUrlExpirationDuration = signedUrlExpirationDuration;
}

public String getWorkflowInputPath() {
return workflowInputPath;
}

public void setWorkflowInputPath(String workflowInputPath) {
this.workflowInputPath = workflowInputPath;
}

public String getWorkflowOutputPath() {
return workflowOutputPath;
}

public void setWorkflowOutputPath(String workflowOutputPath) {
this.workflowOutputPath = workflowOutputPath;
}

public String getTaskInputPath() {
return taskInputPath;
}

public void setTaskInputPath(String taskInputPath) {
this.taskInputPath = taskInputPath;
}

public String getTaskOutputPath() {
return taskOutputPath;
}

public void setTaskOutputPath(String taskOutputPath) {
this.taskOutputPath = taskOutputPath;
}
}
Loading

0 comments on commit 1f59457

Please sign in to comment.