Permalink
Browse files

Old mapping impl made me barf. This one's better; also added another …

…test.
  • Loading branch information...
1 parent ec97bb7 commit 87f331360a316fa94f6d2a8906ccbbdc8f8b2b5f U-osiris\joeo committed Sep 29, 2011
@@ -1,7 +1,18 @@
package javax.datagrid.mapreduce;
+import javax.datagrid.DataGrid;
import java.io.Serializable;
-public interface Mapper<ResultType extends Serializable> {
- <T extends Serializable> ResultType execute(T t);
+public abstract class Mapper<ResultType extends Serializable> {
+ private DataGrid localGrid;
+
+ public DataGrid getLocalGrid() {
+ return localGrid;
+ }
+
+ public void setLocalGrid(DataGrid localGrid) {
+ this.localGrid = localGrid;
+ }
+
+ public abstract ResultType execute();
}
@@ -15,7 +15,7 @@
import java.util.concurrent.Future;
public class TransientDataGrid extends AbstractDataGrid {
- Map<Serializable, DataGridEntry> data = new ConcurrentHashMap<Serializable, DataGridEntry>();
+ Map<Serializable, DataGridEntry<Serializable>> data = new ConcurrentHashMap<Serializable, DataGridEntry<Serializable>>();
/**
* This field indicates how many entries can exist in the map before each access triggers
* a scan for expired entries. Thus, if the "data grid" has more than this number of entries,
@@ -28,31 +28,27 @@
private int ceilingBeforeScan = 1000;
private long defaultExpiry = DataGrid.DEFAULT_EXPIRY;
- @SuppressWarnings({"UnusedDeclaration"})
public int getCeilingBeforeScan() {
return ceilingBeforeScan;
}
- @SuppressWarnings({"UnusedDeclaration"})
public void setCeilingBeforeScan(int ceilingBeforeScan) {
this.ceilingBeforeScan = ceilingBeforeScan;
}
- @SuppressWarnings({"UnusedDeclaration"})
public long getDefaultExpiry() {
return defaultExpiry;
}
- @SuppressWarnings({"UnusedDeclaration"})
public void setDefaultExpiry(long defaultExpiry) {
this.defaultExpiry = defaultExpiry;
}
private void scanForExpiredEntries() {
long now = System.currentTimeMillis();
- Iterator<Map.Entry<Serializable, DataGridEntry>> iterator = data.entrySet().iterator();
+ Iterator<Map.Entry<Serializable, DataGridEntry<Serializable>>> iterator = data.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<Serializable, DataGridEntry> entry = iterator.next();
+ Map.Entry<Serializable, DataGridEntry<Serializable>> entry = iterator.next();
if (entry.getValue().getExpirationTime() < now) {
iterator.remove();
}
@@ -68,7 +64,7 @@ private void scanForExpiredEntries() {
if (expiry != DataGrid.FOREVER) {
expirationTime = System.currentTimeMillis() + expiry;
}
- DataGridEntry dataGridEntry = new DataGridEntry<V>(value, expirationTime);
+ DataGridEntry<Serializable> dataGridEntry = new DataGridEntry<Serializable>(value, expirationTime);
data.put(key, dataGridEntry);
return new TransientDataGridFuture<V>(value);
}
@@ -78,13 +74,13 @@ private void scanForExpiredEntries() {
if (data.size() > ceilingBeforeScan) {
scanForExpiredEntries();
}
- @SuppressWarnings({"unchecked"})
- DataGridEntry<V> dataGridEntry = data.get(key);
+
+ DataGridEntry<Serializable> dataGridEntry = data.get(key);
if (dataGridEntry != null && dataGridEntry.isExpired()) {
data.remove(key);
dataGridEntry = null;
}
- V payload = (dataGridEntry == null ? null : dataGridEntry.getPayload());
+ V payload = (dataGridEntry == null ? null : (V) dataGridEntry.getPayload());
return new TransientDataGridFuture<V>(payload);
}
@@ -94,15 +90,14 @@ private void scanForExpiredEntries() {
scanForExpiredEntries();
}
@SuppressWarnings({"unchecked"})
- DataGridEntry<V> dataGridEntry = data.get(key);
+ DataGridEntry<Serializable> dataGridEntry = data.get(key);
if (dataGridEntry != null) {
data.remove(key);
if (dataGridEntry.isExpired()) {
dataGridEntry = null;
}
}
-
- V payload = (dataGridEntry == null ? null : dataGridEntry.getPayload());
+ V payload = (dataGridEntry == null ? null : (V) dataGridEntry.getPayload());
return new TransientDataGridFuture<V>(payload);
}
@@ -115,15 +110,13 @@ private void scanForExpiredEntries() {
public <ResultType extends Serializable> ResultType map(Mapper<ResultType> mapper, Reducer<ResultType> reducer,
Filter filter) {
List<ResultType> results = new ArrayList<ResultType>();
- for (Serializable key : data.keySet()) {
- Serializable value = read(key);
- // make sure it's live data
- if (value != null) {
- if (filter.acceptable(key, value)) {
- results.add(mapper.execute(value));
- }
- }
- }
+
+ // in a real implementation, mapper.execute() would run for each node in the data grid.
+ // for the transient grid, however, there's only one node to work with.
+ mapper.setLocalGrid(this);
+
+ results.add(mapper.execute());
+
return reducer.reduce(results);
}
}
@@ -3,14 +3,19 @@
import org.testng.annotations.Test;
import javax.datagrid.DataGrid;
+import javax.datagrid.mapreduce.Mapper;
+import javax.datagrid.mapreduce.Reducer;
+import java.util.Collection;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
public class TransientDataGridTest {
@Test
public void instantiateDataGrid() {
DataGrid dataGrid = new TransientDataGrid();
+ assertNotNull(dataGrid);
}
@Test
@@ -46,7 +51,36 @@ public void testExpirations() {
sleep(110);
result = dataGrid.take("key");
assertNull(result);
+ }
+ @Test(expectedExceptions = ClassCastException.class)
+ public void testCastingProblems() {
+ DataGrid dataGrid = new TransientDataGrid();
+ String result = dataGrid.write("key", "value");
+ Integer r=dataGrid.read("key");
+ }
+
+ @Test
+ public void testMapReduce() {
+ DataGrid dataGrid = new TransientDataGrid();
+ Integer result = dataGrid.map(new Mapper<Integer>() {
+ @Override
+ public Integer execute() {
+ return 1;
+ }
+ },
+ new Reducer<Integer>() {
+ @Override
+ public Integer reduce(Collection<Integer> mappedResults) {
+ int sum = 0;
+ for (Integer i : mappedResults) {
+ sum += i;
+ }
+ return sum;
+ }
+ }
+ );
+ assertEquals(result.intValue(), 1);
}
private void sleep(long time) {

0 comments on commit 87f3313

Please sign in to comment.