Skip to content
Permalink
Browse files Browse the repository at this point in the history
Merge pull request from GHSA-m5qf-gfmp-7638
* Remove unsafe serialization from PayloadUtil

* This code will likely be removed wholesale, but this change
  should be used as a departure point for future development
  if we end up re-implementing moveTo and friends.

* Removed vestigial MoveTo related code.

* Remove unsafe serialization in WorkSpace infra.

* Favor DataInput/DataOutputStream over ObjectInput/ObjectOutputStream
* Implement lightweight serialization in WorkBundle/WorkUnit

* Updates to WorkBundle serDe, added tests.

- set limit on number of WorkUnits per bundle. In practice these are
  commonly less than 1024.
- added null handling for WorkBundle/WorkUnit string fields.
- confirmed readUTF/writeUTF has a limit ensuring strings will
  be 65535 characters or less.

* Minor cleanup to WorkBundleTest

* Minor Change to WorkBundleTest

* Formatting updates
  • Loading branch information
drewfarris committed May 21, 2021
1 parent fd0e1c2 commit 40260b1
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 423 deletions.
7 changes: 2 additions & 5 deletions src/main/java/emissary/core/IMobileAgent.java
Expand Up @@ -3,8 +3,6 @@
import java.io.Serializable;
import java.util.List;

import emissary.server.mvc.adapters.MoveToAdapter;

