-
Notifications
You must be signed in to change notification settings - Fork 0
/
Worker.scala
301 lines (275 loc) · 10.1 KB
/
Worker.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package net.houzuo.aurinko2
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStreamReader
import java.io.PrintWriter
import java.net.Socket
import java.util.logging.Logger
import scala.annotation.migration
import scala.collection.mutable.ListBuffer
import scala.xml.Elem
import scala.xml.NodeSeq.seqToNodeSeq
import scala.xml.XML.loadString
import org.xml.sax.SAXParseException
import net.houzuo.aurinko2.logic.Database
import net.houzuo.aurinko2.logic.Query
import scala.xml.parsing.ConstructingParser
import scala.io.Source
import scala.xml.Node
/** Values shared by all `Worker` instances. */
object Worker {
  /** Logger registered under the fully-qualified name of the `Worker` class. */
  val LOG: Logger = Logger.getLogger(classOf[Worker].getName)

  /** Max request length in bytes; a request that grows beyond this is rejected. */
  val MAX_REQUEST_LENGTH: Int = 20000000
}
/**
 * Serves a single client connection: reads XML requests from `sock`, executes them against
 * `db` and writes XML replies back on the same socket.
 *
 * Protocol, as implemented below:
 *  - Input lines are buffered until a line that is exactly `<go/>`, `<go />` or `<go></go>`
 *    (ignoring case and surrounding whitespace) arrives.
 *  - The buffered lines are joined with newlines and parsed as one XML document; the label of
 *    its root element (lower-cased) selects the command, see `go`.
 *  - Every reply is written through `respond`, which always terminates it with `<ok/>`.
 *
 * The request loop runs inside the constructor, so `new Worker(...)` does not return until
 * the peer disconnects (or a non-IO exception escapes the loop). The streams and the socket
 * are closed on the way out.
 *
 * @param db   database that requests operate on
 * @param sock connected client socket
 */
class Worker(val db: Database, val sock: Socket) {
  private val in = new BufferedReader(new InputStreamReader(sock.getInputStream))
  private val out = new PrintWriter(sock.getOutputStream, true) // `true` = auto-flush on println

  // Process requests until peer closes the connection
  try {
    val lines = new ListBuffer[String]
    var reqLength = 0
    while (true)
      // Kept from the original. NOTE(review): `this` is not visible to other code while the
      // constructor is still running, so this lock looks uncontended - confirm before removing.
      this.synchronized {
        val line = in.readLine()
        if (line == null)
          throw new IOException("peer closed the connection")
        if (isEndOfRequest(line)) {
          if (lines.isEmpty)
            respond { None }
          else
            try {
              parseRequest(lines.mkString("\n")) match {
                case Some(root) => go(root)
                case None       => respond { Some(<err>Request is not a valid XML document</err>) }
              }
            } catch {
              // Routed through `respond` so that this reply is terminated by <ok/> like all others.
              case e: SAXParseException =>
                respond { Some(<err>Request is not a valid XML document</err>) }
              case e: Exception =>
                respond { Some(<err>{ describe(e) }</err>) }
                logFailure(e)
            } finally {
              lines.clear()
              reqLength = 0
            }
        } else {
          reqLength += line.length
          if (reqLength > Worker.MAX_REQUEST_LENGTH) {
            // NOTE(review): the rest of an oversized request is buffered as if it were a new
            // request, so the client may get a second reply at the next <go/>.
            respond { Some(<err>Request is too long</err>) }
            lines.clear()
            reqLength = 0
          } else
            lines += line
        }
      }
  } catch {
    case e: IOException => Worker.LOG.fine(s"${sock.getRemoteSocketAddress.toString} disconnected")
  } finally {
    // Best-effort cleanup: a failure to close one resource must not prevent closing the others.
    quietly(in.close())
    quietly(out.close())
    quietly(sock.close())
  }

