Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
final commit
  • Loading branch information
georgepachitariu committed Sep 21, 2014
1 parent b0c4fa3 commit 0d53e11
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 317 deletions.
12 changes: 6 additions & 6 deletions src/megastore/DBWriteOp.java
@@ -1,20 +1,20 @@
package megastore;

import java.util.List;

/**
* Created by George on 29/06/2014.
*/
public class DBWriteOp implements Runnable {

private String value;
private String key;
private List<Write> writes;
private Entity entity;
private Boolean answer;
private Boolean isWeak;

public DBWriteOp(Entity e, String key, String value, Boolean isWeak) {
public DBWriteOp(Entity e, List<Write> writes, Boolean isWeak) {
this.entity=e;
this.key=key;
this.value=value;
this.writes=writes;
this.isWeak=isWeak;
answer=null;
}
Expand All @@ -39,6 +39,6 @@ public synchronized void setAnswer(Boolean answer) {

@Override
public void run() {
entity.put(key,value, this, isWeak);
entity.put(writes, this, isWeak);
}
}
71 changes: 54 additions & 17 deletions src/megastore/Entity.java
@@ -1,6 +1,5 @@
package megastore;

import megastore.coordinator.message.InvalidateKeyMessage;
import megastore.network.ListeningThread;
import megastore.network.message.paxos_optimisation.AreYouUpToDateMessage;
import megastore.network.message.paxos_optimisation.RequestValidLogCellsMessage;
Expand Down Expand Up @@ -48,10 +47,14 @@ public void blockNextOperation() {
readyForNextWriteOperation=false;
}

public void put(String key, String value, DBWriteOp callback, boolean isWeak) {

public void put(List<Write> writes, DBWriteOp callback, boolean isWeak) {
ValidLogCell cell = createLogCell(writes);
put(cell,callback,isWeak);
}

private void put(ValidLogCell cell, DBWriteOp callback, boolean isWeak) {
boolean lockOnceReleased=false;
long hash=getHashValue(key);
ValidLogCell cell = createLogCell(hash, value);

int currentPosition=log.getNextPosition();
PaxosProposer proposer=new PaxosProposer(entityId, currentPosition,
Expand All @@ -75,8 +78,12 @@ public void put(String key, String value, DBWriteOp callback, boolean isWeak) {
lastPostionsLeaderURL = log.get(lastLeader).getLeaderUrl();
}

boolean leaderProposalResult = proposer.proposeValueToLeader(lastPostionsLeaderURL); //also comment this
// boolean leaderProposalResult=false; //to disable optimisation
boolean leaderProposalResult;
// if(Optimisations.Optim)
leaderProposalResult= proposer.proposeValueToLeader(lastPostionsLeaderURL);
// else
// leaderProposalResult=false; //to disable optimisation


if (leaderProposalResult) {
if(lastLeader>=1 && //<-my optimisation
Expand Down Expand Up @@ -109,12 +116,19 @@ public void put(String key, String value, DBWriteOp callback, boolean isWeak) {
callback.setAnswer( writeOperationResult );
}

private ValidLogCell createLogCell(long key, String value) {

private ValidLogCell createLogCell(List<Write> writes) {
List<WriteOperation> list = new LinkedList<WriteOperation>();
list.add(new WriteOperation(key,value));

for(Write w : writes) {
long hash=getHashValue(w.key);
list.add(new WriteOperation(hash ,w.newValue));
}

return new ValidLogCell(megastore.getCurrentUrl(), list);
}


public long getHashValue(String key) {
char[] chars=key.toCharArray();
long hash=7;
Expand Down Expand Up @@ -154,23 +168,37 @@ public String get(String key) {
}

public void reValidate() {
// catchUp();
// megastore.getCoordinator().validate(entityId);
Thread thr = new Thread(new InvalidateKeyMessage.CatchUpThread(megastore, entityId), "reValidate_Thr");
catchUp();
megastore.getCoordinator().validate(entityId); // switch-it back for last optimisation
/* Thread thr = new Thread(new InvalidateKeyMessage.CatchUpThread(megastore, entityId), "reValidate_Thr");
thr.start();
while ((!megastore.getCoordinator().isUpToDate(entityId)) && thr.isAlive()) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}*/
}

public void catchUp() {
String nodeURL = findAnUpToDateNode();
if(nodeURL !=null)
updateMissingLogCellsFrom(nodeURL);
boolean successful;
do {
successful=true;
String nodeURL = findAnUpToDateNode();
if (nodeURL != null) {
try {
updateMissingLogCellsFrom(nodeURL);
} catch (NodeNotRespondingException e) {
successful = false;
}
}
else {
// sadly there are no valid nodes alive, so we validate ourselves
// megastore.getCoordinator().validate(entityId);
}
} while (!successful);
}

private String getLocalLastValue(long hash) {
Expand All @@ -185,20 +213,29 @@ private String getLocalLastValue(long hash) {
return value;
}

public class NodeNotRespondingException extends Exception {
public NodeNotRespondingException(String message) {
super(message);
}
}

private final Object lock=new Object();
private LinkedList<LogCell> newCells;
private void updateMissingLogCellsFrom( String nodeURL) {
private void updateMissingLogCellsFrom( String nodeURL) throws NodeNotRespondingException {
synchronized (lock) {
newCells = null;
int currentSize = log.getNextPosition();
List<Integer> invalidPositions = log.getInvalidPositions();
new RequestValidLogCellsMessage(entityId, null,
megastore.getCurrentUrl(), nodeURL, invalidPositions, currentSize).send();
long init=System.currentTimeMillis();

try {
do {
if (newCells == null)
Thread.sleep(3);
if(System.currentTimeMillis()-init>100)
throw new NodeNotRespondingException("Node not responding in updateMissingLogCellsFrom");
} while (newCells == null);
} catch (InterruptedException e) {
e.printStackTrace();
Expand Down Expand Up @@ -230,7 +267,7 @@ private String findAnUpToDateNode() {
if(upToDateNode==null)
Thread.sleep(3);
} while(upToDateNode==null &&
System.currentTimeMillis()-time<200);
System.currentTimeMillis()-time<100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
15 changes: 15 additions & 0 deletions src/megastore/NetworkLatency.java
@@ -0,0 +1,15 @@
package megastore;

import java.util.Random;

/**
* Created by George on 11/08/2014.
*/
public class NetworkLatency {
static public int latency=0;
static final Random r=new Random();

public static void randomize() {
latency=Math.abs(r.nextInt())%3;
}
}
16 changes: 16 additions & 0 deletions src/megastore/Optimisations.java
@@ -0,0 +1,16 @@
package megastore;

/**
* Created by George on 11/08/2014.
*/
public class Optimisations {
static public boolean Optim=false;

public static void turnOff() {
Optim=false;
}

public static void turnOn() {
Optim=true;
}
}
17 changes: 17 additions & 0 deletions src/megastore/Write.java
@@ -0,0 +1,17 @@
package megastore;

/**
* Created by George on 19/07/2014.
*/
public class Write {
public String key;
public String newValue;
public long creationTimeStamp;

public Write(String key, String newValue) {
this.key = key;
this.newValue = newValue;
this.creationTimeStamp=System.currentTimeMillis();
}

}
5 changes: 3 additions & 2 deletions src/megastore/coordinator/message/InvalidateKeyMessage.java
Expand Up @@ -42,8 +42,9 @@ public void act(String[] messageParts) {
Megastore megastore = networkManager.getMegastore();
if(networkManager.getMegastore().getCoordinator().isUpToDate(entityID)) {
megastore.invalidate(entityID);
//Runnable r = new CatchUpThread(megastore, entityID); //
//new Thread(r).start(); //Activation point for the early catch-up optimisation
Runnable r = new CatchUpThread(megastore, entityID); //
// if(Optimisations.Optim)
// new Thread(r).start(); //Activation point for the early catch-up optimisation
}
else {
megastore.invalidate(entityID);
Expand Down
2 changes: 1 addition & 1 deletion src/megastore/network/ListeningThread.java
Expand Up @@ -114,7 +114,7 @@ public void run() {
Socket clientSocket=null;
try {
clientSocket = serverSocket.accept();
// SystemLog.add(new systemlog.NetworkMessage(getCurrentUrl()));
//SystemLog.add(new systemlog.NetworkMessage(getCurrentUrl()));

Thread worker=new Thread(new MessageGatherer(clientSocket, this),"MessageReaderFromNetwork");
workersList.add(worker);
Expand Down
8 changes: 5 additions & 3 deletions src/megastore/network/MessageGatherer.java
@@ -1,5 +1,6 @@
package megastore.network;

import megastore.NetworkLatency;
import systemlog.LogBuffer;

import java.io.IOException;
Expand All @@ -22,12 +23,10 @@ public MessageGatherer(Socket clientSocket, ListeningThread listeningThread) {
@Override
public void run() {
try {

////////////////////////////////////////////////
Thread.sleep(30);
Thread.sleep(NetworkLatency.latency);
/////////////////////////////////////////////


InputStream inputStream = clientSocket.getInputStream();

byte []all=new byte[0];
Expand All @@ -43,6 +42,9 @@ public void run() {

String[] parts = message.split(",");




boolean recognized = listeningThread.treatMessage(parts);

if (!recognized)
Expand Down
1 change: 1 addition & 0 deletions src/megastore/network/message/NetworkMessage.java
Expand Up @@ -10,6 +10,7 @@ public abstract class NetworkMessage implements Runnable {

protected String destinationIP;
private int destinationPort;
public String sourceUrl;

public NetworkMessage(String destinationURL) {
if(destinationURL!=null) {
Expand Down
19 changes: 10 additions & 9 deletions src/megastore/paxos/message/phase2/EnforcedAcceptRequest.java
Expand Up @@ -12,7 +12,7 @@ public class EnforcedAcceptRequest extends PaxosProposerMessage {
private ValidLogCell value;

public EnforcedAcceptRequest( long entityId, int cellNumber, NetworkManager networkManager,
String sourceURL, String destinationURL, ValidLogCell value) {
String sourceURL, String destinationURL, ValidLogCell value) {
super(networkManager,destinationURL,entityId,cellNumber);
this.value=value;
this.sourceURL=sourceURL;
Expand All @@ -33,14 +33,15 @@ public void act(String[] messageParts) {

new EnforcedAR_Accepted(entityId, cellNumber, null, networkManager.getCurrentUrl(), source).send();

new Thread(new Runnable() {
@Override
public void run() {
networkManager.getMegastore().getEntity(entityId).
proposeValueToLeaderAgainIfThereWasOne(entityId, cellNumber);
}
}).start();

// if(Optimisations.Optim) {
// new Thread(new Runnable() {
// @Override
// public void run() { //
// networkManager.getMegastore().getEntity(entityId). // Activation Point for the first optimisation
// proposeValueToLeaderAgainIfThereWasOne(entityId, cellNumber); //
// }
// }).start(); //
// }

}
else
Expand Down
9 changes: 5 additions & 4 deletions src/megastore/write_ahead_log/Log.java
Expand Up @@ -56,7 +56,7 @@ public LogCell get(int i) {

public String getLastValueOf(long key) {
synchronized (this) {
for (int i = size - 1; i >= 0; i--) {
for (int i = size ; i >= 0; i--) {
if (logList[i] != null) {
String val = logList[i].getValue(key);
if (val != null)
Expand Down Expand Up @@ -93,14 +93,15 @@ public String toString() {

public synchronized boolean isOccupied(int cellNumber) {
if (logList[cellNumber] != null && logList[cellNumber].isValid()) {
if(System.currentTimeMillis()-firstFailedTimestamp>200) {
if(System.currentTimeMillis()-firstFailedTimestamp>31) {
failedRequests = 0;
firstFailedTimestamp=System.currentTimeMillis();
}
failedRequests++;

if(failedRequests >6) {
makeNextLogPositionToFavorOtherNodes(); //activation point of the "wait" optimisation
if(failedRequests >=2) {
// if(Optimisations.Optim)
// makeNextLogPositionToFavorOtherNodes(); //activation point of the "wait" optimisation
failedRequests=0;
firstFailedTimestamp=System.currentTimeMillis();
}
Expand Down

0 comments on commit 0d53e11

Please sign in to comment.