Skip to content

Commit

Permalink
Some hollow4 cleanup to take advantage of breaking changes (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
akhaku authored and PaulSandoz committed Nov 12, 2018
1 parent 2d0d050 commit 7590c84
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 198 deletions.
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -106,6 +107,7 @@
*
* </dl>
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class HollowConsumer {
private static final Logger LOG = Logger.getLogger(HollowConsumer.class.getName());

Expand Down Expand Up @@ -832,8 +834,7 @@ public B withRefreshListener(HollowConsumer.RefreshListener refreshListener) {
}

public B withRefreshListeners(HollowConsumer.RefreshListener... refreshListeners) {
for (HollowConsumer.RefreshListener refreshListener : refreshListeners)
this.refreshListeners.add(refreshListener);
Collections.addAll(this.refreshListeners, refreshListeners);
return (B)this;
}

Expand Down Expand Up @@ -897,7 +898,7 @@ protected void checkArguments() {

BlobRetriever blobRetriever = this.blobRetriever;
if (localBlobStoreDir != null)
this.blobRetriever = new HollowFilesystemBlobRetriever(localBlobStoreDir, blobRetriever);
this.blobRetriever = new HollowFilesystemBlobRetriever(localBlobStoreDir.toPath(), blobRetriever);


if (refreshExecutor == null)
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.producer.fs.HollowFilesystemAnnouncer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.lang.ref.WeakReference;
Expand All @@ -34,7 +33,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -43,7 +41,6 @@ public class HollowFilesystemAnnouncementWatcher implements HollowConsumer.Annou

private static final Logger log = Logger.getLogger(HollowFilesystemAnnouncementWatcher.class.getName());

private final Path publishPath;
private final Path announcePath;

private final List<HollowConsumer> subscribedConsumers;
Expand All @@ -53,52 +50,34 @@ public class HollowFilesystemAnnouncementWatcher implements HollowConsumer.Annou

private long latestVersion;

// TODO: deprecate in Hollow 3.0.0
// @Deprecated
public HollowFilesystemAnnouncementWatcher(File publishDir) {
this(publishDir.toPath());
}

/**
* Creates a file system announcement watcher.
*
* @param publishPath the publish path
* @since 2.12.0
*/
@SuppressWarnings("unused")
public HollowFilesystemAnnouncementWatcher(Path publishPath) {
this(publishPath,
newScheduledThreadPool(
1 /*corePoolSize*/,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
})
);

this(publishPath, newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}));
ownedExecutor = true;
}

@Deprecated
public HollowFilesystemAnnouncementWatcher(File publishDir, ScheduledExecutorService executor) {
this(publishDir.toPath(), executor);
}

