Skip to content

Commit

Permalink
Update spim_data and bigdataviewer-core dependencies to pull in improvements made for running in Spark environments. Set up and share a single SpimData2 instance for all tasks running on the same executor JVM. This fixes #4 and also fixes #5.
Browse files Browse the repository at this point in the history
  • Loading branch information
trautmane committed Jan 12, 2022
1 parent 258dcb1 commit 0110317
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 38 deletions.
15 changes: 7 additions & 8 deletions pom.xml
Expand Up @@ -3,7 +3,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>BigStitcher-Spark</artifactId>
<version>0.0.1-SNAPSHOT</version>
<version>0.0.2-SNAPSHOT</version>

<parent>
<groupId>org.scijava</groupId>
Expand Down Expand Up @@ -77,6 +77,8 @@

<properties>
<scijava.jvm.version>1.8</scijava.jvm.version>
<bigdataviewer-core.version>10.3.0</bigdataviewer-core.version>
<spim_data.version>2.2.7</spim_data.version>
<license.licenseName>gpl_v2</license.licenseName>
<license.copyrightOwners>Developers.</license.copyrightOwners>
</properties>
Expand All @@ -92,10 +94,13 @@
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>sc.fiji</groupId>
<artifactId>bigdataviewer-core</artifactId>
</dependency>
<dependency>
<groupId>sc.fiji</groupId>
<artifactId>spim_data</artifactId>
<version>2.2.7</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down Expand Up @@ -190,12 +195,6 @@
<groupId>net.preibisch</groupId>
<artifactId>BigStitcher</artifactId>
<version>0.8.3</version>
<exclusions>
<exclusion>
<groupId>sc.fiji</groupId>
<artifactId>spim_data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>info.picocli</groupId>
Expand Down
16 changes: 5 additions & 11 deletions src/main/java/net/preibisch/bigstitcher/spark/AffineFusion.java
Expand Up @@ -5,6 +5,8 @@
import java.util.Arrays;
import java.util.concurrent.Callable;

import mpicbg.spim.data.sequence.ViewId;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
Expand All @@ -14,8 +16,6 @@
import org.janelia.saalfeldlab.n5.N5Writer;
import org.janelia.saalfeldlab.n5.imglib2.N5Utils;

import mpicbg.spim.data.sequence.ImgLoader;
import mpicbg.spim.data.sequence.ViewId;
import net.imglib2.FinalInterval;
import net.imglib2.Interval;
import net.imglib2.RandomAccessibleInterval;
Expand All @@ -31,7 +31,6 @@
import net.preibisch.bigstitcher.spark.util.Spark;
import net.preibisch.bigstitcher.spark.util.ViewUtil;
import net.preibisch.mvrecon.fiji.spimdata.SpimData2;
import net.preibisch.mvrecon.fiji.spimdata.XmlIoSpimData2;
import net.preibisch.mvrecon.fiji.spimdata.boundingbox.BoundingBox;
import net.preibisch.mvrecon.process.fusion.FusionTools;
import picocli.CommandLine;
Expand Down Expand Up @@ -91,8 +90,7 @@ public Void call() throws Exception
if ( !Import.testInputParamters(uint8, uint16, minIntensity, maxIntensity, vi, angleIds, channelIds, illuminationIds, tileIds, timepointIds) )
System.exit( 1 );

final XmlIoSpimData2 io = new XmlIoSpimData2( "" );
final SpimData2 data = io.load( xmlPath );
final SpimData2 data = Spark.getSparkJobSpimData2("", xmlPath);

// select views to process
final ArrayList< ViewId > viewIds =
Expand Down Expand Up @@ -205,7 +203,7 @@ else if ( uint16 )

