Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a15ac64
NIFI-6197 HBase Client Service uses a bad default for retries
lfrancke Apr 8, 2019
cded30b
NIFI-5953 Manage GetTwitter connection retries on '420 Enhance Your C…
k0urge Jan 28, 2019
49e74b2
NIFI-6198 Infinite recursion in HBase*ClientService
lfrancke Apr 8, 2019
38db4e9
NIFI-5464: Consider ports invalid unless they have both incoming and …
markap14 Apr 8, 2019
bf7e70e
NIFI-6194: Fixed verification logic to determine whether or not a var…
markap14 Apr 9, 2019
0f10010
NIFI-6199: Fixed problematic unit tests that did not properly wait fo…
markap14 Apr 9, 2019
fadf8ce
NIFI-6185: ListDatabaseTables processor doesn't close ResultSets
lfrancke Apr 3, 2019
07d650e
NIFI-5338: Added SimpleScriptedLookupService controller service
mattyb149 Apr 1, 2019
835b8c6
NIFI-6126 - throttle requests to refresh the canvas to prevent too ma…
rfellows Mar 27, 2019
1a222f8
NIFI-6186 Resolve handling of module paths in JythonScriptEngineConfi…
ambah Apr 3, 2019
980e4a8
NIFI-6016 PutCassandraRecord batch size
dnsbtchr Feb 26, 2019
1c754ab
NIFI-6140 - Incorrect title on the read only version of the Reporting…
rfellows Mar 22, 2019
72ba7be
NIFI-6191 HBase 2 ClientService documentation refers to HBase 1.x
lfrancke Apr 4, 2019
6a32afc
NIFI-6172 Fixed a bug that caused ElasticSearchStringLookupService to…
MikeThomsen Apr 2, 2019
f9e73ac
NIFI-6188 - Added 1s delay on Search control autocomplete query event…
aeaversa Apr 4, 2019
9a63fa6
NIFI-6181 FetchSFTP and FetchFTP File Not Found fix
bdesert Apr 4, 2019
f4b74c1
NIFI-6177 Refactor HBaseListLookupService tests to remove use of Test…
bbende Apr 2, 2019
34c9308
NIFI-5979 : enhanced ReplaceText processor with "Number of Occurrence…
pushpavanthar Mar 17, 2019
aa70a52
NIFI-5979 Add Line-by-Line Evaluation Mode to ReplaceText
ijokarumawak Apr 8, 2019
4076103
Update WriteResourceToStream.java
aleksi75 Mar 21, 2019
e860347
NIFI-6184: This closes #3413. Removed unnecessary test-jar dep from n…
mattyb149 Apr 6, 2019
7a9b362
NIFI-6136 - fix: UI - User and Template bug when page popped out
rfellows Mar 26, 2019
1cc6079
NIFI-5953 Manage GetTwitter connection retries on '420 Enhance Your C…
k0urge Jan 28, 2019
857cb05
NIFI-6198 Infinite recursion in HBase*ClientService
lfrancke Apr 8, 2019
c797a97
NIFI-5464: Consider ports invalid unless they have both incoming and …
markap14 Apr 8, 2019
3c17005
NIFI-6194: Fixed verification logic to determine whether or not a var…
markap14 Apr 9, 2019
38b908f
NIFI-6199: Fixed problematic unit tests that did not properly wait fo…
markap14 Apr 9, 2019
58c2b35
NIFI-6197 Fix deprecations, default for client retries and variable v…
lfrancke Apr 9, 2019
8b2e327
NIFI-6197 Fix tests
lfrancke Apr 9, 2019
7879130
NIFI-6197 HBase Client Service uses a bad default for retries
lfrancke Apr 8, 2019
aa5d9b5
NIFI-6197 Fix deprecations, default for client retries and variable v…
lfrancke Apr 9, 2019
3204b40
NIFI-6197 Fix tests
lfrancke Apr 9, 2019
b40165f
Merge branch 'NIFI-6197' of https://github.com/lfrancke/nifi into NIF…
lfrancke Apr 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,28 @@ public LocalPort(final String id, final String name, final ProcessGroup processG

@Override
public boolean isValid() {
return !getConnections(Relationship.ANONYMOUS).isEmpty();
return !getConnections(Relationship.ANONYMOUS).isEmpty() && hasIncomingConnection();
}

@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
if (!isValid()) {
final ValidationResult error = new ValidationResult.Builder()
.explanation(String.format("Output connection for port '%s' is not defined.", getName()))
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build();
validationErrors.add(error);
if (getConnections(Relationship.ANONYMOUS).isEmpty()) {
validationErrors.add(new ValidationResult.Builder()
.explanation("Port has no outgoing connections")
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build());
}

if (!hasIncomingConnection()) {
validationErrors.add(new ValidationResult.Builder()
.explanation("Port has no incoming connections")
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build());
}

return validationErrors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void disable() {
writeLock.lock();
try {
state = ControllerServiceState.DISABLED;
disabledFutures.stream().forEach(future -> future.complete(null));
disabledFutures.forEach(future -> future.complete(null));
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2849,33 +2849,53 @@ public void verifyCanUpdateVariables(final Map<String, String> updatedVariables)
return;
}

for (final ProcessorNode processor : findAllProcessors()) {
// Determine any Processors that references the variable
for (final ProcessorNode processor : getProcessors()) {
if (!processor.isRunning()) {
continue;
}

for (final String variableName : updatedVariableNames) {
for (final VariableImpact impact : getVariableImpact(processor)) {
for (final VariableImpact impact : getVariableImpact(processor)) {
for (final String variableName : updatedVariableNames) {
if (impact.isImpacted(variableName)) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + processor + ", which is currently running");
}
}
}
}

for (final ControllerServiceNode service : findAllControllerServices()) {
// Determine any Controller Service that references the variable.
for (final ControllerServiceNode service : getControllerServices(false)) {
if (!service.isActive()) {
continue;
}

for (final String variableName : updatedVariableNames) {
for (final VariableImpact impact : getVariableImpact(service)) {
for (final VariableImpact impact : getVariableImpact(service)) {
for (final String variableName : updatedVariableNames) {
if (impact.isImpacted(variableName)) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + service + ", which is currently running");
}
}
}
}

// For any child Process Group that does not override the variable, also include its references.
// If a child group has a value for the same variable, though, then that means that the child group
// is overriding the variable and its components are actually referencing a different variable.
for (final ProcessGroup childGroup : getProcessGroups()) {
for (final String variableName : updatedVariableNames) {
final ComponentVariableRegistry childRegistry = childGroup.getVariableRegistry();
final VariableDescriptor descriptor = childRegistry.getVariableKey(variableName);
final boolean overridden = childRegistry.getVariableMap().containsKey(descriptor);
if (!overridden) {
final Set<ComponentNode> affectedComponents = childGroup.getComponentsAffectedByVariable(variableName);
if (!affectedComponents.isEmpty()) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponents.size() + " components that are " +
"currently running.");
}
}
}
}
} finally {
readLock.unlock();
}
Expand Down Expand Up @@ -2987,8 +3007,8 @@ public void setVariables(final Map<String, String> variables) {
}

final Map<VariableDescriptor, String> variableMap = new HashMap<>();
variables.entrySet() // cannot use Collectors.toMap because value may be null
.forEach(entry -> variableMap.put(new VariableDescriptor(entry.getKey()), entry.getValue()));
// cannot use Collectors.toMap because value may be null
variables.forEach((key, value) -> variableMap.put(new VariableDescriptor(key), value));

variableRegistry.setVariables(variableMap);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -246,7 +247,7 @@ public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedExce
}

@Test(timeout = 60000)
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException, ExecutionException {
final String uuid = UUID.randomUUID().toString();
final Processor proc = new ServiceReferencingProcessor();
proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, KerberosConfig.NOT_CONFIGURED));
Expand Down Expand Up @@ -280,13 +281,8 @@ public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() thro
scheduler.stopProcessor(procNode);
assertTrue(service.isActive());
assertSame(service.getState(), ControllerServiceState.ENABLING);
scheduler.disableControllerService(service);
assertSame(service.getState(), ControllerServiceState.DISABLING);
scheduler.disableControllerService(service).get();
assertFalse(service.isActive());

