Skip to content

Commit

Permalink
OAK-2407: Auto-refresh sessions on revision gc
Browse files Browse the repository at this point in the history
Register GCMonitor to detect compaction and auto-refresh

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1662323 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
mduerig committed Feb 25, 2015
1 parent 83a9e86 commit 199a3fe
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 28 deletions.
1 change: 1 addition & 0 deletions oak-core/pom.xml
Expand Up @@ -85,6 +85,7 @@
org.apache.jackrabbit.oak.plugins.value,
org.apache.jackrabbit.oak.plugins.version,
org.apache.jackrabbit.oak.spi.commit,
org.apache.jackrabbit.oak.spi.gc,
org.apache.jackrabbit.oak.spi.lifecycle,
org.apache.jackrabbit.oak.spi.query,
org.apache.jackrabbit.oak.spi.security,
Expand Down
Expand Up @@ -83,4 +83,13 @@ public interface GCMonitor {
* @param currentSize number of bytes after garbage collection
*/
void cleaned(long reclaimedSize, long currentSize);

class Empty implements GCMonitor {
@Override public void info(String message, Object[] arguments) { }
@Override public void warn(String message, Object[] arguments) { }
@Override public void error(String message, Exception e) { }
@Override public void skipped(String reason, Object[] arguments) { }
@Override public void compacted() { }
@Override public void cleaned(long reclaimedSize, long currentSize) { }
}
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

@Version("1.0")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.spi.gc;

import aQute.bnd.annotation.Export;
import aQute.bnd.annotation.Version;
Expand Up @@ -22,7 +22,6 @@
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;

import java.io.Closeable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -44,7 +43,6 @@
import javax.security.auth.login.LoginException;

import com.google.common.collect.ImmutableMap;

import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.api.JackrabbitRepository;
import org.apache.jackrabbit.api.security.authentication.token.TokenCredentials;
Expand All @@ -54,9 +52,11 @@
import org.apache.jackrabbit.oak.api.jmx.SessionMBean;
import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Composite;
import org.apache.jackrabbit.oak.jcr.session.SessionContext;
import org.apache.jackrabbit.oak.jcr.session.SessionStats;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
Expand Down Expand Up @@ -244,7 +244,7 @@ public Session login(@CheckForNull Credentials credentials, @CheckForNull String
@CheckForNull Map<String, Object> attributes) throws RepositoryException {
try {
if (attributes == null) {
attributes = Collections.emptyMap();
attributes = emptyMap();
}
Long refreshInterval = getRefreshInterval(credentials);
if (refreshInterval == null) {
Expand All @@ -254,7 +254,9 @@ public Session login(@CheckForNull Credentials credentials, @CheckForNull String
}
boolean relaxedLocking = getRelaxedLocking(attributes);

RefreshStrategy refreshStrategy = createRefreshStrategy(refreshInterval);
RefreshStrategy refreshStrategy = refreshInterval == null
? new RefreshStrategy.LogOnce(60)
: new RefreshStrategy.Timed(refreshInterval);
ContentSession contentSession = contentRepository.login(credentials, workspaceName);
SessionDelegate sessionDelegate = createSessionDelegate(refreshStrategy, contentSession);
SessionContext context = createSessionContext(
Expand All @@ -268,8 +270,12 @@ public Session login(@CheckForNull Credentials credentials, @CheckForNull String
}

private SessionDelegate createSessionDelegate(
final RefreshStrategy refreshStrategy,
final ContentSession contentSession) {
RefreshStrategy refreshStrategy,
ContentSession contentSession) {

final RefreshOnGC refreshOnGC = new RefreshOnGC();
refreshStrategy = new Composite(refreshStrategy, refreshOnGC);

return new SessionDelegate(
contentSession, securityProvider, refreshStrategy,
threadSaveCount, statisticManager, clock) {
Expand All @@ -280,6 +286,7 @@ private SessionDelegate createSessionDelegate(

@Override
public void logout() {
refreshOnGC.close();
// Cancel session MBean registration
registrationTask.cancel();
scheduledTask.cancel(false);
Expand Down Expand Up @@ -444,31 +451,35 @@ private static Map<String, Object> createAttributes(
}
}

/**
* Auto refresh logic for sessions, which is done to enhance backwards compatibility with
* Jackrabbit 2.
* <p>
* A sessions is automatically refreshed when
* <ul>
* <li>it has not been accessed for the number of seconds specified by the
* {@code refreshInterval} parameter,</li>
* <li>an observation event has been delivered to a listener registered from within this
* session,</li>
* <li>an updated occurred through a different session from <em>within the same
* thread.</em></li>
* </ul>
* In addition a warning is logged once per session if the session is accessed after one
* minute of inactivity.
*/
private RefreshStrategy createRefreshStrategy(Long refreshInterval) {
if (refreshInterval == null) {
return new RefreshStrategy.LogOnce(60);
} else {
return new RefreshStrategy.Timed(refreshInterval);
private class RefreshOnGC implements RefreshStrategy {
private final GCMonitor gcMonitor = new GCMonitor.Empty() {
@Override
public void compacted() {
compactionCount++;
}
};

private final Registration reg = whiteboard.register(GCMonitor.class, gcMonitor, emptyMap());

private volatile int compactionCount;
private int refreshCount;

public void close() {
reg.unregister();
}

@Override
public boolean needsRefresh(long secondsSinceLastAccess) {
if (compactionCount > refreshCount) {
refreshCount = compactionCount;
return true;
} else {
return false;
}
}
}

static class RegistrationTask implements Runnable {
private static class RegistrationTask implements Runnable {
private final SessionStats sessionStats;
private final Whiteboard whiteboard;
private boolean cancelled;
Expand Down
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.jackrabbit.oak.jcr;

import static java.io.File.createTempFile;
import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE;
import static org.apache.jackrabbit.oak.plugins.segment.file.FileStore.newFileStore;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import javax.annotation.Nonnull;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;

import org.apache.jackrabbit.api.JackrabbitRepository;
import org.apache.jackrabbit.oak.Oak;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RefreshOnGCTest {
private FileStore fileStore;
private Repository repository;

@Before
public void setup() throws IOException {
File directory = createTempFile(getClass().getSimpleName(), "test", new File("target"));
directory.delete();
directory.mkdir();

Whiteboard whiteboard = new DefaultWhiteboard();
fileStore = newFileStore(directory)
.withWhiteBoard(whiteboard)
.create()
.setCompactionStrategy(new CompactionStrategy(
false, false, CLEAN_NONE, 0, CompactionStrategy.MEMORY_THRESHOLD_DEFAULT) {
@Override
public boolean compacted(@Nonnull Callable<Boolean> setHead) throws Exception {
setHead.call();
return true;
}
});

NodeStore nodeStore = new SegmentNodeStore(fileStore);
Oak oak = new Oak(nodeStore);
oak.with(whiteboard);
repository = new Jcr(oak).createRepository();
}

@After
public void tearDown() {
if (repository instanceof JackrabbitRepository) {
((JackrabbitRepository) repository).shutdown();
}
}

@Test
public void compactionCausesRefresh() throws RepositoryException, InterruptedException, ExecutionException {
Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
try {
Node root = session.getRootNode();
root.addNode("one");
session.save();

addNode(repository, "two");

fileStore.compact();
assertTrue(root.hasNode("one"));
assertTrue("Node two must be visible as compaction should cause the session to refresh",
root.hasNode("two"));
} finally {
session.logout();
}
}

private static void addNode(final Repository repository, final String name)
throws ExecutionException, InterruptedException {
// Execute on different threads to ensure same thread session
// refreshing doesn't come into our way
run(new Callable<Void>() {
@Override
public Void call() throws Exception {
Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
try {
Node root = session.getRootNode();
root.addNode(name);
session.save();
} finally {
session.logout();
}
return null;
}
});
}

private static void run(Callable<Void> callable) throws InterruptedException, ExecutionException {
FutureTask<Void> task = new FutureTask<Void>(callable);
new Thread(task).start();
task.get();
}


}

0 comments on commit 199a3fe

Please sign in to comment.