Skip to content

Commit

Permalink
NIFI-5922: Ensure that we import any default variable values on flow …
Browse files Browse the repository at this point in the history
…import
  • Loading branch information
markap14 committed May 21, 2019
1 parent 650c6aa commit 8245bc3
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,5 @@ public File getKerberosServiceKeytab() {

public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,18 +403,26 @@ public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args, fina
args.getAsJsonArray(FAILUREPORTS).forEach(port ->failurePorts.add(port.getAsString()));
}

final SSLContext sslContext = getSSLContext(args);
final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion);

final Map<VariableDescriptor, String> inputVariables = new HashMap<>();
final VersionedProcessGroup versionedGroup = snapshot.getFlowContents();
if (versionedGroup != null) {
for (final Map.Entry<String, String> entry : versionedGroup.getVariables().entrySet()) {
final String variableName = entry.getKey();
final String variableValue = entry.getValue();
inputVariables.put(new VariableDescriptor(variableName), variableValue);
}
}

if (args.has(VARIABLES)) {
final JsonElement variablesElement = args.get(VARIABLES);
final JsonObject variablesObject = variablesElement.getAsJsonObject();
variablesObject.entrySet()
.forEach(entry ->inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString()));
.forEach(entry -> inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString()));
}

final SSLContext sslContext = getSSLContext(args);

final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion);
final ExtensionManager extensionManager = ExtensionDiscovery.discover(narWorkingDir, systemClassLoader);

final StatelessFlow flow = new StatelessFlow(snapshot.getFlowContents(), extensionManager, () -> inputVariables, failurePorts, materializeContent, sslContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class StatelessValidationContext implements ValidationContext {
private final VariableRegistry variableRegistry;
private final StatelessProcessContext processContext;

public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager, final VariableRegistry variableRegistry) {
public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager,
final VariableRegistry variableRegistry) {
this.processContext = processContext;
this.lookup = lookup;
this.stateManager = stateManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.LinkedList;
import java.util.Queue;

public class Program {

Expand Down Expand Up @@ -168,7 +169,8 @@ private static void printUsage() {
System.out.println();
System.out.println("Notes:");
System.out.println(" 1) The configuration file must be in JSON format. ");
System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + ", " + StatelessFlow.FLOWID + ".");
System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID
+ ", " + StatelessFlow.FLOWID + ".");
System.out.println(" All other attributes will be passed to the flow using the variable registry interface");
System.out.println();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@
import org.apache.nifi.stateless.bootstrap.RunnableFlow;
import org.apache.nifi.stateless.core.StatelessFlow;

import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void testScenario1_Test() throws Exception {
///////////////////////////////////////////
// Build Flow
///////////////////////////////////////////
StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry, materializeData, ClassLoader.getSystemClassLoader());
StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry,
materializeData, ClassLoader.getSystemClassLoader());
getFile.setProperty(GetFile.DIRECTORY,"/tmp/nifistateless/input/");
getFile.setProperty(GetFile.FILE_FILTER,"test.txt");
getFile.setProperty(GetFile.KEEP_SOURCE_FILE,"true");
Expand Down

0 comments on commit 8245bc3

Please sign in to comment.