while (service.getState() != ControllerServiceState.DISABLED) {
Thread.sleep(5L);
}
assertSame(service.getState(), ControllerServiceState.DISABLED);
}

Expand Down Expand Up @@ -356,7 +352,7 @@ public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
@Override
public void run() {
try {
scheduler.enableControllerService(serviceNode);
scheduler.enableControllerService(serviceNode).get();
assertTrue(serviceNode.isActive());
} catch (final Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ public class GetTwitter extends AbstractProcessor {
.allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
.defaultValue(ENDPOINT_SAMPLE.getValue())
.build();
public static final PropertyDescriptor MAX_CLIENT_ERROR_RETRIES = new PropertyDescriptor.Builder()
.name("max-client-error-retries")
.displayName("Max Client Error Retries")
.description("The maximum number of retries to attempt when client experience retryable connection errors."
+ " Client continues attempting to reconnect using an exponential back-off pattern until it successfully reconnects"
+ " or until it reaches the retry limit."
+" It is recommended to raise this value when client is getting rate limited by Twitter API. Default value is 5.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.build();
public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder()
.name("Consumer Key")
.description("The Consumer Key provided by Twitter")
Expand Down Expand Up @@ -161,6 +172,7 @@ public class GetTwitter extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ENDPOINT);
descriptors.add(MAX_CLIENT_ERROR_RETRIES);
descriptors.add(CONSUMER_KEY);
descriptors.add(CONSUMER_SECRET);
descriptors.add(ACCESS_TOKEN);
Expand Down Expand Up @@ -222,6 +234,7 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
@OnScheduled
public void onScheduled(final ProcessContext context) {
final String endpointName = context.getProperty(ENDPOINT).getValue();
final int maxRetries = context.getProperty(MAX_CLIENT_ERROR_RETRIES).asInteger().intValue();
final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
context.getProperty(CONSUMER_SECRET).getValue(),
context.getProperty(ACCESS_TOKEN).getValue(),
Expand Down Expand Up @@ -319,6 +332,7 @@ public void onScheduled(final ProcessContext context) {
}

clientBuilder.hosts(host).endpoint(streamingEndpoint);
clientBuilder.retries(maxRetries);
client = clientBuilder.build();
client.connect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class TestGetTwitter {
public void testLocationValidatorWithValidLocations() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
Expand All @@ -38,6 +39,7 @@ public void testLocationValidatorWithValidLocations() {
public void testLocationValidatorWithEqualLatitudes() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
Expand All @@ -50,6 +52,7 @@ public void testLocationValidatorWithEqualLatitudes() {
public void testLocationValidatorWithEqualLongitudes() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
Expand All @@ -62,6 +65,7 @@ public void testLocationValidatorWithEqualLongitudes() {
public void testLocationValidatorWithSWLatGreaterThanNELat() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
Expand All @@ -74,6 +78,7 @@ public void testLocationValidatorWithSWLatGreaterThanNELat() {
public void testLocationValidatorWithSWLonGreaterThanNELon() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,11 @@ public class ResultCell {
int qualifierLength;

long timestamp;
byte typeByte;
long sequenceId;

byte[] valueArray;
int valueOffset;
int valueLength;

byte[] tagsArray;
int tagsOffset;
int tagsLength;

public byte[] getRowArray() {
return rowArray;
}
Expand Down Expand Up @@ -122,22 +116,6 @@ public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public byte getTypeByte() {
return typeByte;
}

public void setTypeByte(byte typeByte) {
this.typeByte = typeByte;
}

public long getSequenceId() {
return sequenceId;
}

public void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
}

public byte[] getValueArray() {
return valueArray;
}
Expand All @@ -162,27 +140,4 @@ public void setValueLength(int valueLength) {
this.valueLength = valueLength;
}

public byte[] getTagsArray() {
return tagsArray;
}

public void setTagsArray(byte[] tagsArray) {
this.tagsArray = tagsArray;
}

public int getTagsOffset() {
return tagsOffset;
}

public void setTagsOffset(int tagsOffset) {
this.tagsOffset = tagsOffset;
}

public int getTagsLength() {
return tagsLength;
}

public void setTagsLength(int tagsLength) {
this.tagsLength = tagsLength;
}
}
Loading