Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: elElmo/Controller
base: 492bee8451
...
head fork: elElmo/Controller
compare: 455745ab31
  • 8 commits
  • 11 files changed
  • 0 commit comments
  • 4 contributors
View
15 src/rainbow/controller/application/Action.java
@@ -0,0 +1,15 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package rainbow.controller.application;
+
+import rainbowpc.Message;
+
+/**
+ *
+ * @author WesleyLuk
+ */
+public interface Action {
+ public void execute(Message message);
+}
View
11 src/rainbow/controller/application/BruteForceEventListener.java
@@ -0,0 +1,11 @@
+
+package rainbow.controller.application;
+
+import rainbowpc.controller.messages.WorkBlockSetup;
+
+public interface BruteForceEventListener {
+
+ public void matchFound(String match);
+
+ public void workBlockComplete(WorkBlockSetup b);
+}
View
102 src/rainbow/controller/application/BruteForcer.java
@@ -0,0 +1,102 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package rainbow.controller.application;
+
+import java.math.BigInteger;
+import rainbow.scheduler.partition.AlphabetGenerator;
+import rainbow.scheduler.partition.PlaintextSpace;
+import rainbowpc.controller.messages.NewQuery;
+import rainbowpc.controller.messages.WorkBlockSetup;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+/**
+ *
+ * @author WesleyLuk
+ */
+public class BruteForcer extends Thread {
+
+ public static final int MD5LENGTH = 128 / 8;
+ NewQuery query;
+ WorkBlockSetup work;
+ PlaintextSpace space;
+ MessageDigest md;
+ byte[] targetQuery;
+ BruteForceEventListener listener;
+
+ public BruteForcer(NewQuery query, WorkBlockSetup work, BruteForceEventListener listener) {
+ this.query = query;
+ this.work = work;
+ this.listener = listener;
+ try {
+ md = MessageDigest.getInstance("MD5");
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ convertQuery();
+ }
+
+ private void convertQuery() {
+ //System.out.println("Converting " + query.getQuery());
+ BigInteger bigint = new BigInteger(query.getQuery(), 16);
+ byte[] value = bigint.toByteArray();
+ if (value.length != MD5LENGTH) {
+ byte[] temp = value;
+ value = new byte[MD5LENGTH];
+ for (int i = 1; i <= MD5LENGTH; i++) {
+ if (i >= temp.length) {
+ value[MD5LENGTH - i] = 0;
+ } else {
+ value[MD5LENGTH - i] = temp[temp.length - i];
+ }
+ }
+ }
+ //System.out.println("Done conversion value is");
+ /*
+ for (int i = 0; i < MD5LENGTH; i++) {
+ System.out.print(Integer.toString(value[i] & 0xff, 16));
+ }
+ System.out.println();
+ */
+ targetQuery = value;
+ }
+
+ public void run() {
+ String alphabet = AlphabetGenerator.generateAlphabet(AlphabetGenerator.Types.LOWER_CASE);
+ for (long blockNumber = work.getStartBlockNumber(); blockNumber < work.getEndBlockNumber(); blockNumber++) {
+ space = new PlaintextSpace(alphabet, blockNumber, work.getStringLength());
+ for (int blockIndex = 0; blockIndex < space.BLOCK_SIZE; blockIndex++) {
+ String text = space.getText(blockIndex);
+ // System.out.println(text);
+ if (text == null) {
+ break;
+ }
+ md.reset();
+ md.update(text.getBytes());
+ if (check(md.digest())) {
+ System.out.println("Found match " + text);
+ listener.matchFound(text);
+ }
+ if(interrupted()){
+ System.out.println("Work thread interrupted");
+ return;
+ }
+ }
+ System.out.println("Completed block " + blockNumber + " String length " + work.getStringLength());
+ }
+ listener.workBlockComplete(work);
+ System.out.println("Completed workblock");
+ }
+
+ public boolean check(byte[] hash) {
+ for (int i = 0; i < MD5LENGTH; i++) {
+ if (hash[i] != targetQuery[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
View
328 src/rainbow/controller/application/Controller.java 100644 → 100755
@@ -1,297 +1,95 @@
package rainbow.controller.application;
-import rainbow.controller.events.Event;
-import rainbow.controller.node.Node;
-import rainbow.controller.node.NodeLoadComparator;
-import rainbow.controller.factory.ControllerFactory;
-import rainbow.controller.workBlock.*;
-import rainbowpc.controller.*;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import rainbowpc.Message;
-import rainbowpc.RainbowException;
+import rainbowpc.controller.ControllerProtocol;
import rainbowpc.controller.messages.*;
+import rainbowpc.scheduler.messages.QueryFound;
import rainbowpc.scheduler.messages.WorkBlockComplete;
-import java.io.IOException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.TreeMap;
-import java.util.Timer;
-import java.util.PriorityQueue;
-import java.util.LinkedList;
-import java.util.Collection;
-import java.util.ArrayList;
-
-public class Controller {
- ////////////////////////////////////////////////////////////////////////////
- // Attributes
- //
- private static final int NODE_REGISTER_TIMEOUT = 5000; //5 sec
- private static final int NODE_PREALLOCATE_CAPACITY = 50;
- // maximum load before a node is removed from the candidate queue
- private static final float NODE_LOAD_LIMIT = 10;
- private static final int MAX_PARTITION_ID = 1023;
-
- public static final String TEST_ALPHA = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
- public static final int TEST_ID = 0;
- public static final int TEST_BLOCK_LENGTH = 100000000; //100 mil
-
- private ControllerProtocol protocol;
-
- private String id = "uninitialized";
- private int stringLength = 0;
- private int nextPartitionId = 0;
- private int blockLength;
- private String target;
- private String algorithm;
- private String alphabet;
- private PriorityQueue<Node> candidates =
- new PriorityQueue<Node>(NODE_PREALLOCATE_CAPACITY, new NodeLoadComparator());
- private TreeMap<String, Node> nodes = new TreeMap<String, Node>();
- private TreeMap<Integer, WorkBlockPartition> assignedPartitions =
- new TreeMap<Integer, WorkBlockPartition>();
-
- ////////////////////////////////////////////////////////////////////////////
- // Constructors
- //
- public Controller(ControllerProtocol protocol) {
- this.protocol = protocol;
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Getters
- //
- public String getId() {
- return id;
- }
-
- public int getStringLength() {
- return stringLength;
- }
-
- public String getTarget() {
- return target;
- }
-
- public String getAlgorithm() {
- return algorithm;
- }
-
- public Collection<Node> getNodes() {
- return nodes.values();
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Setters
- //
-
- public void setId(String id) {
- this.id = id;
- }
-
- public void setStringLength(int length) {
- stringLength = length;
- }
-
- public void setTarget(String target) {
- this.target = target;
- }
-
- public void setAlgorithm(String algorithm) {
- this.algorithm = algorithm;
- }
-
- public void setAlphabet(String alphabet) {
- this.alphabet = alphabet;
- }
-
- public void setBlockLength(int length) {
- this.blockLength = length;
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Partition management methods
- //
- public void assignWorkPartition(int jobId, long startBlockNumber, long endBlockNumber, int stringLength) {
- WorkBlockPartition partition = new WorkBlockPartition(
- jobId,
- getNextPartitionId(),
- startBlockNumber,
- endBlockNumber,
- alphabet,
- target,
- stringLength,
- blockLength
- );
- assignedPartitions.put(partition.getPartitionId(), partition);
- log("New partition with " + partition.getSize() + " blocks assigned to controller!");
- }
-
- private int getNextPartitionId() {
- nextPartitionId = nextPartitionId < MAX_PARTITION_ID - 1? nextPartitionId++ : 0;
- return nextPartitionId;
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Node priority queue management
- //
- public void addNode(Node node) {
- nodes.put(node.getName(), node);
- candidates.offer(node);
- log(node.getName() + " added to candidates");
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Helper methods
- //
- public void log(String msg) {
- // System.out for now
- System.out.println("[*** Controller ***]: " + msg);
- }
- public void warn(String msg) {
- System.out.println("[!!! Controller !!!]: " + msg);
- }
+public class Controller extends Thread {
- public int getNodeCount() {
- return nodes.size();
- }
+ ExecutorService executor;
+ ControllerProtocol protocol;
+ NewQuery query;
+ BruteForcer current;
+ HashMap<String, Action> mapping;
- public void distributeWork() {
- log("Handing out work...");
- for (WorkBlockPartition partition : assignedPartitions.values()) {
- distributePartitionToNodes(partition);
- log(Integer.toString(partition.getCurrentSize()));
- }
- log("Distribution round complete");
+ public Controller() {
+ this("localhost");
}
- private void distributePartitionToNodes(WorkBlockPartition partition) {
- while (!candidates.isEmpty() && partition.hasUnassignedWork()) {
- Node candidate = candidates.remove();
- float newLoad = candidate.assignBlock(partition.getNextBlock(), target);
- System.out.println(newLoad);
- if (newLoad < NODE_LOAD_LIMIT)
- candidates.offer(candidate);
- log(candidate.getName() + " assigned work!");
+ public Controller(String host) {
+ executor = Executors.newSingleThreadExecutor();
+ try {
+ protocol = new ControllerProtocol(host);
+ } catch (IOException e) {
+ System.out.println("Could not connect to scheduler, has it been started?");
+ System.exit(1);
}
+ mapping = ControllerMappingFactory.createMapping(this);
}
- public void gracefulTerminate(String id) {
- Node node = nodes.remove(id);
- if (node != null) {
- node.kill();
- node.requeueAllAssignedWork();
-
- } else {
- warn(id + " not found in registered nodes!");
- }
+ @Override
+ public void start() {
+ super.start();
+ executor.execute(protocol);
}
- public void markBlockDone(String nodeName, int partitionId, int blockId) {
- WorkBlockPartition partition = assignedPartitions.get(partitionId);
- if (partition != null) {
- WorkBlock block = partition.markBlockComplete(blockId);
- removeNodeWorkBlock(nodeName, block);
- if (partition.isDone()) {
- sendMessageToScheduler(new WorkBlockComplete(id, new WorkBlockSetup(
- partition.getStringLength(),
- partition.getStartBlockNumber(),
- partition.getEndBlockNumber()
- )));
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Message message = protocol.getMessage();
+ Action action = mapping.get(message.getMethod());
+ action.execute(message);
+ } catch (InterruptedException ie) {
+ interrupt();
+ break;
}
}
- else {
- warn("Partition with id: " + partitionId + " not found");
- }
- }
-
- private void removeNodeWorkBlock(String nodeName, WorkBlock block) {
- Node node = nodes.get(nodeName);
- node.removeBlock(block);
- rescheduleNode(node);
- }
-
- private void rescheduleNode(Node node) {
- if (node.getLoad() < NODE_LOAD_LIMIT) {
- candidates.remove(node.getName());
- candidates.offer(node);
- }
- }
-
- public void markTargetFound(int partitionId, String reversed) {
- assignedPartitions.clear();
- for (Node node : nodes.values()) {
- node.clearAllJobs();
- rescheduleNode(node);
- }
- log("REVERSED!!!!! " + reversed);
}
+ BruteForceEventListener listener = new BruteForceEventListener() {
- public void sendMessageToScheduler(Message msg) {
- try {
- protocol.sendMessage(msg);
- } catch (IOException e) {
- protocol.shutdown();
- Thread.currentThread().interrupt();
+ @Override
+ public void matchFound(String match) {
+ try {
+ protocol.sendMessage(new QueryFound(protocol.getId(), query, match));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- }
- /////////////////////////////////////////////////////////////////////////////
- // Main loop
- //
- public void run(TreeMap<String, Event> eventMapping) {
- Executor protocolExecutor = Executors.newSingleThreadExecutor();
- protocolExecutor.execute(protocol);
- protocol.setInterruptThread(Thread.currentThread());
- while (true) {
+ @Override
+ public void workBlockComplete(WorkBlockSetup b) {
try {
- Message msg = protocol.getMessage();
- Event event = eventMapping.get(msg.getMethod());
- if (event != null) {
- event.run(msg);
- }
- else {
- warn("Message with method " + msg.getMethod() + " dropped");
- }
- }
- catch (InterruptedException e) {
- break;
+ protocol.sendMessage(new WorkBlockComplete(protocol.getId(), 0, b));
+ } catch (IOException e) {
+ e.printStackTrace();
}
- }
- try {
- protocol.shutdown();
}
- catch (Exception e) {}
- }
+ };
- ///////////////////////////////////////////////////////////////////////////////////////
- // Static main entry methods
- //
- private static boolean hasValidArguments(String[] args) {
- return args.length == 1;
+ public void bruteForce(WorkBlockSetup block) {
+ current = new BruteForcer(query, block, listener);
+ current.start();
}
- private static ControllerProtocol connectToScheduler (String host) {
- try {
- return new ControllerProtocol(host);
- }
- // we don't care what exception is raised, just that we can't connect
- catch (Exception e) {
- }
- return null;
+ @Override
+ public void interrupt() {
+ super.interrupt();
+ protocol.shutdown();
+ executor.shutdown();
}
- public static void main(String[] args) {
- if (!hasValidArguments(args)) {
- System.err.println("Controller takes the scheduler host as its only argument");
- System.exit(1);
- }
- ControllerProtocol protocol = connectToScheduler(args[0]);
- if (protocol == null) {
- System.err.println("Failed to connect o scheduler " + args[0]);
- System.exit(1);
+ public static void main(String[] s) {
+ if (s.length > 0) {
+ new Controller(s[0]).start();
+ } else {
+ new Controller().start();
}
- Controller application = new Controller(protocol);
- application.run(ControllerFactory.getDefaultMapping(application));
- System.exit(0);
}
}
View
63 src/rainbow/controller/application/ControllerMappingFactory.java
@@ -0,0 +1,63 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package rainbow.controller.application;
+
+import java.util.Currency;
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import rainbowpc.Message;
+import rainbowpc.controller.messages.ControllerBootstrapMessage;
+import rainbowpc.controller.messages.NewQuery;
+import rainbowpc.controller.messages.StopQuery;
+import rainbowpc.controller.messages.WorkBlockSetup;
+
+/**
+ *
+ * @author WesleyLuk
+ */
+public class ControllerMappingFactory {
+
+ public static HashMap<String, Action> createMapping(final Controller controller) {
+ HashMap<String, Action> mapping = new HashMap<String, Action>();
+ mapping.put(NewQuery.LABEL, new Action() {
+
+ @Override
+ public void execute(Message message) {
+ controller.query = (NewQuery) message;
+ Logger.getAnonymousLogger().log(Level.INFO, "New Query recieved");
+ }
+ });
+ mapping.put(StopQuery.LABEL, new Action() {
+
+ @Override
+ public void execute(Message message) {
+ controller.current.interrupt();
+ controller.query = null;
+ Logger.getAnonymousLogger().log(Level.INFO, "Stop Query recieved");
+ }
+ });
+ mapping.put(ControllerBootstrapMessage.LABEL, new Action() {
+
+ @Override
+ public void execute(Message message) {
+ Logger.getAnonymousLogger().log(Level.INFO, "ControllerBootstrapMessage recieved");
+ }
+ });
+ mapping.put(WorkBlockSetup.LABEL, new Action() {
+
+ @Override
+ public void execute(Message message) {
+ WorkBlockSetup workBlock = (WorkBlockSetup) message;
+ System.out.println(String.format("WorkBlockSetup recieved (%s,%s,%s)",
+ workBlock.getStartBlockNumber(),
+ workBlock.getEndBlockNumber(),
+ workBlock.getStringLength()));
+ controller.bruteForce(workBlock);
+ }
+ });
+ return mapping;
+ }
+}
View
11 src/rainbow/controller/events/Event.java
@@ -1,11 +0,0 @@
-package rainbow.controller.events;
-
-import rainbowpc.Message;
-
-public abstract class Event {
- public void run(Message msg) {
- action(msg);
- }
-
- public abstract void action(Message msg);
-}
View
80 src/rainbow/controller/factory/ControllerFactory.java
@@ -1,80 +0,0 @@
-package rainbow.controller.factory;
-
-import rainbow.controller.application.Controller;
-import rainbow.controller.events.Event;
-import rainbow.controller.node.Node;
-import rainbow.controller.workBlock.WorkBlockPartition;
-import rainbowpc.controller.messages.*;
-import rainbowpc.node.messages.*;
-import rainbowpc.Message;
-import java.util.TreeMap;
-
-public class ControllerFactory {
- public static TreeMap<String, Event> getDefaultMapping(final Controller controller) {
- TreeMap<String, Event> eventMapping = new TreeMap<String, Event>();
-
- eventMapping.put(ControllerBootstrapMessage.LABEL, new Event() {
- public void action(Message msg) {
- ControllerBootstrapMessage bootstrap = (ControllerBootstrapMessage)msg;
- controller.setId(bootstrap.id);
- controller.log("Set id to " + controller.getId());
- }
- });
-
- eventMapping.put(NewQuery.LABEL, new Event() {
- public void action(Message msg) {
- NewQuery query = (NewQuery)msg;
- controller.setTarget(query.getQuery());
- controller.log("Set target to " + controller.getTarget());
- controller.setAlgorithm(query.getHashMethod());
- controller.log("Set algorithm to " + controller.getAlgorithm());
- }
- });
-
-
- eventMapping.put(WorkBlockSetup.LABEL, new Event() {
- public void action(Message msg) {
- WorkBlockSetup setup = (WorkBlockSetup)msg;
- controller.setStringLength(setup.getStringLength());
- controller.log("Length set to " + controller.getStringLength());
- controller.setBlockLength(Controller.TEST_BLOCK_LENGTH);
- controller.setAlphabet(Controller.TEST_ALPHA);
- controller.assignWorkPartition(Controller.TEST_ID, setup.getStartBlockNumber(), setup.getEndBlockNumber(), setup.getStringLength());
- controller.distributeWork();
- }
- });
-
- eventMapping.put(NewNodeMessage.LABEL, new Event() {
- public void action(Message msg) {
- NewNodeMessage nodeMsg = (NewNodeMessage)msg;
- Node node = new Node(nodeMsg);
- controller.addNode(node);
- controller.log(node.getName() + " has joined the collective!");
- controller.log(node.getName() + " has " + node.getThreadCount() + " threads");
- controller.distributeWork();
- }
- });
-
- eventMapping.put(NodeDisconnectMessage.LABEL, new Event() {
- public void action(Message msg) {
- NodeDisconnectMessage info = (NodeDisconnectMessage)msg;
- controller.gracefulTerminate(info.getId());
- controller.log("Signalled that " + info.getId() + " has disconnected");
- }
- });
-
- eventMapping.put(WorkMessage.LABEL, new Event() {
- public void action(Message msg) {
- WorkMessage details = (WorkMessage)msg;
- if (details.targetFound()) {
- controller.markTargetFound(details.getPartitionId(), details.getReversed());
- } else {
- controller.markBlockDone(details.getNodeName(), details.getPartitionId(), details.getBlockId());
- }
- controller.distributeWork();
- }
- });
-
- return eventMapping;
- }
-}
View
100 src/rainbow/controller/node/Node.java
@@ -1,100 +0,0 @@
-package rainbow.controller.node;
-
-import rainbow.controller.workBlock.WorkBlock;
-import rainbowpc.controller.ControllerProtocolet;
-import rainbowpc.controller.messages.NewNodeMessage;
-import rainbowpc.node.messages.*;
-import java.util.ArrayList;
-import java.io.IOException;
-
-public class Node implements Comparable<Node> {
- private static final int NO_THREADS = 0;
-
- private String name;
- private int threads;
- private boolean alive; // nodes may die ay any given time, we must
- // prevent assigning further work
- private ControllerProtocolet agent;
- private ArrayList<WorkBlock> assigned;
-
- public Node(NewNodeMessage msg) {
- this(msg.getName(), msg.getCoreCount(), msg.getAgent());
- }
-
- public Node(String name, int cores, ControllerProtocolet agent) {
- this.name = name;
- threads = cores;
- this.agent = agent;
- alive = true;
- assigned = new ArrayList<WorkBlock>();
- }
-
- public int compareTo(Node node) {
- //return Float.compare(getLoad(), node.getLoad());
- return name.compareTo(node.name);
- }
-
- public boolean equals(Object o) {
- return getName() == o;
- }
-
- public int setThreads(int n) {
- if (n > NO_THREADS) {
- threads = n;
- }
- return threads;
- }
-
- public float getLoad() {
- return assigned.size()/threads;
- }
-
- public String getName() {
- return name;
- }
-
- public int getThreadCount() {
- return threads;
- }
-
- public boolean isAlive() {
- return alive;
- }
-
- public void kill() {
- alive = false;
- }
-
- public void clearAllJobs() {
- assigned.clear();
- }
-
- public void removeBlock(WorkBlock block) {
- assigned.remove(block);
- }
-
- public float assignBlock(WorkBlock block, String target) {
- try {
- agent.sendMessage(new WorkMessage(
- name,
- target,
- block.getPartitionId(),
- block.getId(),
- block.getStartBlockNumber(),
- block.getEndBlockNumber(),
- block.getStringLength()
- ));
- } catch (IOException e) {
- alive = false;
- }
- assigned.add(block);
- return getLoad();
- }
-
- public void requeueAllAssignedWork() {
- for (WorkBlock block : assigned) {
- block.getPartition().repushBlock(block.getId());
- }
- }
-
-}
View
10 src/rainbow/controller/node/NodeLoadComparator.java
@@ -1,10 +0,0 @@
-package rainbow.controller.node;
-
-import java.util.Comparator;
-
-public class NodeLoadComparator implements Comparator<Node> {
- public int compare(Node node0, Node node1) {
- // inverted because lower load should get higher priority
- return Float.compare(node1.getLoad(), node0.getLoad());
- }
-}
View
55 src/rainbow/controller/workBlock/WorkBlock.java
@@ -1,55 +0,0 @@
-package rainbow.controller.workBlock;
-
-// This is trivial at this point, but we may want to extend this in the
-// future.
-public class WorkBlock implements Comparable<WorkBlock> {
- private int id;
- private WorkBlockPartition partition;
- private long startIndex;
- private long endBlock;
-
- public WorkBlock(int id, long startIndex, long endBlock, WorkBlockPartition partition) {
- this.id = id;
- this.partition = partition;
- this.startIndex = startIndex;
- this.endBlock = endBlock;
- }
-
- public long getStartBlockNumber() {
- return startIndex;
- }
-
- public long getEndBlockNumber() {
- return endBlock;
- }
-
- public String toString() {
- return Integer.toString(id) + ": " + getStartBlockNumber();
- }
-
- public int getId() {
- return id;
- }
-
- public WorkBlockPartition getPartition() {
- return partition;
- }
-
- public int getPartitionId() {
- return partition.getPartitionId();
- }
-
- public int getStringLength() {
- return partition.getStringLength();
- }
-
- public int compareTo(WorkBlock block) {
- return Integer.compare(id, block.id);
- }
-
- public boolean equals(WorkBlock block) {
- return id == block.id &&
- startIndex == block.startIndex &&
- endBlock == block.endBlock;
- }
-}
View
171 src/rainbow/controller/workBlock/WorkBlockPartition.java
@@ -1,171 +0,0 @@
-package rainbow.controller.workBlock;
-
-import java.util.LinkedList;
-import java.util.TreeMap;
-import java.util.Collection;
-
-public class WorkBlockPartition {
- private int jobId;
- private int partitionId;
- private int size;
- private int stringLength;
- private long startBlockNumber;
- private long endBlockNumber;
- private String target;
- private LinkedList<WorkBlock> blocks;
- private TreeMap<Integer, WorkBlock> working;
-
- public WorkBlockPartition(
- int jobId,
- int partitionId,
- long startBlockNumber,
- long endBlockNumber,
- String alphabet,
- String target,
- int stringLength,
- int blockLength
- ) {
- this.jobId = jobId;
- this.partitionId = partitionId;
- this.startBlockNumber = startBlockNumber;
- this.endBlockNumber = endBlockNumber;
- this.target = target;
- this.working = new TreeMap<Integer, WorkBlock>();
- this.blocks = generateWorkBlocks(startBlockNumber, endBlockNumber, blockLength);
- this.size = (int)(endBlockNumber - startBlockNumber);
- this.stringLength = stringLength;
- }
-
- ///////////////////////////////////////////////////////////////////////////////////
- // Constructors assisting generating helper methods
- //
- private LinkedList<WorkBlock> generateWorkBlocks(long startBlockNumber, long endBlockNumber, int blockLength) {
- int blockId = 0;
- LinkedList<WorkBlock> workBlocks = new LinkedList<WorkBlock>();
- for (long i = startBlockNumber; i < endBlockNumber; i++) {
- workBlocks.add(new WorkBlock(blockId++, i, i + blockLength, this));
- }
- return workBlocks;
- }
-
-
-
- /////////////////////////////////////////////////////////////////////////////////////
- // WorkBlockParition interaction methods
- //
- public LinkedList<WorkBlock> getBlocks() {
- return blocks;
- }
-
- public Collection<WorkBlock> getWorkingBlocks() {
- return working.values();
- }
-
- public boolean isComplete() {
- return blocks.isEmpty() && working.isEmpty();
- }
-
- public WorkBlock getNextBlock() {
- if (!blocks.isEmpty()) {
- WorkBlock block = blocks.removeFirst();
- working.put(block.getId(), block);
- return block;
- }
- return null;
- }
-
- public boolean repushBlock(int jobId) {
- WorkBlock block = working.remove(jobId);
- if (block != null) {
- blocks.add(block);
- System.out.println(jobId + " requeue!");
- return true;
- }
- return false;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public int getJobId() {
- return jobId;
- }
-
- public int getStringLength() {
- return stringLength;
- }
-
- public long getStartBlockNumber() {
- return startBlockNumber;
- }
-
- public long getEndBlockNumber() {
- return endBlockNumber;
- }
-
- public int getSize() {
- return size;
- }
-
- public int getCurrentSize() {
- return getWaitingSize() + getWorkingSize();
- }
-
- public int getWaitingSize() {
- return blocks.size();
- }
-
- public int getWorkingSize() {
- return working.size();
- }
-
- public boolean isDone() {
- return getCurrentSize() == 0;
- }
-
- public boolean hasUnassignedWork() {
- return getWaitingSize() > 0;
- }
-
- public WorkBlock markBlockComplete(int blockId) {
- return working.remove(blockId);
- }
-
- /////////////////////////////////////////////////////////////////////////////////////
- // Main test method
- //
- public static void main(String[] args) {
- WorkBlockPartition test = new WorkBlockPartition(0, 0, 0, 1, "abcde", "aa", 2, 1);
- // expected output is c, d, e
- for (WorkBlock block : test.getBlocks()) {
- System.out.println(block);
- }
-
- test = new WorkBlockPartition(1, 1, 0, 999, "abcde", "hash", 10, 1);
- for (WorkBlock block : test.getBlocks()) {
- System.out.println(block);
- }
- System.out.println("Taking blocks 0, 1, 2");
- test.getNextBlock();
- test.getNextBlock();
- test.getNextBlock();
- System.out.println("Readding block 0");
- test.repushBlock(0);
- System.out.println("====== Available Blocks =======");
- for (WorkBlock block : test.getBlocks()) {
- System.out.println(block);
- }
- System.out.println("====== Working Blocks ======");
- for (WorkBlock block : test.getWorkingBlocks()) {
- System.out.println(block);
- }
- }
-}
-
-
-
-
-
-
-

No commit comments for this range

Something went wrong with that request. Please try again.