Skip to content
Permalink
Browse files
IGNITE-16522 Fix usage of the JDK Marshaller in raft group listeners (#…
  • Loading branch information
SammyVimes committed Feb 11, 2022
1 parent f35d063 commit 0cf58f5c2df45f43f3bca0a74f7ebf319f6238d8
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 151 deletions.
@@ -98,8 +98,10 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
while (iter.hasNext()) {
CommandClosure<ReadCommand> clo = iter.next();

if (clo.command() instanceof GetCommand) {
GetCommand getCmd = (GetCommand) clo.command();
ReadCommand command = clo.command();

if (command instanceof GetCommand) {
GetCommand getCmd = (GetCommand) command;

Entry e;

@@ -114,8 +116,8 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
);

clo.result(resp);
} else if (clo.command() instanceof GetAllCommand) {
GetAllCommand getAllCmd = (GetAllCommand) clo.command();
} else if (command instanceof GetAllCommand) {
GetAllCommand getAllCmd = (GetAllCommand) command;

Collection<Entry> entries;

@@ -132,14 +134,14 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
}

clo.result(new MultipleEntryResponse(res));
} else if (clo.command() instanceof CursorHasNextCommand) {
CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand) clo.command();
} else if (command instanceof CursorHasNextCommand) {
CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand) command;

CursorMeta cursorDesc = cursors.get(cursorHasNextCmd.cursorId());

clo.result(!(cursorDesc == null) && cursorDesc.cursor().hasNext());
} else {
assert false : "Command was not found [cmd=" + clo.command() + ']';
assert false : "Command was not found [cmd=" + command + ']';
}
}
}
@@ -150,26 +152,28 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
while (iter.hasNext()) {
CommandClosure<WriteCommand> clo = iter.next();

if (clo.command() instanceof PutCommand) {
PutCommand putCmd = (PutCommand) clo.command();
WriteCommand command = clo.command();

if (command instanceof PutCommand) {
PutCommand putCmd = (PutCommand) command;

storage.put(putCmd.key(), putCmd.value());

clo.result(null);
} else if (clo.command() instanceof GetAndPutCommand) {
GetAndPutCommand getAndPutCmd = (GetAndPutCommand) clo.command();
} else if (command instanceof GetAndPutCommand) {
GetAndPutCommand getAndPutCmd = (GetAndPutCommand) command;

Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());

clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
} else if (clo.command() instanceof PutAllCommand) {
PutAllCommand putAllCmd = (PutAllCommand) clo.command();
} else if (command instanceof PutAllCommand) {
PutAllCommand putAllCmd = (PutAllCommand) command;

storage.putAll(putAllCmd.keys(), putAllCmd.values());

clo.result(null);
} else if (clo.command() instanceof GetAndPutAllCommand) {
GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) clo.command();
} else if (command instanceof GetAndPutAllCommand) {
GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) command;

Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.vals());

@@ -180,26 +184,26 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
}

clo.result(new MultipleEntryResponse(resp));
} else if (clo.command() instanceof RemoveCommand) {
RemoveCommand rmvCmd = (RemoveCommand) clo.command();
} else if (command instanceof RemoveCommand) {
RemoveCommand rmvCmd = (RemoveCommand) command;

storage.remove(rmvCmd.key());

clo.result(null);
} else if (clo.command() instanceof GetAndRemoveCommand) {
GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) clo.command();
} else if (command instanceof GetAndRemoveCommand) {
GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) command;

Entry e = storage.getAndRemove(getAndRmvCmd.key());

clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
} else if (clo.command() instanceof RemoveAllCommand) {
RemoveAllCommand rmvAllCmd = (RemoveAllCommand) clo.command();
} else if (command instanceof RemoveAllCommand) {
RemoveAllCommand rmvAllCmd = (RemoveAllCommand) command;

storage.removeAll(rmvAllCmd.keys());

clo.result(null);
} else if (clo.command() instanceof GetAndRemoveAllCommand) {
GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) clo.command();
} else if (command instanceof GetAndRemoveAllCommand) {
GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) command;

Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());

@@ -210,8 +214,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
}

clo.result(new MultipleEntryResponse(resp));
} else if (clo.command() instanceof InvokeCommand) {
InvokeCommand cmd = (InvokeCommand) clo.command();
} else if (command instanceof InvokeCommand) {
InvokeCommand cmd = (InvokeCommand) command;

boolean res = storage.invoke(
toCondition(cmd.condition()),
@@ -220,8 +224,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
);

clo.result(res);
} else if (clo.command() instanceof RangeCommand) {
RangeCommand rangeCmd = (RangeCommand) clo.command();
} else if (command instanceof RangeCommand) {
RangeCommand rangeCmd = (RangeCommand) command;

IgniteUuid cursorId = rangeCmd.getCursorId();

@@ -239,8 +243,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
);

clo.result(cursorId);
} else if (clo.command() instanceof CursorNextCommand) {
CursorNextCommand cursorNextCmd = (CursorNextCommand) clo.command();
} else if (command instanceof CursorNextCommand) {
CursorNextCommand cursorNextCmd = (CursorNextCommand) command;

CursorMeta cursorDesc = cursors.get(cursorNextCmd.cursorId());

@@ -275,8 +279,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
} catch (NoSuchElementException e) {
clo.result(e);
}
} else if (clo.command() instanceof CursorCloseCommand) {
CursorCloseCommand cursorCloseCmd = (CursorCloseCommand) clo.command();
} else if (command instanceof CursorCloseCommand) {
CursorCloseCommand cursorCloseCmd = (CursorCloseCommand) command;

CursorMeta cursorDesc = cursors.remove(cursorCloseCmd.cursorId());

@@ -293,8 +297,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
}

clo.result(null);
} else if (clo.command() instanceof WatchRangeKeysCommand) {
WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) clo.command();
} else if (command instanceof WatchRangeKeysCommand) {
WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) command;

IgniteUuid cursorId = watchCmd.getCursorId();

@@ -311,8 +315,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
);

clo.result(cursorId);
} else if (clo.command() instanceof WatchExactKeysCommand) {
WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) clo.command();
} else if (command instanceof WatchExactKeysCommand) {
WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) command;

IgniteUuid cursorId = watchCmd.getCursorId();

@@ -328,8 +332,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
);

clo.result(cursorId);
} else if (clo.command() instanceof CursorsCloseCommand) {
CursorsCloseCommand cursorsCloseCmd = (CursorsCloseCommand) clo.command();
} else if (command instanceof CursorsCloseCommand) {
CursorsCloseCommand cursorsCloseCmd = (CursorsCloseCommand) command;

Iterator<CursorMeta> cursorsIter = cursors.values().iterator();

@@ -350,7 +354,7 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {

clo.result(null);
} else {
assert false : "Command was not found [cmd=" + clo.command() + ']';
assert false : "Command was not found [cmd=" + command + ']';
}
}
}
@@ -422,11 +422,12 @@ public boolean hasNext() {
public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();
WriteCommand command = JDKMarshaller.DEFAULT.unmarshall(data.array());

return new CommandClosure<>() {
@Override
public WriteCommand command() {
return JDKMarshaller.DEFAULT.unmarshall(data.array());
return command;
}

@Override

0 comments on commit 0cf58f5

Please sign in to comment.