/
OrientDbSimpleTransact.java
129 lines (101 loc) · 3.72 KB
/
OrientDbSimpleTransact.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package net.nosql_bench;
import com.orientechnologies.orient.core.db.document.ODatabaseDocument;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.metadata.schema.OSchema;
import com.orientechnologies.orient.core.metadata.schema.OType;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.tx.OTransaction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class OrientDbSimpleTransact {
private static final AtomicInteger verifyCounter = new AtomicInteger(0);
private static final AtomicInteger collisions = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ODatabaseDocument db = new ODatabaseDocumentTx("remote:localhost/test")
.open("admin", "admin");
OSchema schema = db.getMetadata().getSchema();
System.out.println("MVCC: " + db.isMVCC());
String tableName = "Counter";
int threads = 5;
int repeat = 20;
// register class
OClass cls = schema.getOrCreateClass(tableName);
String fieldName = "number";
if (!cls.existsProperty(fieldName)) {
cls.createProperty(fieldName, OType.INTEGER);
}
// create counter entity
ODocument doc = new ODocument(tableName);
doc.field(fieldName, 0);
ORID orid = db.save(doc).getIdentity();
// starting threads
ExecutorService executor = Executors.newFixedThreadPool(100);
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int n = 1; n <= threads; n++) {
int delta = 1;
tasks.add(new CounterIncrement(orid, repeat));
System.out.println("Added task:" + n + " delta:" + delta);
}
executor.invokeAll(tasks);
executor.shutdown();
// load the counter again
ODocument res = db.load(orid, null, true);
System.out.println("Counter updates: " + res.toMap().get(fieldName));
System.out.println("Verify counter: " + verifyCounter);
System.out.println("Collisions: " + collisions);
if (res.toMap().get(fieldName) != verifyCounter) {
System.out.println("Error: number of updates (" +
res.toMap().get(fieldName) + ") is not equal to verify counter (" + verifyCounter + ").");
}
db.close();
}
public static class CounterIncrement implements Callable<Void> {
public CounterIncrement(ORID key, int repeat) {
this.key = key;
this.repeat = repeat;
}
private ORID key;
private int repeat;
/**
* Transactionally increments the counter
*/
@Override
public Void call() throws Exception {
ODatabaseDocumentTx db = new ODatabaseDocumentTx("remote:localhost/test")
.open("admin", "admin");
while (repeat != 0) {
try {
db.begin(OTransaction.TXTYPE.OPTIMISTIC);
ODocument entity = db.load(key, null, true);
if (entity != null) {
// increment field 'number' by one
int val = entity.field("number");
val++;
entity.field("number", val);
db.save(entity);
db.commit();
verifyCounter.addAndGet(1);
// repeat variable is only decreased when transaction is committed successfully
repeat--;
System.out.println("Updated entity:" + key + " number:" + entity.field("number"));
} else {
System.out.println("transact " + Thread.currentThread().getName() + " Not found! key=" + key);
db.rollback();
}
} catch (RuntimeException re) {
collisions.addAndGet(1);
System.out.println("Collision: " + re.getMessage());
db.rollback();
}
}
db.close();
return null;
}
}
}