Skip to content

Commit

Permalink
Fixed ArrayMemcachedSessionLocator do not process connection pool pro…
Browse files Browse the repository at this point in the history
…perly
  • Loading branch information
killme2008 committed Mar 17, 2012
1 parent af67acb commit 06df5f8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import net.rubyeye.xmemcached.HashAlgorithm;
import net.rubyeye.xmemcached.networking.MemcachedSession;
Expand All @@ -30,7 +32,7 @@ public class ArrayMemcachedSessionLocator extends
AbstractMemcachedSessionLocator {

private HashAlgorithm hashAlgorighm;
private transient volatile List<Session> sessions;
private transient volatile List<List<Session>> sessions;

public ArrayMemcachedSessionLocator() {
this.hashAlgorighm = HashAlgorithm.NATIVE_HASH;
Expand All @@ -49,29 +51,40 @@ public final long getHash(int size, String key) {
return hash % size;
}

final Random rand = new Random();

public final Session getSessionByKey(final String key) {
if (this.sessions == null || this.sessions.size() == 0) {
return null;
}
// Copy on read
List<Session> sessionList = this.sessions;
List<List<Session>> sessionList = this.sessions;
int size = sessionList.size();
if (size == 0) {
return null;
}
long start = this.getHash(size, key);
Session session = sessionList.get((int) start);
List<Session> sessions = sessionList.get((int) start);
Session session = getRandomSession(sessions);

// If it is not failure mode,get next available session
if (!this.failureMode && (session == null || session.isClosed())) {
long next = this.getNext(size, start);
while ((session == null || session.isClosed()) && next != start) {
session = sessionList.get((int) next);
sessions = sessionList.get((int) next);
next = this.getNext(size, next);
session = getRandomSession(sessions);
}
}
return session;
}

private Session getRandomSession(List<Session> sessions) {
if (sessions == null || sessions.isEmpty())
return null;
return sessions.get(rand.nextInt(sessions.size()));
}

public final long getNext(int size, long start) {
if (start == size - 1) {
return 0;
Expand All @@ -81,19 +94,53 @@ public final long getNext(int size, long start) {
}

public final void updateSessions(final Collection<Session> list) {
if (list == null || list.isEmpty()) {
this.sessions = Collections.emptyList();
return;
}
Collection<Session> copySessions = list;
List<Session> newSessions = new ArrayList<Session>(
copySessions.size() * 2);
List<List<Session>> tmpList = new ArrayList<List<Session>>();
Session target = null;
List<Session> subList = null;
for (Session session : copySessions) {
if (session instanceof MemcachedTCPSession) {
int weight = ((MemcachedSession) session).getWeight();
for (int i = 0; i < weight; i++) {
newSessions.add(session);
}
if (target == null) {
target = session;
subList = new ArrayList<Session>();
subList.add(target);
} else {
newSessions.add(session);
if (session.getRemoteSocketAddress().equals(
target.getRemoteSocketAddress())) {
subList.add(session);
} else {
tmpList.add(subList);
target = session;
subList = new ArrayList<Session>();
subList.add(target);
}
}
}

// The last one
if (subList != null) {
tmpList.add(subList);
}

List<List<Session>> newSessions = new ArrayList<List<Session>>(
tmpList.size() * 2);
for (List<Session> sessions : tmpList) {
if (sessions != null && !sessions.isEmpty()) {
Session session = sessions.get(0);
if (session instanceof MemcachedTCPSession) {
int weight = ((MemcachedSession) session).getWeight();
for (int i = 0; i < weight; i++) {
newSessions.add(sessions);
}
} else {
newSessions.add(sessions);
}
}

}
this.sessions = newSessions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,41 @@ public class ArrayMemcachedSessionLocatorUnitTest extends
public void setUp() {
this.locator = new ArrayMemcachedSessionLocator();
}

@Test
public void testGetSessionByKey_SessionPool() {
MockSession session1 = new MockSession(8080);
MockSession session2 = new MockSession(8081);
MockSession session3 = new MockSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session1);
list.add(session1);
list.add(session2);
list.add(session2);
list.add(session3);

this.locator.updateSessions(list);

assertSame(session2, this.locator.getSessionByKey("a"));
assertSame(session3, this.locator.getSessionByKey("b"));
assertSame(session1, this.locator.getSessionByKey("c"));

assertSame(session2, this.locator.getSessionByKey("a"));
assertSame(session3, this.locator.getSessionByKey("b"));
assertSame(session1, this.locator.getSessionByKey("c"));

assertSame(session2, this.locator.getSessionByKey("a"));
assertSame(session3, this.locator.getSessionByKey("b"));
assertSame(session1, this.locator.getSessionByKey("c"));

}

@Test
public void testGetSessionByKey_MoreSessions() {
MockSession session1 = new MockSession(8080);
MockSession session2 = new MockSession(8080);
MockSession session3 = new MockSession(8080);
MockSession session2 = new MockSession(8081);
MockSession session3 = new MockSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
Expand All @@ -49,9 +78,9 @@ public void testGetSessionByKey_MoreSessions() {
@Test
public void testGetSessionByKey_MoreSessions_OneClosed() {
MockSession session1 = new MockSession(8080);
MockSession session2 = new MockSession(8080);
MockSession session2 = new MockSession(8081);
session2.close();
MockSession session3 = new MockSession(8080);
MockSession session3 = new MockSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
Expand All @@ -76,9 +105,9 @@ public void testGetSessionByKey_MoreSessions_OneClosed() {
public void testGetSessionByKey_MoreSessions_OneClosed_FailureMode() {
this.locator.setFailureMode(true);
MockSession session1 = new MockSession(8080);
MockSession session2 = new MockSession(8080);
MockSession session2 = new MockSession(8081);
session2.close();
MockSession session3 = new MockSession(8080);
MockSession session3 = new MockSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
Expand Down

0 comments on commit 06df5f8

Please sign in to comment.