  /** Runs `op` and deliberately ignores any `Exception` it throws. */
  private def quietly(op: => Unit): Unit =
    try op catch { case e: Exception => }

  /** True when `line` is one of the accepted end-of-request markers. */
  private def isEndOfRequest(line: String): Boolean = {
    val marker = line.trim.toLowerCase
    marker == "<go/>" || marker == "<go />" || marker == "<go></go>"
  }

  /**
   * Parses `text` as an XML document and returns its root element.
   *
   * Returns `None` when the parser yields no document or no root element; the parser may
   * return null rather than throw for input that is not XML, hence the `Option` wrapping.
   */
  private def parseRequest(text: String): Option[Node] = {
    // Second argument `true` asks the parser to preserve whitespace.
    val parser = ConstructingParser.fromSource(Source.fromString(text), true)
    for {
      doc <- Option(parser.document())
      root <- Option(doc.docElem)
    } yield root
  }

  /** Text for an `<err>` reply: the exception message, or the exception itself if it has none. */
  private def describe(e: Throwable): String =
    Option(e.getMessage).getOrElse(e.toString)

  /** Logs `e` with its stack trace at SEVERE level. */
  private def logFailure(e: Throwable): Unit = {
    val trace = e.getStackTrace.mkString("\n")
    Worker.LOG.severe(s"${describe(e)}: \n$trace")
  }

  /**
   * Print response to output.
   *
   * Evaluates `res` (by-name, so exceptions thrown while building the reply are handled here),
   * prints the element if there is one, reports any exception as `<err>message</err>`, and
   * always finishes with `<ok/>`.
   *
   * @param res computation producing the reply element, or `None` for a bare `<ok/>`
   */
  private def respond(res: => Option[Elem]): Unit = {
    try {
      res.foreach(reply => out.println(reply))
    } catch {
      case e: Exception =>
        out.println(<err>{ describe(e) }</err>)
        logFailure(e)
    } finally {
      out.println(<ok/>)
    }
  }

  /**
   * Runs `body` with the text of the request's "col" attribute, or yields the standard
   * error reply when that attribute is missing.
   */
  private def withCollection(req: Node)(body: String => Option[Elem]): Option[Elem] =
    req.attribute("col") match {
      case Some(name) => body(name.text)
      case None       => Some(<err>Please specify collection name in "col" attribute</err>)
    }

  /** Text of attribute `name`; throws an `Exception` carrying `missing` when it is absent. */
  private def requiredAttr(req: Node, name: String, missing: String): String =
    req.attribute(name) match {
      case Some(value) => value.text
      case None        => throw new Exception(missing)
    }

  /** Integer value of attribute `name`, or `default` when the attribute is absent. */
  private def intAttr(req: Node, name: String, default: Int): Int =
    req.attribute(name) match {
      case Some(value) => value.text.toInt
      case None        => default
    }

  /**
   * First child of `req` that is an element (text nodes are skipped).
   * Throws an `Exception` with a readable message when there is none.
   */
  private def firstElem(req: Node): Elem =
    req.child.collectFirst { case e: Elem => e }.getOrElse(
      throw new Exception("Please specify the document as a child element of the request"))

  /**
   * Process request.
   *
   * Dispatches on the lower-cased label of `req`. Every command except "shutdown" replies
   * through `respond`; an unrecognised label is answered with an "Unknown command" error.
   *
   * @param req root element of the parsed request
   */
  private def go(req: Node): Unit = {
    req.label.toLowerCase match {
      // Get all collection names
      case "all" =>
        respond { Some(<r>{ for (col <- db.all) yield <col>{ col }</col> }</r>) }

      // Get IO queue length on all collections
      case "load" =>
        respond {
          Some(<r>{
            for (col <- db.all) yield <col name={ col }>{
              for (thing <- db.get(col).load) yield <queue name={ thing._1.toString }>{ thing._2 }</queue>
            }</col>
          }</r>)
        }

      // Create collection
      case "create" =>
        respond { withCollection(req) { name => db.create(name); None } }

      // Rename collection
      case "rename" =>
        respond {
          (req.attribute("col"), req.attribute("to")) match {
            case (None, _) =>
              Some(<err>Please specify original collection name in "col" attribute</err>)
            case (_, None) =>
              Some(<err>Please spicify new collection in "to" attribute</err>)
            case (Some(oldName), Some(newName)) =>
              db.rename(oldName.text, newName.text)
              None
          }
        }

      // Drop collection
      case "drop" =>
        respond { withCollection(req) { name => db.drop(name); None } }

      // Repair collection
      case "repair" =>
        respond { withCollection(req) { name => db.repair(name); None } }

      // Insert documents
      case "insert" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            Some(<r>{ col.insert(firstElem(req)) }</r>)
          }
        }

