Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block repository writes if repo lock is lost #1204

Merged
merged 42 commits into from Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1ce2d2e
OAK-10464 testcontainers dependency
Oct 2, 2023
fa4a114
OAK-10464 testcontainers dependency
Oct 2, 2023
13ed631
OAK-10464 - merge with trunk
Oct 2, 2023
48a392b
modified blob endpoint, logback, add testcontainer dependency
t-rana Oct 6, 2023
1f5b852
migrate mongo docker rule to testcontainer
t-rana Oct 9, 2023
f09fc4c
Merge remote-tracking branch 'upstream/trunk' into issue/OAK-10464
t-rana Oct 9, 2023
af3ec3c
minor dependency changes
t-rana Oct 10, 2023
3a90933
minor changes
t-rana Oct 10, 2023
970cbfe
refactor dependency, removed unused ones
t-rana Oct 11, 2023
7614372
Merge remote-tracking branch 'upstream/trunk' into issue/OAK-10464
t-rana Oct 16, 2023
914d57b
Merge remote-tracking branch 'upstream/trunk' into issue/OAK-10464
t-rana Oct 17, 2023
9ab85a6
revert logback changes
t-rana Oct 17, 2023
755a976
revert logback changes
t-rana Oct 17, 2023
57b3511
Merge remote-tracking branch 'upstream/trunk' into issue/OAK-10464
t-rana Oct 20, 2023
352bb34
removed test container dependency from child modules
t-rana Oct 21, 2023
15d5cec
Merge remote-tracking branch 'upstream/trunk' into issue/OAK-10464
t-rana Oct 25, 2023
246885a
Merge remote-tracking branch 'upstream/trunk' into issue/OAK-10464
t-rana Oct 27, 2023
5d322e7
OAK-10006 writes not possible during lease renewal
Nov 6, 2023
97317f3
OAK-10006 Merge with 'trunk'
Nov 8, 2023
b889d21
OAK-10543 added license header and increased versions for exported pa…
Nov 8, 2023
7e186ca
OAK-10543 added license header and increased versions for exported pa…
Nov 8, 2023
3196223
OAK-10543 remove try/catch
Nov 9, 2023
a276998
OAK-10543 remove duplicated testcontainers dependency
Nov 9, 2023
d2cea09
OAK-10543 remove null initialisation
Nov 9, 2023
66bde1c
OAK-10006 added test for WriteAccessController and fixed tests in oak…
Nov 10, 2023
ea80fc8
OAK-10006 deleted unused constructor
Nov 10, 2023
7d07ff4
OAK-10006 modified constructor
Nov 10, 2023
e098e5b
OAK-10006 modified constructor
Nov 10, 2023
eacc662
OAK-10006 import statement
Nov 10, 2023
e96cfcb
OAK-10006 import statements
Nov 13, 2023
6467d7c
OAK-10006 renew lease more often and do not block writes unnecessarily
Nov 14, 2023
39778e3
OAK-10006 change sys property name
Nov 14, 2023
6ce1890
OAK-10006 remove extra line
Nov 14, 2023
8e59fd9
OAK-10006 check values of system properties
Nov 20, 2023
b6edff5
OAK-10006 check values of system properties
Nov 20, 2023
524f95d
OAK-10006 indicate units for system properties
Nov 20, 2023
0d5997c
OAK-10006 use rule to set system properties
Nov 20, 2023
f3c298b
OAK-10006 use rule to set system properties
Nov 20, 2023
eccd4e1
OAK-10006 use separate object for lock
Nov 20, 2023
d0aeefa
OAK-10006 set system properties in test
Nov 20, 2023
939c88e
OAK-10006 add sys properties to log output
Nov 20, 2023
490ba60
OAK-10006 testWritesBlockedOnlyAfterFewUnsuccessfulAttempts
Nov 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions oak-parent/pom.xml
Expand Up @@ -783,6 +783,12 @@
<version>1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
10 changes: 10 additions & 0 deletions oak-segment-azure/pom.xml
Expand Up @@ -110,6 +110,11 @@
<artifactId>org.osgi.service.metatype.annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.annotation.versioning</artifactId>
<scope>provided</scope>
</dependency>

