Skip to content

Commit

Permalink
adds a basic way of storing all messages in a swarm and a way of visu…
Browse files Browse the repository at this point in the history
…alizing that storage
  • Loading branch information
barberdt committed Nov 23, 2011
1 parent 0493b42 commit b2ec393
Show file tree
Hide file tree
Showing 6 changed files with 2,603 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import java.io.IOException;
import java.net.UnknownHostException;
import java.sql.*;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void messageRecieved(Map<String, ?> payload,
if (isTableInsertMessage(payload)) {
System.out.println("Table insert message received:");
@SuppressWarnings("unchecked")
Map<String, ?> insert = (Map<String, ?>) payload.get("table-insert");
Map<String, ?> insert = (Map<String, ?>) payload.get("table_insert");
try {
tableInsert(insert);
} catch (SQLException e) {
Expand All @@ -116,7 +117,7 @@ public void messageRecieved(Map<String, ?> payload,
} else if (isTableUpdateMessage(payload)) {
System.out.println("Table update message received:");
@SuppressWarnings("unchecked")
Map<String, ?> update = (Map<String, ?>) payload.get("table-update");
Map<String, ?> update = (Map<String, ?>) payload.get("table_update");
try {
tableUpdate(update);
} catch (SQLException e) {
Expand All @@ -125,14 +126,32 @@ public void messageRecieved(Map<String, ?> payload,
} else if (isTableSelectMessage(payload)) {
System.out.println("Table select message received:");
@SuppressWarnings("unchecked")
Map<String, ?> select = (Map<String, ?>) payload.get("table-select");
Map<String, ?> select = (Map<String, ?>) payload.get("table_select");
try {
tableSelect(select, fromSwarm, fromResource);
tableSelect(select, fromResource);
} catch (SQLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
} else if (isHistorySelectMessage(payload)) {
System.out.println("History select message received");
try {
historySelect(fromResource);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else if (!isSelectResponse(payload)){
try {
storeMessage(payload, fromResource);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
Expand All @@ -142,7 +161,7 @@ public void messageRecieved(Map<String, ?> payload,

private boolean isTableInsertMessage(Map<String, ?> payload) {
for (Map.Entry<String, ?> entry : payload.entrySet()) {
if (entry.getKey().equals("table-insert")) {
if (entry.getKey().equals("table_insert")) {
return true;
}
}
Expand All @@ -151,7 +170,7 @@ private boolean isTableInsertMessage(Map<String, ?> payload) {

private boolean isTableUpdateMessage(Map<String, ?> payload) {
for (Map.Entry<String, ?> entry : payload.entrySet()) {
if (entry.getKey().equals("table-update")) {
if (entry.getKey().equals("table_update")) {
return true;
}
}
Expand All @@ -160,13 +179,30 @@ private boolean isTableUpdateMessage(Map<String, ?> payload) {

private boolean isTableSelectMessage(Map<String, ?> payload) {
for (Map.Entry<String, ?> entry : payload.entrySet()) {
if (entry.getKey().equals("table-select")) {
if (entry.getKey().equals("table_select")) {
return true;
}
}
return false;
}

private boolean isHistorySelectMessage(Map<String, ?> payload) {
for (Map.Entry<String, ?> entry : payload.entrySet()) {
if (entry.getKey().equals("history_select")) {
return true;
}
}
return false;
}

private boolean isSelectResponse(Map<String, ?> payload) {
for (Map.Entry<String, ?> entry : payload.entrySet()) {
if (entry.getKey().equals("select_response")) {
return true;
}
}
return false;
}

// Table Management

Expand Down Expand Up @@ -244,10 +280,11 @@ private void tableUpdate(Map<String, ?> update) throws SQLException {
}

// Perform the desired select query on the given table and return the select response through the swarm
private void tableSelect(Map<String, ?> select, String fromSwarm, String fromResource) throws SQLException, IOException {
private void tableSelect(Map<String, ?> select, String fromResource) throws SQLException, IOException {
Connection conn = opendbConnection();

String selectTable = (String) select.get("table");
String selectTable = (String) select.get("table");
String selectRequestId = (String) select.get("request_id");

@SuppressWarnings("unchecked")
ArrayList<String> fields = (ArrayList<String>) select.get("select");
Expand All @@ -274,19 +311,47 @@ private void tableSelect(Map<String, ?> select, String fromSwarm, String fromRes
System.out.println("Query: " + query);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query);
sendResponse(fields, rs, fromSwarm, fromResource);
sendResponse(fields, rs, fromResource, selectRequestId);
closedbConnection(conn);
}

private void historySelect(String fromResource) throws SQLException, IOException {
Connection conn = opendbConnection();

ArrayList<String> fields = new ArrayList<String>();
fields.add("timestamp");
fields.add("producer");
fields.add("message");

String query = "SELECT * FROM messages;";
System.out.println("Query: " + query);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query);
sendResponse(fields, rs, fromResource, fromResource);
closedbConnection(conn);
}

private void storeMessage(Map<String, ?> payload, String fromResource) throws SQLException {
Connection conn = opendbConnection();
java.util.Date d = new Date();
String query = "INSERT INTO messages (timestamp, producer, message) VALUES (\"" + d.toString() + "\", \"" + fromResource + "\", \"" + payload.toString() + "\");";
System.out.println("Query: " + query);
Statement stmt = conn.createStatement();
stmt.execute(query);
closedbConnection(conn);
}

// Method that assists the tableSelect method
private void sendResponse(ArrayList<String> fields, ResultSet rs, String fromSwarm, String fromResource) throws SQLException, IOException {
private void sendResponse(ArrayList<String> fields, ResultSet rs, String fromResource, String selectRequestId) throws SQLException, IOException {
HashMap<String, String> begin = new HashMap<String, String>();
begin.put("select-response", "begin");
begin.put("select_response", "begin");
begin.put("to", fromResource);
begin.put("request_id", selectRequestId);

HashMap<String, String> end = new HashMap<String, String>();
end.put("select-response", "end");
end.put("select_response", "end");
end.put("to", fromResource);
end.put("request_id", selectRequestId);

swarmSesh.send(begin);
while (rs.next()) {
Expand All @@ -297,8 +362,9 @@ private void sendResponse(ArrayList<String> fields, ResultSet rs, String fromSwa
String currValue = rs.getString(i+1);
response.put(currField, currValue);
}
payload.put("select-response", response);
payload.put("to", fromResource);
payload.put("select_response", response);
payload.put("to", fromResource);
payload.put("request_id", selectRequestId);
swarmSesh.send(payload);
}
swarmSesh.send(end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.buglabs.bug.swarm.history.IHistoryManager;
import com.buglabs.bug.swarm.history.impl.SQLiteManager;

public class TestMain {
public class HistoryTest {

/**
* @param args
Expand All @@ -16,7 +16,7 @@ public class TestMain {
*/
public static void main(String[] args) throws ClassNotFoundException, UnknownHostException, IOException {
// TODO Auto-generated method stub
IHistoryManager myHM = new SQLiteManager("test", "92bfe25f4a04f2e0338ded23ae30af1e482a0709", "4bf436fbee96cacedbb087f4cd41da22bf84e470", "3eae4f23d133a9d61c1582ca66cd45191fefcc57", "/home/barberdt/test.db");
IHistoryManager myHM = new SQLiteManager("test", "92bfe25f4a04f2e0338ded23ae30af1e482a0709", "4bf436fbee96cacedbb087f4cd41da22bf84e470", "3eae4f23d133a9d61c1582ca66cd45191fefcc57", "/home/barberdt/history.db");
myHM.start();
}

Expand Down
Loading

0 comments on commit b2ec393

Please sign in to comment.