// CacheAffinityExample.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gridgain.examples.datagrid;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import java.util.*;
/**
* This example demonstrates the simplest code that populates the distributed cache
* and co-locates simple closure execution with each key. The goal of this particular
* example is to provide the simplest code example of this logic.
* <p>
* Remote nodes should always be started with a special configuration file which
* enables P2P class loading: {@code 'ggstart.{sh|bat} examples/config/example-cache.xml'}.
* <p>
* Alternatively, you can run {@link CacheNodeStartup} in another JVM, which will
* start a GridGain node with the {@code examples/config/example-cache.xml} configuration.
*/
public final class CacheAffinityExample {
    /** Name of the cache that the example populates and reads back. */
    private static final String CACHE_NAME = "partitioned";

    /** Number of consecutive integer keys, starting at zero, that are stored and visited. */
    private static final int KEY_CNT = 20;

    /**
     * Starts a node from {@code examples/config/example-cache.xml}, clears the cache,
     * stores {@link #KEY_CNT} entries and then visits them with both co-location approaches.
     *
     * @param args Command line arguments, none required.
     * @throws IgniteCheckedException If example execution failed.
     */
    public static void main(String[] args) throws IgniteCheckedException {
        try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) {
            System.out.println();
            System.out.println(">>> Cache affinity example started.");

            GridCache<Integer, String> data = ignite.cache(CACHE_NAME);

            // Begin with an empty cache on all nodes.
            data.globalClearAll(0);

            // Every key is stored with its own decimal string as the value.
            for (int key = 0; key < KEY_CNT; key++)
                data.putx(key, Integer.toString(key));

            // Co-locate jobs with data, one key per job, via IgniteCompute.affinityRun(...).
            visitUsingAffinityRun();

            // Co-locate jobs with data, one job per node, via IgniteCluster.mapKeysToNodes(...).
            visitUsingMapKeysToNodes();
        }
    }

    /**
     * Visits every key with its own job, routed by
     * {@link org.apache.ignite.IgniteCompute#affinityRun(String, Object, Runnable)}.
     *
     * @throws IgniteCheckedException If failed.
     */
    private static void visitUsingAffinityRun() throws IgniteCheckedException {
        Ignite ignite = Ignition.ignite();

        final GridCache<Integer, String> data = ignite.cache(CACHE_NAME);

        for (int idx = 0; idx < KEY_CNT; idx++) {
            final int key = idx;

            // The job is submitted with the key as its affinity key; the example relies on it
            // running where that key is stored, so a local 'peek' is expected to find the value.
            IgniteRunnable printJob = new IgniteRunnable() {
                @Override public void run() {
                    // 'peek' only looks in local memory; the value is not expected to be 'null' here.
                    System.out.println("Co-located using affinityRun [key= " + key + ", value=" + data.peek(key) + ']');
                }
            };

            ignite.compute().affinityRun(CACHE_NAME, key, printJob);
        }
    }

    /**
     * Visits all keys using {@link org.apache.ignite.IgniteCluster#mapKeysToNodes(String, Collection)}:
     * keys are first grouped by node, then a single job per node prints every key in that
     * node's group. Unlike {@code affinityRun(...)}, one job handles many keys.
     *
     * @throws IgniteCheckedException If failed.
     */
    private static void visitUsingMapKeysToNodes() throws IgniteCheckedException {
        final Ignite ignite = Ignition.ignite();

        Collection<Integer> allKeys = new ArrayList<>(KEY_CNT);

        for (int key = 0; key < KEY_CNT; key++)
            allKeys.add(key);

        // Group the keys by the node each one is mapped to.
        Map<ClusterNode, Collection<Integer>> keysByNode = ignite.cluster().mapKeysToNodes(CACHE_NAME, allKeys);

        for (Map.Entry<ClusterNode, Collection<Integer>> entry : keysByNode.entrySet()) {
            ClusterNode owner = entry.getKey();
            final Collection<Integer> ownedKeys = entry.getValue();

            // Keys grouped under a 'null' node are skipped.
            if (owner == null)
                continue;

            IgniteRunnable printJob = new IgniteRunnable() {
                @Override public void run() {
                    GridCache<Integer, String> data = ignite.cache(CACHE_NAME);

                    // 'peek' only looks in local memory; the value is not expected to be 'null'
                    // because the job is sent to the node the keys were mapped to.
                    for (Integer key : ownedKeys)
                        System.out.println("Co-located using mapKeysToNodes [key= " + key +
                            ", value=" + data.peek(key) + ']');
                }
            };

            // Send the computation to the node that holds the data (i.e. collocation).
            ignite.compute(ignite.cluster().forNode(owner)).run(printJob);
        }
    }
}