Permalink
Browse files

Holy shit

  • Loading branch information...
ShinuziKyura
ShinuziKyura committed Apr 1, 2018
1 parent 0c9aecf commit 2ca69fdb5d957a9a080fcd1b0c8276282853f680
@@ -1,4 +1,4 @@
package dbs.net;
package dbs.nio.channels;
import java.net.InetAddress;
import java.util.Arrays;
@@ -1,4 +1,4 @@
package dbs.net;
package dbs.nio.channels;
import java.net.InetAddress;
import java.net.MulticastSocket;
@@ -5,24 +5,27 @@
import java.rmi.NoSuchObjectException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.Hashtable;
import java.util.regex.Pattern;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import static java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import dbs.peer.PeerUtility.ProtocolVersion;
import dbs.peer.PeerUtility.FileMetadata;
import dbs.peer.PeerUtility.ChunkMetadata;
import dbs.nio.channels.MulticastChannel;
import dbs.rmi.RemoteFunction;
import dbs.net.MulticastChannel;
import dbs.util.concurrent.LinkedTransientQueue;
import static dbs.peer.PeerUtility.METADATA_DIRECTORY;
public class Peer implements PeerInterface {
/***************************************************************************************************
@@ -65,7 +68,6 @@ public Peer(String protocol_version, int id, String access_point,
String MDB_address, int MDB_port,
String MDR_address, int MDR_port) throws IOException {
System.out.println("\nInitializing peer...");
/* Single-comment this line to switch to Cool-Mode™
NetworkInterface net_int = MainInterface.find();
if (net_int != null) {
@@ -84,33 +86,34 @@ public Peer(String protocol_version, int id, String access_point,
/*/
this.ID = Integer.toString(id);
//*/
PROTOCOL_VERSION = new ProtocolVersion(protocol_version);
ACCESS_POINT = access_point;
instances = new AtomicInteger(0);
lock = new ReentrantReadWriteLock();
condition = lock.writeLock().newCondition();
executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
exclusive_access = lock.writeLock();
shared_access = lock.readLock();
/*
try (ObjectInputStream files_stream = new ObjectInputStream(new FileInputStream(METADATA_DIRECTORY + "files"));
ObjectInputStream local_chunks_stream = new ObjectInputStream(new FileInputStream(METADATA_DIRECTORY + "localchunks"));
ObjectInputStream remote_chunks_stream = new ObjectInputStream(new FileInputStream(METADATA_DIRECTORY + "remotechunks"))) {
files_metadata = (Hashtable<String, FileMetadata>) files_stream.readObject();
local_chunks_metadata = (Hashtable<String, ChunkMetadata>) local_chunks_stream.readObject();
remote_chunks_metadata = (Hashtable<String, ChunkMetadata>) remote_chunks_stream.readObject();
files_metadata = (ConcurrentHashMap<String, FileMetadata>) files_stream.readObject();
local_chunks_metadata = (ConcurrentHashMap<String, ChunkMetadata>) local_chunks_stream.readObject();
remote_chunks_metadata = (ConcurrentHashMap<String, ChunkMetadata>) remote_chunks_stream.readObject();
}
catch (IOException | ClassNotFoundException e) {
System.err.println("\nFAILURE! Couldn't load service metadata" +
"\nDistributed Backup Service terminating...");
System.exit(1);
}
/*/
files_metadata = new Hashtable<>();
local_chunks_metadata = new Hashtable<>();
remote_chunks_metadata = new Hashtable<>();
files_metadata = new ConcurrentHashMap<>();
local_chunks_metadata = new ConcurrentHashMap<>();
remote_chunks_metadata = new ConcurrentHashMap<>();
//*/
backup_messages = new Hashtable<>();
restore_messages = new Hashtable<>();
backup_messages = new ConcurrentHashMap<>();
restore_messages = new ConcurrentHashMap<>();
MCsocket = new MulticastChannel(MC_address, MC_port);
MDBsocket = new MulticastChannel(MDB_address, MDB_port);
@@ -124,6 +127,11 @@ public Peer(String protocol_version, int id, String access_point,
MDBqueue = new PeerQueue(this, MDBchannel);
MDRqueue = new PeerQueue(this, MDRchannel);
log = new PeerLog(this);
running = new AtomicBoolean(true);
executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.execute(MCchannel);
executor.execute(MDBchannel);
executor.execute(MDRchannel);
@@ -132,6 +140,8 @@ public Peer(String protocol_version, int id, String access_point,
executor.execute(MDBqueue);
executor.execute(MDRqueue);
executor.execute(log);
try {
Registry registry;
try { // Like a really weird if-else-statement
@@ -163,23 +173,26 @@ public Peer(String protocol_version, int id, String access_point,
final String ACCESS_POINT;
/***************************************************************************************************
***** Member variables *****************************************************************************
***************************************************************************************************/
***** Member variables *****************************************************************************
***************************************************************************************************/
AtomicInteger instances;
private ReentrantReadWriteLock lock;
private Condition condition;
ThreadPoolExecutor executor;
WriteLock exclusive_access;
ReadLock shared_access;
Hashtable<String, FileMetadata> files_metadata;
Hashtable<String, ChunkMetadata> local_chunks_metadata;
Hashtable<String, ChunkMetadata> remote_chunks_metadata;
ConcurrentHashMap<String, FileMetadata> files_metadata;
ConcurrentHashMap<String, ChunkMetadata> local_chunks_metadata;
ConcurrentHashMap<String, ChunkMetadata> remote_chunks_metadata;
Hashtable<String, LinkedTransientQueue<byte[]>> backup_messages;
Hashtable<String, LinkedTransientQueue<byte[]>> restore_messages;
ConcurrentHashMap<String, LinkedTransientQueue<byte[]>> backup_messages;
ConcurrentHashMap<String, LinkedTransientQueue<byte[]>> restore_messages;
ConcurrentHashMap<String, LinkedTransientQueue<byte[]>> reclaim_messages;
MulticastChannel MCsocket; // multicast control
MulticastChannel MDBsocket; // multicast data backup
MulticastChannel MDRsocket; // multicast data restore
MulticastChannel MCsocket;
MulticastChannel MDBsocket;
MulticastChannel MDRsocket;
private PeerChannel MCchannel;
private PeerChannel MDBchannel;
@@ -189,20 +202,20 @@ public Peer(String protocol_version, int id, String access_point,
private PeerQueue MDBqueue;
private PeerQueue MDRqueue;
PeerLog log;
AtomicBoolean running;
ThreadPoolExecutor executor;
/***************************************************************************************************
***** Member functions *****************************************************************************
***************************************************************************************************/
public void run() {
// Subject to change
synchronized (instances) {
while (instances.get() >= 0) {
try {
instances.wait();
} catch (InterruptedException e) {
// Probably time to terminate
}
}
exclusive_access.lock();
while (running.get()) {
condition.awaitUninterruptibly();
}
System.out.println("\nTerminating peer...");
@@ -216,7 +229,7 @@ public void run() {
}
try {
UnicastRemoteObject.unexportObject(this, true);
UnicastRemoteObject.unexportObject(this, false);
}
catch (NoSuchObjectException e) {
// That's weird, shouldn't... you guessed it
@@ -231,6 +244,8 @@ public void run() {
MDBqueue.stop();
MDRqueue.stop();
log.stop();
executor.shutdown();
/*
boolean stored_metadata = true;
@@ -261,15 +276,15 @@ public void run() {
}
//*/
System.out.println("\nPeer terminated");
exclusive_access.unlock();
}
public void stop() {
while (!instances.weakCompareAndSetPlain(0, Integer.MIN_VALUE));
// Subject to change
synchronized (instances) {
instances.notifyAll();
}
exclusive_access.lock();
running.set(false);
condition.notify();
exclusive_access.unlock();
}
public String state() {
@@ -278,18 +293,38 @@ public String state() {
}
public RemoteFunction backup(String filename, String fileID, byte[] file, int replication_degree) {
return PeerProtocol.backup(this, filename, fileID, file, replication_degree);
shared_access.lock();
RemoteFunction result = (running.get() ?
PeerProtocol.backup(this, filename, fileID, file, replication_degree) :
PeerProtocol.failure());
shared_access.unlock();
return result;
}
public RemoteFunction restore(String filename) {
return PeerProtocol.restore(this, filename);
shared_access.lock();
RemoteFunction result = (running.get() ?
PeerProtocol.restore(this, filename) :
PeerProtocol.failure());
shared_access.unlock();
return result;
}
public RemoteFunction delete(String filename) {
return PeerProtocol.delete(this, filename);
exclusive_access.lock();
RemoteFunction result = (running.get() ?
PeerProtocol.delete(this, filename) :
PeerProtocol.failure());
exclusive_access.unlock();
return result;
}
public RemoteFunction reclaim(long disk_space) {
return PeerProtocol.reclaim(this, disk_space);
exclusive_access.lock();
RemoteFunction result = (running.get() ?
PeerProtocol.reclaim(this, disk_space) :
PeerProtocol.failure());
exclusive_access.unlock();
return result;
}
}
@@ -3,7 +3,7 @@
import java.io.IOException;
import java.util.concurrent.LinkedTransferQueue;
import dbs.net.MulticastChannel;
import dbs.nio.channels.MulticastChannel;
public class PeerChannel implements Runnable {
private Peer peer;
@@ -18,7 +18,7 @@
@Override
public void run() {
while (peer.instances.get() >= 0) {
while (peer.running.get()) {
try {
byte[] buffer = channel.receive();
@@ -0,0 +1,39 @@
package dbs.peer;
import java.time.Instant;
import java.util.concurrent.LinkedTransferQueue;
public class PeerLog implements Runnable {
private Peer peer;
private LinkedTransferQueue<String> queue;
PeerLog(Peer peer) {
this.peer = peer;
this.queue = new LinkedTransferQueue<>();
}
@Override
public void run() {
while (peer.running.get()) {
try {
String message = queue.take();
if (message.equals("STOP")) {
break;
}
System.out.println(message);
}
catch (InterruptedException e) {
}
}
}
void stop() {
queue.put("STOP");
}
void print(String message) {
queue.put(("\n" + Instant.now()).split("\\.")[0].replaceAll("T", " @ ") + message);
}
}
Oops, something went wrong.

0 comments on commit 2ca69fd

Please sign in to comment.