/
WeakNotificationIT.java
151 lines (121 loc) · 5.28 KB
/
WeakNotificationIT.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
public class WeakNotificationIT extends ITBaseMini {
private static final Column STAT_COUNT = new Column("stat", "count");
private static final Column STAT_CHECK = new Column("stat", "check");
public static class SimpleObserver implements Observer {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
CellScanner cellScanner = tx.scanner().over(row, new Column(Bytes.of("stats"))).build();
int sum = 0;
for (RowColumnValue rcv : cellScanner) {
sum += Integer.parseInt(rcv.getValue().toString());
tx.delete(row, rcv.getColumn());
}
if (sum != 0) {
sum += TestUtil.getOrDefault(tx, row.toString(), STAT_COUNT, 0);
tx.set(row.toString(), STAT_COUNT, sum + "");
}
}
}
public static class WeakNotificationITObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
or.forColumn(STAT_CHECK, WEAK).useObserver(new SimpleObserver());
}
}
@Override
protected Class<? extends ObserverProvider> getObserverProviderClass() {
return WeakNotificationITObserverProvider.class;
}
@Test
public void testWeakNotification() throws Exception {
Environment env = new Environment(config);
TestTransaction tx1 = new TestTransaction(env);
tx1.set("r1", STAT_COUNT, "3");
tx1.done();
TestTransaction tx2 = new TestTransaction(env);
tx2.set("r1", new Column("stats", "af89"), "5");
tx2.setWeakNotification("r1", STAT_CHECK);
tx2.done();
TestTransaction tx3 = new TestTransaction(env);
tx3.set("r1", new Column("stats", "af99"), "7");
tx3.setWeakNotification("r1", STAT_CHECK);
tx3.done();
miniFluo.waitForObservers();
TestTransaction tx4 = new TestTransaction(env);
Assert.assertEquals("15", tx4.gets("r1", STAT_COUNT));
// overlapping transactions that set a weak notification should commit w/ no problem
TestTransaction tx5 = new TestTransaction(env);
tx5.set("r1", new Column("stats", "bff7"), "11");
tx5.setWeakNotification("r1", STAT_CHECK);
CommitData cd5 = tx5.createCommitData();
Assert.assertTrue(tx5.preCommit(cd5));
TestTransaction tx6 = new TestTransaction(env);
tx6.set("r1", new Column("stats", "bff0"), "13");
tx6.setWeakNotification("r1", STAT_CHECK);
CommitData cd6 = tx6.createCommitData();
Assert.assertTrue(tx6.preCommit(cd6));
Stamp commitTs5 = env.getSharedResources().getOracleClient().getStamp();
Assert.assertTrue(tx5.commitPrimaryColumn(cd5, commitTs5));
Stamp commitTs6 = env.getSharedResources().getOracleClient().getStamp();
Assert.assertTrue(tx6.commitPrimaryColumn(cd6, commitTs6));
tx6.finishCommit(cd6, commitTs6);
tx5.finishCommit(cd5, commitTs5);
miniFluo.waitForObservers();
TestTransaction tx7 = new TestTransaction(env);
Assert.assertEquals("39", tx7.gets("r1", STAT_COUNT));
env.close();
}
@Test(timeout = 30000)
public void testNOOP() throws Exception {
// if an observer makes not updates in a transaction, it should still delete the weak
// notification
try (Transaction tx1 = client.newTransaction()) {
tx1.set("r1", STAT_COUNT, "3");
tx1.setWeakNotification("r1", STAT_CHECK);
tx1.commit();
}
// the following will loop forever if weak notification is not deleted
miniFluo.waitForObservers();
}
@Test(expected = IllegalArgumentException.class)
public void testBadColumn() throws Exception {
try (Transaction tx1 = client.newTransaction()) {
tx1.set("r1", STAT_COUNT, "3");
tx1.setWeakNotification("r1", new Column("stat", "foo"));
tx1.commit();
}
}
}