rdd.foreach(
gridBlock -> {
final SpimData2 dataLocal = new XmlIoSpimData2( "" ).load( xmlPath );
final SpimData2 dataLocal = Spark.getSparkJobSpimData2("", xmlPath);

// be smarter, test which ViewIds are actually needed for the block we want to fuse
final Interval fusedBlock =
Expand All @@ -218,16 +216,12 @@ else if ( uint16 )
// recover views to process
final ArrayList< ViewId > viewIdsLocal = new ArrayList<>();

// Create N5ImageLoader outside of loop to reduce total number of created fetcher threads.
// TODO: parameterize N5ImageLoader fetcher thread count to allow override in Spark environments
final ImgLoader imgLoader = dataLocal.getSequenceDescription().getImgLoader();

for ( int i = 0; i < serializedViewIds.length; ++i )
{
final ViewId viewId = Spark.deserializeViewIds(serializedViewIds, i);

// expand to be conservative ...
final Interval boundingBox = ViewUtil.getTransformedBoundingBox( dataLocal, viewId, imgLoader );
final Interval boundingBox = ViewUtil.getTransformedBoundingBox( dataLocal, viewId );
final Interval bounds = Intervals.expand( boundingBox, 2 );

if ( ViewUtil.overlaps( fusedBlock, bounds ) )
Expand Down
Expand Up @@ -6,7 +6,8 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import mpicbg.spim.data.sequence.ViewId;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
Expand All @@ -17,8 +18,6 @@
import org.janelia.saalfeldlab.n5.N5Writer;
import org.janelia.saalfeldlab.n5.imglib2.N5Utils;

import mpicbg.spim.data.sequence.ImgLoader;
import mpicbg.spim.data.sequence.ViewId;
import net.imglib2.FinalInterval;
import net.imglib2.Interval;
import net.imglib2.RandomAccessibleInterval;
Expand All @@ -34,7 +33,6 @@
import net.preibisch.bigstitcher.spark.util.Spark;
import net.preibisch.bigstitcher.spark.util.ViewUtil;
import net.preibisch.mvrecon.fiji.spimdata.SpimData2;
import net.preibisch.mvrecon.fiji.spimdata.XmlIoSpimData2;
import net.preibisch.mvrecon.fiji.spimdata.boundingbox.BoundingBox;
import net.preibisch.mvrecon.process.fusion.transformed.nonrigid.NonRigidTools;
import picocli.CommandLine;
Expand Down Expand Up @@ -100,8 +98,7 @@ public Void call() throws Exception
if ( !Import.testInputParamters(uint8, uint16, minIntensity, maxIntensity, vi, angleIds, channelIds, illuminationIds, tileIds, timepointIds) )
System.exit( 1 );

final XmlIoSpimData2 io = new XmlIoSpimData2( "" );
final SpimData2 data = io.load( xmlPath );
final SpimData2 data = Spark.getSparkJobSpimData2("", xmlPath);

// select views to process
final ArrayList< ViewId > viewIds =
Expand Down Expand Up @@ -219,7 +216,7 @@ else if ( uint16 )

rdd.foreach(
gridBlock -> {
final SpimData2 dataLocal = new XmlIoSpimData2( "" ).load( xmlPath );
final SpimData2 dataLocal = Spark.getSparkJobSpimData2("", xmlPath);

// be smarter, test which ViewIds are actually needed for the block we want to fuse
final Interval fusedBlock =
Expand All @@ -233,16 +230,12 @@ else if ( uint16 )
final List< ViewId > viewsToFuse = new ArrayList<>(); // fuse
final List< ViewId > allViews = new ArrayList<>();

// Create N5ImageLoader outside of loop to reduce total number of created fetcher threads.
// TODO: parameterize N5ImageLoader fetcher thread count to allow override in Spark environments
final ImgLoader imgLoader = dataLocal.getSequenceDescription().getImgLoader();

for ( int i = 0; i < serializedViewIds.length; ++i )
{
final ViewId viewId = Spark.deserializeViewIds(serializedViewIds, i);

// expand by 50 to be conservative for non-rigid overlaps
final Interval boundingBox = ViewUtil.getTransformedBoundingBox( dataLocal, viewId, imgLoader );
final Interval boundingBox = ViewUtil.getTransformedBoundingBox( dataLocal, viewId );
final Interval bounds = Intervals.expand( boundingBox, 50 );

if ( ViewUtil.overlaps( fusedBlock, bounds ) )
Expand All @@ -261,12 +254,12 @@ else if ( uint16 )

for ( final ViewId viewId : allViews )
{
final Interval boundingBoxView = ViewUtil.getTransformedBoundingBox( dataLocal, viewId, imgLoader );
final Interval boundingBoxView = ViewUtil.getTransformedBoundingBox( dataLocal, viewId );
final Interval boundsView = Intervals.expand( boundingBoxView, 25 );

for ( final ViewId fusedId : viewsToFuse )
{
final Interval boundingBoxFused = ViewUtil.getTransformedBoundingBox( dataLocal, fusedId, imgLoader );
final Interval boundingBoxFused = ViewUtil.getTransformedBoundingBox( dataLocal, fusedId );
final Interval boundsFused = Intervals.expand( boundingBoxFused, 25 );

if ( ViewUtil.overlaps( boundsView, boundsFused ))
Expand Down
80 changes: 80 additions & 0 deletions src/main/java/net/preibisch/bigstitcher/spark/util/Spark.java
Expand Up @@ -2,8 +2,18 @@

import java.util.List;

import mpicbg.spim.data.SpimDataException;
import mpicbg.spim.data.generic.sequence.BasicImgLoader;
import mpicbg.spim.data.sequence.ViewId;

import org.apache.spark.SparkEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bdv.ViewerImgLoader;
import net.preibisch.mvrecon.fiji.spimdata.SpimData2;
import net.preibisch.mvrecon.fiji.spimdata.XmlIoSpimData2;

public class Spark {

public static ViewId deserializeViewIds( final int[][] serializedViewIds, final int i )
Expand All @@ -24,4 +34,74 @@ public static int[][] serializeViewIds( final List< ViewId > viewIds )
return serializedViewIds;
}

/**
 * @return the executor id reported by the current {@link SparkEnv},
 *         or {@code null} when no Spark environment is active
 *         (e.g. when called outside of a running Spark job).
 */
public static String getSparkExecutorId() {
    final SparkEnv currentEnv = SparkEnv.get();
    if (currentEnv == null) {
        return null;
    }
    return currentEnv.executorId();
}

/**
 * Returns the single shared {@link SpimData2} instance for the current Spark job (JVM),
 * loading it on first use.  The instance is intended for use within single-threaded
 * Spark tasks (fetcher threads are disabled when the instance is loaded).
 *
 * @param  clusterExt  cluster extension passed to {@code XmlIoSpimData2}.
 * @param  xmlPath     path of the data set XML file.
 *
 * @return the shared data instance for this JVM.
 *
 * @throws SpimDataException
 *   if the data cannot be loaded, or if it was previously loaded with a
 *   different clusterExt/xmlPath combination.
 */
public static SpimData2 getSparkJobSpimData2(final String clusterExt,
                                             final String xmlPath)
        throws SpimDataException {

    // Always go through the synchronized loader: the static cache field is not
    // volatile, so an unsynchronized read here could observe a stale null or an
    // unsafely published instance.  loadSpimData2 re-checks the cache under the
    // lock and validates the requested location itself, so behavior is unchanged.
    final SpimData2 data = loadSpimData2(clusterExt, xmlPath);

    LOG.info("getSparkJobSpimData2: returning {} for clusterExt={}, xmlPath={} on executorId={}",
             data, clusterExt, xmlPath, getSparkExecutorId());

    return data;
}

/**
 * Loads (once) and caches the shared {@link SpimData2} instance for this JVM.
 * On subsequent calls the cached instance is returned after verifying that the
 * requested clusterExt/xmlPath match the values used for the original load.
 *
 * @param  clusterExt  cluster extension passed to {@code XmlIoSpimData2}.
 * @param  xmlPath     path of the data set XML file.
 *
 * @return the cached shared data instance.
 *
 * @throws SpimDataException
 *   if loading fails or the requested location differs from the cached one.
 */
private static synchronized SpimData2 loadSpimData2(final String clusterExt,
                                                    final String xmlPath)
        throws SpimDataException {

    // already loaded: just make sure the caller asked for the same data set
    if (sparkJobSpimData2 != null) {
        validateSpimData2Location(clusterExt, xmlPath);
        return sparkJobSpimData2;
    }

    final SpimData2 data = new XmlIoSpimData2(clusterExt).load(xmlPath);

    // set number of fetcher threads to 0 for spark usage
    final BasicImgLoader imgLoader = data.getSequenceDescription().getImgLoader();
    if (imgLoader instanceof ViewerImgLoader) {
        ((ViewerImgLoader) imgLoader).setNumFetcherThreads(0);
    }

    // remember where the data came from so later calls can be validated
    sparkJobSpimData2ClusterExt = clusterExt;
    sparkJobSpimData2XmlPath = xmlPath;
    sparkJobSpimData2 = data;

    LOG.info("loadSpimData2: loaded {} for clusterExt={}, xmlPath={} on executorId={}",
             sparkJobSpimData2, clusterExt, xmlPath, getSparkExecutorId());

    return sparkJobSpimData2;
}

/**
 * Verifies that the requested clusterExt/xmlPath match the location of the
 * already-cached data instance.
 *
 * @throws SpimDataException
 *   if either value differs from the one used for the cached load.
 */
private static void validateSpimData2Location(final String clusterExt,
                                              final String xmlPath) throws SpimDataException {
    final boolean sameClusterExt = clusterExt.equals(sparkJobSpimData2ClusterExt);
    final boolean sameXmlPath = xmlPath.equals(sparkJobSpimData2XmlPath);
    if ((! sameClusterExt) || (! sameXmlPath)) {
        throw new SpimDataException("attempted to load data with clusterExt " + clusterExt + " and xmlPath " +
                                    xmlPath + " after data was already loaded with clusterExt " +
                                    sparkJobSpimData2ClusterExt + " and xmlPath " + sparkJobSpimData2XmlPath);
    }
}

private static String sparkJobSpimData2ClusterExt;
private static String sparkJobSpimData2XmlPath;
private static SpimData2 sparkJobSpimData2;

private static final Logger LOG = LoggerFactory.getLogger(Spark.class);

}
Expand Up @@ -5,6 +5,7 @@
import mpicbg.spim.data.sequence.ImgLoader;
import mpicbg.spim.data.sequence.SetupImgLoader;
import mpicbg.spim.data.sequence.ViewId;

import net.imglib2.Dimensions;
import net.imglib2.FinalInterval;
import net.imglib2.Interval;
Expand All @@ -24,17 +25,15 @@ public static boolean overlaps( final Interval interval1, final Interval interva
}

public static Interval getTransformedBoundingBox( final SpimData data,
final ViewId viewId,
final ImgLoader imgLoader )
final ViewId viewId )
{
final ImgLoader imgLoader = data.getSequenceDescription().getImgLoader();
final SetupImgLoader<?> setupImgLoader = imgLoader.getSetupImgLoader(viewId.getViewSetupId());
final Dimensions dim = setupImgLoader.getImageSize(viewId.getTimePointId() );

final ViewRegistration reg = data.getViewRegistrations().getViewRegistration( viewId );
reg.updateModel();

final Interval bounds = Intervals.largestContainedInterval( reg.getModel().estimateBounds( new FinalInterval( dim ) ) );

return bounds;
return Intervals.largestContainedInterval( reg.getModel().estimateBounds( new FinalInterval( dim ) ) );
}
}

0 comments on commit 0110317

Please sign in to comment.