Skip to content

Commit

Permalink
Replaced tabs with spaces in infrastructure.communication
Browse files Browse the repository at this point in the history
  • Loading branch information
ielashi committed Jan 18, 2011
1 parent e516f05 commit 74e871c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 76 deletions.
40 changes: 20 additions & 20 deletions src/main/scala/infrastructure/communication/Client.scala
Expand Up @@ -16,30 +16,30 @@ class Client(val id:Int, val document:Document, val connectionType:String) {

def logPrefix = "[Client " + id + "] "

/**
* Applies the provided patches to the document shadow of the client with
/**
* Applies the provided patches to the document shadow of the client with
* the id specified. It also verifies, through the checksum, that the patch
* has been applied successfully.
*/
*/
def patch(patch:String, checksum:String):Boolean = {
if (patch != "") {
val patchObjects = Client.diffPatch.patch_fromText(patch)
if (patch != "") {
val patchObjects = Client.diffPatch.patch_fromText(patch)

val result = Client.diffPatch.patch_apply(patchObjects, shadow)
val result = Client.diffPatch.patch_apply(patchObjects, shadow)

val patchedDocument = result(0).asInstanceOf[String]
val patchedDocument = result(0).asInstanceOf[String]

shadow = patchedDocument
val resultChecksum = DigestUtils.shaHex(shadow)
shadow = patchedDocument
val resultChecksum = DigestUtils.shaHex(shadow)

Log.info(logPrefix + "Document digest: " + resultChecksum)
Log.info(logPrefix + "Document digest: " + resultChecksum)

return (resultChecksum == checksum)
} else {
return true
}
}
return (resultChecksum == checksum)
} else {
return true
}
}

