Skip to content

Commit

Permalink
Finally got the basic Node and Client communication working
Browse files Browse the repository at this point in the history
  • Loading branch information
joemirizio committed May 8, 2012
1 parent f49c3cc commit 4e3c23c
Showing 1 changed file with 68 additions and 33 deletions.
101 changes: 68 additions & 33 deletions src/NodeServer.java
Expand Up @@ -31,6 +31,8 @@ public class NodeServer extends Thread {
*/
private Node node;

private InetSocketAddress local_address;


/**
* Constructor specifying the port and the initial server list.
Expand All @@ -41,24 +43,32 @@ public NodeServer(int port, String server_list_file) {
/* Create TCP socket */
try {
this.socket = new ServerSocket(port);
this.local_address = new InetSocketAddress(this.socket.getInetAddress().getLocalHost(), this.socket.getLocalPort());
} catch (IOException e) { e.printStackTrace(); System.exit(1); }
System.out.println("===================================================");
System.out.format("\tNode Server: %s\n", this.local_address.toString());
System.out.println("===================================================");

/* Connect to other servers */
/* Connect to other node server */
this.servers = new ArrayList<InetSocketAddress>();
// Get list of servers
Scanner server_list = null;
try {
server_list = new Scanner(new File(server_list_file));
} catch (FileNotFoundException ex) { ex.printStackTrace(); System.exit(1); }
// Connect to servers and store socket connections
// Connect to servers and store addresses
System.out.println("Adding nodes from server file:");
while (server_list.hasNext()) {
String svr_host = server_list.next();
int svr_port = server_list.nextInt();
//try {
this.servers.add(new InetSocketAddress(svr_host, svr_port));
//System.out.format("[+] Connection established: %s:%d\n", svr_host, svr_port);
//} catch (IOException e) { System.out.format("[!] %s: %s:%d\n", e.getMessage(), svr_host, svr_port); }
InetSocketAddress svr = new InetSocketAddress(svr_host, svr_port);

if (!this.isSameAddress(this.local_address, svr)) {
this.servers.add(svr);
System.out.format("[+] Added node: %s\n", svr.toString());
}
}
System.out.println("---------------------------------------------------");

/* Initialize node */
/*this.node = new Node();
Expand All @@ -71,51 +81,57 @@ public NodeServer(int port, String server_list_file) {
* Processes the requests.
*/
public void run() {
System.out.println("Waiting for requests...");
while (true) {
// Accept new TCP connection
Socket connection = null;
try { connection = this.socket.accept(); }
catch (IOException e) { e.printStackTrace(); }

// Determine origin
// Determine origin (client or node)
InetSocketAddress con_addr = new InetSocketAddress(connection.getInetAddress(), connection.getPort());
boolean from_client = true;
for (InetSocketAddress addr : this.servers) {
if (addr.getAddress().equals(con_addr.getAddress()) && (addr.getPort() == con_addr.getPort())) {
//if (this.isSameAddress(addr, con_addr)) {
// @TODO Research if this is the best way to determine origin
if (addr.getAddress().equals(con_addr.getAddress())) {
from_client = false; break;
}
}

// Process request
if (from_client) {
new ClientRequest(connection, this.node);
new ClientRequest(connection, this);
} else {
new NodeRequest(connection, this.node);
new NodeRequest(connection, this);
}
}
}

/**
* Adds a MusicObject to the node and updates all connected nodes.
* @param song The song to be added.
* Determines if two address are the same
* @param addr1 The first address
* @param addr2 The second address
* @return True if the IP address and port are the same
*/
private void addMusicObject(MusicObject song) {
this.node.addMusicObject(song);
// @TODO Send updates to all nodes (possibly do a consensus check)
public boolean isSameAddress(InetSocketAddress addr1, InetSocketAddress addr2) {
return addr2.getAddress().equals(addr1.getAddress()) && (addr1.getPort() == addr2.getPort());
}

/**
* Queries all nodes and returns the appropriate dataset.
* @param query The query.
* @return The list of all appropriate MusicOjects.
* Gets the list of all other server node addresses
* @return The list of all other server node addresses
*/
private List<MusicObject> queryAll(String query) {
List<MusicObject> results = new ArrayList<MusicObject>();//this.node.query(query);
for (InetSocketAddress sock : this.servers) {
// @TODO Send GET to all other nodes, receive List<MusicObject> through OOS
}
// @TODO Perform consensus

// @TODO Limit and return results
return results;
public List<InetSocketAddress> getServers() {
return this.servers;
}

/**
* Gets the local address of the server
* @return The local socket address
*/
public InetSocketAddress getLocalAddress() {
return this.local_address;
}

/**
Expand All @@ -128,16 +144,17 @@ public static void main(String[] args) {
String server_list_file = (args.length > 1) ? args[1] : DEFAULT_SERVER_LIST;
new NodeServer(port, server_list_file);
}


}


class ClientRequest extends Thread {
Socket socket;
NodeServer svr;

public ClientRequest(Socket connection, Node n) {
System.out.format("[+] Client Connection from: %s\n", connection.getInetAddress().toString());
public ClientRequest(Socket connection, NodeServer svr) {
System.out.format("\n[C] Client request from: %s\n", connection.getInetAddress().toString());
this.socket = connection;
this.svr = svr;
this.start();
}

Expand All @@ -146,17 +163,35 @@ public void run() {
DataInputStream dis = new DataInputStream(this.socket.getInputStream());
String request = dis.readUTF();
System.out.println(request);

for (InetSocketAddress addr : svr.getServers()) {
Socket s = new Socket();
try {
s.connect(addr, NodeServer.SERVER_CONNECT_TIMEOUT);
//s = new Socket(addr.getAddress(), addr.getPort(), svr.getLocalAddress().getAddress(), svr.getLocalAddress().getPort());
} catch (ConnectException e) {
System.err.format("[!] Connection failed: %s\n", addr.toString());
continue;
}
DataOutputStream dos = new DataOutputStream(s.getOutputStream());
// @TODO Determine correct action based on request
dos.writeUTF(request);
// @TODO Perform consensus
}
} catch (IOException e) { e.printStackTrace(); }
}
}


class NodeRequest extends Thread {
//ForkJoinPool fjp;
Socket socket;
NodeServer svr;

public NodeRequest(Socket connection, Node n) {
System.out.format("[+] Server Connection from: %s\n", connection.getInetAddress().toString());
public NodeRequest(Socket connection, NodeServer svr) {
System.out.format("\n[N] Node request from: %s\n", connection.getInetAddress().toString());
this.socket = connection;
this.svr = svr;
this.start();
}

Expand Down

0 comments on commit 4e3c23c

Please sign in to comment.