/**
* Interface to the MobileAgent
*/
Expand Down Expand Up @@ -59,9 +57,8 @@ void arrive(Object payload, emissary.place.IServiceProviderPlace arrivalPlace, i
boolean isInUse();

/**
* Get the payload as an object for serialization during transport Should only be called by the MoveToAdapter
*
* @see MoveToAdapter
* Get the payload as an object for serialization during transport.
*
*/
Object getPayloadForTransport();

Expand Down
124 changes: 113 additions & 11 deletions src/main/java/emissary/pickup/WorkBundle.java
@@ -1,6 +1,8 @@
package emissary.pickup;

import java.io.Serializable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -21,13 +23,12 @@
* <p>
* getOldestFileModificationTime() &lt;= getYoungestFileModificationTime()
*/
public class WorkBundle implements Serializable, Comparable<WorkBundle> {

// Serializable
static final long serialVersionUID = 6339812801001572532L;
public final class WorkBundle implements Comparable<WorkBundle> {

private static final Logger logger = LoggerFactory.getLogger(WorkBundle.class);

static final int MAX_UNITS = 1024;

// Unique ID for this work bundle
String bundleId;

Expand Down Expand Up @@ -110,6 +111,80 @@ public WorkBundle(WorkBundle that) {
resetBundleId();
}

/**
* Deserialize a WorkBundle from a DataInputStream
*
* @param in the stream to read from
* @return the deserialized WorkBundle
* @throws IOException if there is a problem reading the stream or it contains more than <code>MAX_UNITS</code> work
* units.
*/
public static WorkBundle readFromStream(DataInputStream in) throws IOException {
WorkBundle wb = new WorkBundle();
wb.bundleId = readUTFOrNull(in);
wb.outputRoot = readUTFOrNull(in);
wb.eatPrefix = readUTFOrNull(in);
wb.caseId = readUTFOrNull(in);
wb.sentTo = readUTFOrNull(in);
wb.errorCount = in.readInt();
wb.priority = in.readInt();
wb.simpleMode = in.readBoolean();
wb.oldestFileModificationTime = in.readLong();
wb.youngestFileModificationTime = in.readLong();
wb.totalFileSize = in.readLong();
int workUnitSize = in.readInt();
if (workUnitSize > MAX_UNITS) {
throw new IOException(
"Exception when reading: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitSize + ").");
}
for (int i = 0; i < workUnitSize; i++) {
wb.addWorkUnit(WorkUnit.readFromStream(in));
}
return wb;
}

/**
* Serialize this WorkBundle to a DataOutputStream
*
* @param out the stream to write to.
* @throws IOException if there is a problem writing to the stream.
*/
public void writeToStream(DataOutputStream out) throws IOException {
writeUTFOrNull(bundleId, out);
writeUTFOrNull(outputRoot, out);
writeUTFOrNull(eatPrefix, out);
writeUTFOrNull(caseId, out);
writeUTFOrNull(sentTo, out);
out.writeInt(errorCount);
out.writeInt(priority);
out.writeBoolean(simpleMode);
out.writeLong(oldestFileModificationTime);
out.writeLong(youngestFileModificationTime);
out.writeLong(totalFileSize);
out.writeInt(workUnitList.size());
if (workUnitList.size() > MAX_UNITS) {
throw new IOException(
"Exception when writing: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitList.size() + ").");
}
for (WorkUnit u : workUnitList) {
u.writeToStream(out);
}
}

static String readUTFOrNull(DataInputStream in) throws IOException {
if (in.readBoolean()) {
return in.readUTF();
}
return null;
}

static void writeUTFOrNull(String s, DataOutputStream out) throws IOException {
out.writeBoolean(s != null);
if (s != null) {
out.writeUTF(s);
}
}

/**
* Set the work bundle id
*
Expand Down Expand Up @@ -204,8 +279,13 @@ public Iterator<WorkUnit> getWorkUnitIterator() {
*
* @param workUnit the workUnit to add
* @return number of WorkUnits in list after add
* @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
*/
public int addWorkUnit(WorkUnit workUnit) {
if (workUnitList.size() >= MAX_UNITS) {
throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
}
workUnitList.add(workUnit);
return size();
}
Expand All @@ -215,10 +295,14 @@ public int addWorkUnit(WorkUnit workUnit) {
*
* @param workUnit the workUnit to add
* @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
* @param fileSize the size of the file added.
* @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
* @return number of files in this set after update
*/
public int addWorkUnit(WorkUnit workUnit, long fileModificationTimeInMillis, long fileSize) {
workUnitList.add(workUnit);
addWorkUnit(workUnit);

if (fileModificationTimeInMillis < oldestFileModificationTime) {
oldestFileModificationTime = fileModificationTimeInMillis;
}
Expand All @@ -231,8 +315,16 @@ public int addWorkUnit(WorkUnit workUnit, long fileModificationTimeInMillis, lon

/**
* Add from a list, without adjusting file modification time tracking.
*
* @param list a list of WorkUnits to add to this bundle
* @return the total size of WorkUnits in this bundle
* @throws IllegalStateException if adding the units would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
*/
protected int addWorkUnits(List<WorkUnit> list) { // This appears to only be used by unit tests and the copy constructor
if (workUnitList.size() + list.size() > MAX_UNITS) {
throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
}
workUnitList.addAll(list);
return workUnitList.size();
}
Expand Down Expand Up @@ -265,18 +357,22 @@ public Iterator<String> getFileNameIterator() {
*
* @param file string file name consistent with outputRoot
* @return number of files in this set after update
* @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
*/
public int addFileName(String file) {
workUnitList.add(new WorkUnit(file));
return size();
return addWorkUnit(new WorkUnit(file));
}

/**
* Add a file to the list
*
* @param file string file name consistent with outputRoot
* @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
* @param fileSize the size of the file being added
* @return number of files in this set after update
* @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
*/
public int addFileName(String file, long fileModificationTimeInMillis, long fileSize) {
return addWorkUnit(new WorkUnit(file), fileModificationTimeInMillis, fileSize);
Expand All @@ -287,21 +383,27 @@ public int addFileName(String file, long fileModificationTimeInMillis, long file
*
* @param file string file names consistent with outputRoot
* @return number of files in this set after update
* @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
*/
protected int addFileNames(String[] file) { // This appears to only be used by unit tests
for (int i = 0; file != null && i < file.length; i++) {
workUnitList.add(new WorkUnit(file[i]));
for (String f : file) {
addWorkUnit(new WorkUnit(f));
}
return size();
}

/**
* Add from a list, without adjusting file modification time tracking.
*
* @param list the list of files to add
* @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
* work units
*/
protected int addFileNames(List<String> list) { // This appears to only be used by unit tests and the copy
// constructor
for (String file : list) {
workUnitList.add(new WorkUnit(file));
addWorkUnit(new WorkUnit(file));
}
return size();
}
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/emissary/pickup/WorkUnit.java
@@ -1,10 +1,14 @@
package emissary.pickup;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
* A WorkUnit is a unit of work a worker will process. The idea is to replace fileNameList. Currently, WorkBundle is set
* to only have one file, and so there will only be one WorkUnit.
*/
public class WorkUnit {
public final class WorkUnit {
private String fileName;
private String transactionId;
// worker updates this boolean
Expand Down Expand Up @@ -36,6 +40,22 @@ public class WorkUnit {
this.failedToProcess = failedToProcess;
}

public static WorkUnit readFromStream(DataInputStream in) throws IOException {
final WorkUnit u = new WorkUnit(null);
u.fileName = WorkBundle.readUTFOrNull(in);
u.transactionId = WorkBundle.readUTFOrNull(in);
u.failedToParse = in.readBoolean();
u.failedToProcess = in.readBoolean();
return u;
}

public void writeToStream(DataOutputStream out) throws IOException {
WorkBundle.writeUTFOrNull(fileName, out);
WorkBundle.writeUTFOrNull(transactionId, out);
out.writeBoolean(failedToParse);
out.writeBoolean(failedToProcess);
}

/**
* Gets the filename for the WorkUnit
*
Expand Down

0 comments on commit 40260b1

Please sign in to comment.