/**
* In case the client goes out of sync with the server, revert the client's
Expand All @@ -52,8 +52,8 @@ class Client(val id:Int, val document:Document, val connectionType:String) {
}

def sendDocument(connection:INonBlockingConnection) = {
Log.info(logPrefix + "Sending document")
val request = new ConnectionRequest(id, document.text, connectionType)
Log.info(logPrefix + "Sending document")
val request = new ConnectionRequest(id, document.text, connectionType)
Server send("c," + request.toJson, connection, connectionType)
}

Expand All @@ -66,12 +66,12 @@ object Client {

private val logPrefix = "[Client Manager] "

private val diffPatch = new diff_match_patch() // for diff and patch
private val diffPatch = new diff_match_patch() // for diff and patch

val cleanupMessage = "cleanup clients"

// If clients lost pulse for more than this time, they will be removed
val cleanupInterval = 5 * 60 * 1000
val cleanupInterval = 5 * 60 * 1000

def client(i: Int):Option[Client] = {
if (clients.contains(i)) {
Expand Down
Expand Up @@ -33,8 +33,8 @@ class DataReceiver extends IDataHandler {
Message.fromJson(messageData) match {
case Some(message) => Server ! (message, nbc)
case _ => {
Log.error(logPrefix + "unrecognized message: " + messageData)
}
Log.error(logPrefix + "unrecognized message: " + messageData)
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/infrastructure/communication/Message.scala
Expand Up @@ -20,25 +20,25 @@ class Message(val senderId:Int, val documentName:String, val patchData:String,

object Message {
val jsonBuilder = new Gson() // used to convert objects to JSON
val jsonParser = new JSONParser
val jsonParser = new JSONParser

val logPrefix = "[Message] " // for logging

def fromJson(jsonData : String):Option[Message] = {
try {
deserialize(jsonParser.parse(jsonData).asInstanceOf[JSONObject])
deserialize(jsonParser.parse(jsonData).asInstanceOf[JSONObject])
} catch {
case _ => return None
}
}
}
}

def deserialize(o:JSONObject):Option[Message] = {
def deserialize(o:JSONObject):Option[Message] = {
var patchData:String = ""; var checksum:String = "";

if (o.get("patchData") != null) patchData = o.get("patchData").toString
if (o.get("checksum") != null) checksum = o.get("checksum").toString
if (o.get("patchData") != null) patchData = o.get("patchData").toString
if (o.get("checksum") != null) checksum = o.get("checksum").toString

Some(new Message(o.get("senderId").toString.toInt,
Some(new Message(o.get("senderId").toString.toInt,
o.get("documentName").toString, patchData, checksum,
o.get("conType").toString))
}
Expand Down
92 changes: 46 additions & 46 deletions src/main/scala/infrastructure/communication/Server.scala
Expand Up @@ -14,8 +14,8 @@ import util.Log

object Server extends Actor {
private val id = 0
private val diffPatch = new diff_match_patch() // for diff and patch
private val logPrefix = "[Server] " // for logging
private val diffPatch = new diff_match_patch() // for diff and patch
private val logPrefix = "[Server] " // for logging
private val jsonParser = new JSONParser
private var socketConnection:IServer = null

Expand All @@ -29,7 +29,7 @@ object Server extends Actor {
// Start a socket connection. All incoming data is sent to DataReceiver
socketConnection = new org.xsocket.connection.Server(port, new DataReceiver)
socketConnection run()
shutdownServer
shutdownServer
}


Expand All @@ -44,7 +44,7 @@ object Server extends Actor {
}
case Client.cleanupMessage => Client cleanup
case Document.cleanupMessage => Document cleanup
case (connectionData: String, connection: INonBlockingConnection) => {
case (connectionData: String, connection: INonBlockingConnection) => {
connectClient(connectionData, connection)
}
case _ => Log.error(logPrefix + "unrecognized message received")
Expand All @@ -69,7 +69,7 @@ object Server extends Actor {

Log.info(logPrefix + "server document digest: "
+ DigestUtils.shaHex(document(documentName) text))
}
}

/**
* Handle messages from clients.
Expand All @@ -81,22 +81,22 @@ object Server extends Actor {
connection: INonBlockingConnection):Unit = {
Client client(clientMessage.senderId) match {
case Some(client) => {
if (clientMessage.patchData != "") {
// Patch server shadow of the client
if (clientMessage.patchData != "") {
// Patch server shadow of the client
val clientPatchedSuccessfully = client patch(clientMessage.patchData,
clientMessage.checksum)
if (!clientPatchedSuccessfully) {
if (!clientPatchedSuccessfully) {
Log.error(logPrefix + "oh oh! wrong checksum. panicking...")
Log.error("Client document: " + client.shadow)
client.refresh(connection)
return
}
}

// Patch server document
patchServer(clientMessage.documentName, clientMessage.patchData)
}
// Patch server document
patchServer(clientMessage.documentName, clientMessage.patchData)
}
sendReply(client, connection)
}
case _ => {
Expand All @@ -107,58 +107,58 @@ object Server extends Actor {
}
}

/**
/**
* Calculate the diffs between the client shadow and the current server
* document and send it to the client.
* Assumes that the client shadow is up to date and passed the checksum test.
*/
* Assumes that the client shadow is up to date and passed the checksum test.
*/
def sendReply(client:Client, connection:INonBlockingConnection) = {
// a snapshot of the server document
val serverDocument = new String(client.document.text)
val serverDocument = new String(client.document.text)

val diffs = diffPatch.diff_main(client shadow, serverDocument)
val diffs = diffPatch.diff_main(client shadow, serverDocument)

client shadow = serverDocument

val patchObjects = diffPatch.patch_make(diffs)
val patchObjects = diffPatch.patch_make(diffs)

val patches = diffPatch.patch_toText(patchObjects)
val patches = diffPatch.patch_toText(patchObjects)

val checksum = DigestUtils.shaHex(client shadow)
val checksum = DigestUtils.shaHex(client shadow)

val messageToSend = new Message(id, client.document.name, patches, checksum,
val messageToSend = new Message(id, client.document.name, patches, checksum,
client.connectionType)

if (messageToSend.patchData != "") {
Log.info(logPrefix + "sending to client " + client.id + ": "
if (messageToSend.patchData != "") {
Log.info(logPrefix + "sending to client " + client.id + ": "
+ messageToSend.toJson)
}
}
Server send("m," + messageToSend.toJson, connection, client.connectionType)
}
}


/**
* Send message to a client.
*/
def send(message:String, connection:INonBlockingConnection,
connectionType:String) = {
try {
connectionType match {
case "xmlsocket" => {
try {
connectionType match {
case "xmlsocket" => {
connection write(message + "\0")
}
case _ => {
connection write(message)
connection close
}
}
} catch {
case e: Exception => Log.error(e.getStackTraceString)
case _ => {
case _ => {
connection write(message)
connection close
}
}
} catch {
case e: Exception => Log.error(e.getStackTraceString)
case _ => {
Log.error("Error sending message. Disconnecting...")
connection close
}
}
}
}

def document(documentName : String) = {
Expand All @@ -169,7 +169,7 @@ object Server extends Actor {
socketConnection close
}

def connectClient(clientData:String, clientStream : INonBlockingConnection) {
def connectClient(clientData:String, clientStream : INonBlockingConnection) {
val parsedClientData =
jsonParser.parse(clientData).asInstanceOf[JSONObject];

Expand All @@ -181,16 +181,16 @@ object Server extends Actor {

if (parsedClientData.get("documentName") == null) {
Log.error("Document name not found in client's connection request")
clientStream.close
clientStream.close
return
}

val connectionType = parsedClientData.get("connectionType").toString
val connectionType = parsedClientData.get("connectionType").toString

val documentName = parsedClientData.get("documentName").toString

// Store the client connection and generate a client id
val client = Client add(document(documentName), connectionType)
// Store the client connection and generate a client id
val client = Client add(document(documentName), connectionType)

client sendDocument(clientStream)
}
Expand All @@ -200,15 +200,15 @@ object Main {
/**
* Main entry point.
*/
def main(args: Array[String]): Unit = {
def main(args: Array[String]): Unit = {
if (args.length < 1) {
println("Args: port-number")
return
}

val portNumber: Int = args(0).toInt

Server start()
Server start()
Cleanup start()

Server connect(portNumber)
Expand Down

0 comments on commit 74e871c

Please sign in to comment.