      // Retrieve document by ID
      case "get" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            val id = requiredAttr(req, "id", "Please specify ID of document to retrie").toInt
            // NOTE(review): other commands unwrap read(...) with .get; here its result is
            // embedded as-is, unchanged from the original - confirm that is intended.
            Some(<r>{ col.read(id) }</r>)
          }
        }

      // Update documents
      case "update" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            val oldId = requiredAttr(req, "id", "Please specify ID of document to update")
            val newId = col.update(oldId.toInt, firstElem(req)).get
            Some(<r>{ Seq(<old>{ oldId }</old>, <new>{ newId }</new>) }</r>)
          }
        }

      // Delete documents
      case "delete" =>
        respond {
          withCollection(req) { name =>
            val id = requiredAttr(req, "id", "Please specify document ID to delete")
            db.get(name).delete(id.toInt)
            None
          }
        }

      // Get all documents
      case "findall" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            Some(<r>{
              // Optional "limit" attribute caps how many documents are returned.
              val ids = req.attribute("limit") match {
                case Some(number) => col.all.take(number.text.toInt)
                case None         => col.all
              }
              for (docID <- ids) yield <doc id={ docID.toString }>{ col.read(docID).get }</doc>
            }</r>)
          }
        }

      // Get indexed paths in collection
      case "indexed" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            Some(<r>{
              for (hash <- col.hashes) yield {
                val index = hash._2._2
                <index type="hash" hash-bits={ index.hashBits.toString } bucket-size={ index.perBucket.toString }>{
                  for (pathSegment <- hash._1) yield <path>{ pathSegment }</path>
                }</index>
              }
            }</r>)
          }
        }

      // Put a hash index on collection
      case "hash-index" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            // NOTE(review): every child node contributes a path segment, including text
            // nodes - confirm requests never contain whitespace between child elements.
            val path = req.child.map(_.text).toList
            val hashBits = intAttr(req, "hash-bits", 12)
            val bucketSize = intAttr(req, "bucket-size", 100)
            col.index(path, hashBits, bucketSize)
            None
          }
        }

      // Drop an index
      case "drop-index" =>
        respond {
          withCollection(req) { name =>
            db.get(name).unindex(req.child.map(_.text).toList)
            None
          }
        }

      // Query and return document ID
      case "q" =>
        respond {
          withCollection(req) { name =>
            Some(<r>{
              for (id <- new Query(db.get(name)).eval(req)) yield <id>{ id }</id>
            }</r>)
          }
        }

      // Query and return document ID and content
      case "select" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            Some(<r>{
              for (id <- new Query(col).eval(req)) yield <doc id={ id.toString }>{ col.read(id).get }</doc>
            }</r>)
          }
        }

      // Query and return count of result documents
      case "count" =>
        respond {
          withCollection(req) { name =>
            val col = db.get(name)
            Some(<r>{ new Query(col).eval(req).size }</r>)
          }
        }

      // Terminate the whole process; no reply is sent.
      case "shutdown" => System.exit(0)

      // The error must go through `respond`, otherwise the client never receives a reply.
      case _ => respond { Some(<err>Unknown command</err>) }
    }
  }
}