/**
* Creates a file system announcement watcher.
*
* @param publishPath the publish path
* @param executor the executor from which watching is executed
* @since 2.12.0
*/
@SuppressWarnings("WeakerAccess")
public HollowFilesystemAnnouncementWatcher(Path publishPath, ScheduledExecutorService executor) {
this.publishPath = publishPath;
this.executor = executor;

this.announcePath = this.publishPath.resolve(HollowFilesystemAnnouncer.ANNOUNCEMENT_FILENAME);
this.announcePath = publishPath.resolve(HollowFilesystemAnnouncer.ANNOUNCEMENT_FILENAME);
this.subscribedConsumers = new CopyOnWriteArrayList<>();
this.latestVersion = readLatestVersion();

Expand Down
Expand Up @@ -33,26 +33,16 @@ public class HollowFilesystemBlobRetriever implements HollowConsumer.BlobRetriev
private final HollowConsumer.BlobRetriever fallbackBlobRetriever;
private final Path blobStorePath;

// TODO: deprecate in Hollow 3.0.0
// @Deprecated
public HollowFilesystemBlobRetriever(File blobStoreDir) {
this(blobStoreDir.toPath());
}

/**
* A new HollowFilesystemBlobRetriever which is not backed by a remote store.
*
* @param blobStorePath The directory from which to retrieve blobs
* @since 2.12.0
*/
@SuppressWarnings("unused")
public HollowFilesystemBlobRetriever(Path blobStorePath) {
this(blobStorePath, null);
}

@Deprecated
public HollowFilesystemBlobRetriever(File blobStoreDir, HollowConsumer.BlobRetriever fallbackBlobRetriever) {
this(blobStoreDir.toPath(), fallbackBlobRetriever);
}

/**
* A new HollowFileSystemBlobRetriever which is backed by a remote store. When a blob from the remote store
Expand Down Expand Up @@ -166,26 +156,22 @@ private static class FilesystemBlob extends HollowConsumer.Blob {
private final Path path;

@Deprecated
public FilesystemBlob(File snapshotFile, long toVersion) {
FilesystemBlob(File snapshotFile, long toVersion) {
this(snapshotFile.toPath(), toVersion);
}

/**
* @since 2.12.0
*/
public FilesystemBlob(Path snapshotPath, long toVersion) {
FilesystemBlob(Path snapshotPath, long toVersion) {
super(toVersion);
this.path = snapshotPath;
}

public FilesystemBlob(File deltaFile, long fromVersion, long toVersion) {
this(deltaFile.toPath(), fromVersion, toVersion);
}

/**
* @since 2.12.0
*/
public FilesystemBlob(Path deltaPath, long fromVersion, long toVersion) {
FilesystemBlob(Path deltaPath, long fromVersion, long toVersion) {
super(fromVersion, toVersion);
this.path = deltaPath;
}
Expand All @@ -202,12 +188,7 @@ private static class BlobForBackupToFilesystem extends HollowConsumer.Blob {
private final HollowConsumer.Blob remoteBlob;
private final Path path;

@Deprecated
public BlobForBackupToFilesystem(HollowConsumer.Blob remoteBlob, File destinationFile) {
this(remoteBlob, destinationFile.toPath());
}

public BlobForBackupToFilesystem(HollowConsumer.Blob remoteBlob, Path destinationPath) {
BlobForBackupToFilesystem(HollowConsumer.Blob remoteBlob, Path destinationPath) {
super(remoteBlob.getFromVersion(), remoteBlob.getToVersion());
this.path = destinationPath;
this.remoteBlob = remoteBlob;
Expand All @@ -221,7 +202,7 @@ public InputStream getInputStream() throws IOException {
OutputStream os = Files.newOutputStream(path)
) {
byte buf[] = new byte[4096];
int n = 0;
int n;
while (-1 != (n = is.read(buf)))
os.write(buf, 0, n);
}
Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.util.concurrent.TimeUnit;

/**
* A trivial implementation of {@link HollowProducerListenerV2} which does nothing.
* A trivial implementation of {@link HollowProducerListener} which does nothing.
* Implementations of HollowProducerListenerV2 should subclass this class for convenience.
*
* @author Tim Taylor {@literal<tim@toolbear.io>}
*/
public class AbstractHollowProducerListener implements HollowProducerListenerV2 {
public class AbstractHollowProducerListener implements HollowProducerListener {
// DataModelInitializationListener
@Override public void onProducerInit(long elapsed, TimeUnit unit) {}

Expand Down

This file was deleted.

Expand Up @@ -18,7 +18,6 @@
package com.netflix.hollow.api.producer.fs;

import com.netflix.hollow.api.producer.HollowProducer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -29,12 +28,6 @@ public class HollowFilesystemAnnouncer implements HollowProducer.Announcer {

private final Path publishPath;

// TODO: deprecate in Hollow 3.0.0
// @Deprecated
public HollowFilesystemAnnouncer(File publishDir) {
this(publishDir.toPath());
}

/**
* @param publishPath the path to publish to
* @since 2.12.0
Expand All @@ -48,7 +41,7 @@ public void announce(long stateVersion) {
Path announcePath = publishPath.resolve(ANNOUNCEMENT_FILENAME);
try {
Files.write(announcePath, String.valueOf(stateVersion).getBytes());
} catch(IOException ex) {
} catch (IOException ex) {
throw new RuntimeException("Unable to write to announcement file; path=" + announcePath, ex);
}
}
Expand Down
Expand Up @@ -18,7 +18,6 @@
package com.netflix.hollow.api.producer.fs;

import com.netflix.hollow.api.producer.HollowProducer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -29,20 +28,14 @@ public class HollowFilesystemPublisher implements HollowProducer.Publisher {

private final Path blobStorePath;

// TODO: deprecate in Hollow 3.0.0
// @Deprecated
public HollowFilesystemPublisher(File blobStoreDir) {
this(blobStoreDir.toPath());
}

/**
* @param blobStorePath the path to store blobs
* @since 2.12.0
*/
public HollowFilesystemPublisher(Path blobStorePath) {
this.blobStorePath = blobStorePath;
try {
if(!Files.exists(this.blobStorePath)){
if (!Files.exists(this.blobStorePath)){
Files.createDirectories(this.blobStorePath);
}
} catch (IOException e) {
Expand All @@ -66,10 +59,10 @@ public void publish(HollowProducer.Blob blob) {

try(
InputStream is = blob.newInputStream();
OutputStream os = Files.newOutputStream(destination);
OutputStream os = Files.newOutputStream(destination)
) {
byte buf[] = new byte[4096];
int n = 0;
int n;
while (-1 != (n = is.read(buf)))
os.write(buf, 0, n);
} catch(IOException e) {
Expand Down

0 comments on commit 7590c84

Please sign in to comment.