<!-- Nullability annotations -->
<dependency>
Expand Down Expand Up @@ -260,6 +265,11 @@
<artifactId>org.osgi.service.cm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Expand Up @@ -22,6 +22,7 @@
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.remote.RemoteUtilities;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
Expand Down Expand Up @@ -61,11 +62,13 @@ public class AzureArchiveManager implements SegmentArchiveManager {
protected final IOMonitor ioMonitor;

protected final FileStoreMonitor monitor;
private WriteAccessController writeAccessController;

public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
this.cloudBlobDirectory = cloudBlobDirectory;
public AzureArchiveManager(CloudBlobDirectory segmentstoreDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, WriteAccessController writeAccessController) {
this.cloudBlobDirectory = segmentstoreDirectory;
this.ioMonitor = ioMonitor;
this.monitor = fileStoreMonitor;
this.writeAccessController = writeAccessController;
}

@Override
Expand Down Expand Up @@ -127,7 +130,7 @@ public SegmentArchiveReader forceOpen(String archiveName) throws IOException {

@Override
public SegmentArchiveWriter create(String archiveName) throws IOException {
return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor, writeAccessController);
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.jackrabbit.oak.segment.azure.util.CaseInsensitiveKeysMapAccess;
import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
Expand Down Expand Up @@ -53,14 +54,17 @@ public class AzureJournalFile implements JournalFile {

private final int lineLimit;

AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
private final WriteAccessController writeAccessController;

AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController, int lineLimit) {
this.directory = directory;
this.journalNamePrefix = journalNamePrefix;
this.lineLimit = lineLimit;
this.writeAccessController = writeAccessController;
}

public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController) {
this(directory, journalNamePrefix, writeAccessController, JOURNAL_LINE_LIMIT);
}

@Override
Expand Down Expand Up @@ -183,6 +187,8 @@ public AzureJournalWriter() throws IOException {
@Override
public void truncate() throws IOException {
try {
writeAccessController.checkWritingAllowed();

for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
cloudAppendBlob.delete();
}
Expand All @@ -200,6 +206,8 @@ public void writeLine(String line) throws IOException {

@Override
public void batchWriteLines(List<String> lines) throws IOException {
writeAccessController.checkWritingAllowed();

if (lines.isEmpty()) {
return;
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class AzurePersistence implements SegmentNodeStorePersistence {

protected final CloudBlobDirectory segmentstoreDirectory;

protected WriteAccessController writeAccessController = new WriteAccessController();

public AzurePersistence(CloudBlobDirectory segmentStoreDirectory) {
this.segmentstoreDirectory = segmentStoreDirectory;

Expand Down Expand Up @@ -92,7 +95,7 @@ public AzurePersistence(CloudBlobDirectory segmentStoreDirectory) {
@Override
public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
attachRemoteStoreMonitor(remoteStoreMonitor);
return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor, writeAccessController);
}

@Override
Expand All @@ -116,7 +119,7 @@ public boolean segmentFilesExist() {

@Override
public JournalFile getJournalFile() {
return new AzureJournalFile(segmentstoreDirectory, "journal.log");
return new AzureJournalFile(segmentstoreDirectory, "journal.log", writeAccessController);
}

@Override
Expand All @@ -134,7 +137,7 @@ public RepositoryLock lockRepository() throws IOException {
return new AzureRepositoryLock(getBlockBlob("repo.lock"), () -> {
log.warn("Lost connection to the Azure. The client will be closed.");
// TODO close the connection
}).lock();
}, writeAccessController).lock();
}

private CloudBlockBlob getBlockBlob(String path) throws IOException {
Expand Down Expand Up @@ -178,8 +181,11 @@ public void eventOccurred(RequestCompletedEvent e) {
});
}

public CloudBlobDirectory getSegmentstoreDirectory() {
return segmentstoreDirectory;
}
public CloudBlobDirectory getSegmentstoreDirectory() {
return segmentstoreDirectory;
}

public void setWriteAccessController(WriteAccessController writeAccessController) {
this.writeAccessController = writeAccessController;
}
}
Expand Up @@ -17,9 +17,12 @@
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.RetryNoRetry;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,8 +37,16 @@ public class AzureRepositoryLock implements RepositoryLock {
private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);

private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.azure.lock.timeout", 0);
private static final Integer LEASE_RENEWAL_TIMEOUT_MS = 5000;

private static int INTERVAL = 60;
public static final String LEASE_DURATION_PROP = "oak.segment.azure.lock.leaseDurationInSec";
private final int leaseDuration = Integer.getInteger(LEASE_DURATION_PROP, 60);

public static final String RENEWAL_INTERVAL_PROP = "oak.segment.azure.lock.leaseRenewalIntervalInSec";
private final int renewalInterval = Integer.getInteger(RENEWAL_INTERVAL_PROP, 5);

public static final String TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP = "oak.segment.azure.lock.blockWritesAfterInSec";
private final int timeToWaitBeforeWriteBlock = Integer.getInteger(TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, 20);

private final Runnable shutdownHook;

Expand All @@ -45,19 +56,27 @@ public class AzureRepositoryLock implements RepositoryLock {

private final int timeoutSec;

private WriteAccessController writeAccessController;

private String leaseId;

private volatile boolean doUpdate;

public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
this(blob, shutdownHook, TIMEOUT_SEC);
public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController) {
this(blob, shutdownHook, writeAccessController, TIMEOUT_SEC);
}

public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, int timeoutSec) {
public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController, int timeoutSec) {
this.shutdownHook = shutdownHook;
this.blob = blob;
this.executor = Executors.newSingleThreadExecutor();
this.timeoutSec = timeoutSec;
this.writeAccessController = writeAccessController;

if (leaseDuration < timeToWaitBeforeWriteBlock || timeToWaitBeforeWriteBlock < renewalInterval) {
throw new IllegalStateException(String.format("The value of %s must be greater than %s and the value of %s must be greater than %s",
LEASE_DURATION_PROP, TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, RENEWAL_INTERVAL_PROP));
}
}

public AzureRepositoryLock lock() throws IOException {
Expand All @@ -66,7 +85,13 @@ public AzureRepositoryLock lock() throws IOException {
do {
try {
blob.openOutputStream().close();
leaseId = blob.acquireLease(INTERVAL, null);

log.info("{} = {}", LEASE_DURATION_PROP, leaseDuration);
log.info("{} = {}", RENEWAL_INTERVAL_PROP, renewalInterval);
log.info("{} = {}", TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, timeToWaitBeforeWriteBlock);

leaseId = blob.acquireLease(leaseDuration, null);
writeAccessController.enableWriting();
log.info("Acquired lease {}", leaseId);
} catch (StorageException | IOException e) {
if (ex == null) {
Expand Down Expand Up @@ -97,14 +122,25 @@ private void refreshLease() {
doUpdate = true;
long lastUpdate = 0;
while (doUpdate) {
long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
try {
long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
if (timeSinceLastUpdate > INTERVAL / 2) {
blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
if (timeSinceLastUpdate > renewalInterval) {

BlobRequestOptions requestOptions = new BlobRequestOptions();
requestOptions.setMaximumExecutionTimeInMs(LEASE_RENEWAL_TIMEOUT_MS);
requestOptions.setRetryPolicyFactory(new RetryNoRetry());
blob.renewLease(AccessCondition.generateLeaseCondition(leaseId), requestOptions, null);

writeAccessController.enableWriting();
lastUpdate = System.currentTimeMillis();
}
} catch (StorageException e) {
timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;

if (e.getErrorCode().equals(StorageErrorCodeStrings.OPERATION_TIMED_OUT)) {
if (timeSinceLastUpdate > timeToWaitBeforeWriteBlock) {
writeAccessController.disableWriting();
}
log.warn("Could not renew the lease due to the operation timeout. Retry in progress ...", e);
} else {
log.error("Can't renew the lease", e);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.microsoft.azure.storage.blob.CloudBlockBlob;

import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.azure.util.Retrier;
import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
Expand All @@ -46,9 +47,10 @@ public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWrite
Integer.getInteger("azure.segment.archive.writer.retries.intervalMs", 5000)
);

public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor, WriteAccessController writeAccessController) {
super(ioMonitor, monitor);
this.archiveDirectory = archiveDirectory;
this.writeAccessController = writeAccessController;
}

@Override
Expand All @@ -58,6 +60,9 @@ public String getName() {

@Override
protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {

writeAccessController.checkWritingAllowed();

long msb = indexEntry.getMsb();
long lsb = indexEntry.getLsb();
String segmentName = getSegmentFileName(indexEntry);
Expand Down Expand Up @@ -90,6 +95,8 @@ protected Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry) throw
protected void doWriteDataFile(byte[] data, String extension) throws IOException {
retrier.execute(() -> {
try {
writeAccessController.checkWritingAllowed();

getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
} catch (StorageException e) {
throw new IOException(e);
Expand All @@ -101,6 +108,8 @@ protected void doWriteDataFile(byte[] data, String extension) throws IOException
protected void afterQueueClosed() throws IOException {
retrier.execute(() -> {
try {
writeAccessController.checkWritingAllowed();

getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
} catch (StorageException e) {
throw new IOException(e);
Expand Down
@@ -0,0 +1,22 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Package metadata for the Azure segment store:
// - @Internal marks this package as internal (non-public) Oak API, internal since 1.0.0.
// - @Version declares the OSGi semantic version (2.0.0) exported for this package.
@Internal(since = "1.0.0")
@Version("2.0.0")
package org.apache.jackrabbit.oak.segment.azure;

import org.apache.jackrabbit.oak.commons.annotations.Internal;
import org.osgi.annotation.versioning.Version;
Expand Up @@ -58,6 +58,7 @@
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
Expand Down Expand Up @@ -92,6 +93,8 @@ public void testSegmentCopy() throws Exception {
SegmentNodeStorePersistence srcPersistence = getSrcPersistence();
SegmentNodeStorePersistence destPersistence = getDestPersistence();

RepositoryLock destRepositoryLock = destPersistence.lockRepository();

String srcPathOrUri = getSrcPathOrUri();
String destPathOrUri = getDestPathOrUri();

Expand All @@ -111,6 +114,8 @@ public void testSegmentCopy() throws Exception {
checkJournal(srcPersistence, destPersistence);
checkGCJournal(srcPersistence, destPersistence);
checkManifest(srcPersistence, destPersistence);

destRepositoryLock.unlock();
}

private int runSegmentCopy(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence,
Expand Down