Skip to content

Commit

Permalink
[SEDONA-406] Raster deserializer for PySpark (#1281)
Browse files Browse the repository at this point in the history
* [SEDONA-406] Raster deserializer for PySpark (#116)

* Update documentation

* Add documentation for writing Python UDF to work with raster data
  • Loading branch information
Kontinuation committed Mar 22, 2024
1 parent b135750 commit 63a1de0
Show file tree
Hide file tree
Showing 46 changed files with 3,374 additions and 204 deletions.
17 changes: 12 additions & 5 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ on:

env:
MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=60
JAI_CORE_VERSION: "1.1.3"
JAI_CODEC_VERSION: "1.1.3"
JAI_IMAGEIO_VERSION: "1.1"

jobs:
build:
Expand Down Expand Up @@ -111,11 +114,15 @@ jobs:
- env:
SPARK_VERSION: ${{ matrix.spark }}
HADOOP_VERSION: ${{ matrix.hadoop }}
run: wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
- env:
SPARK_VERSION: ${{ matrix.spark }}
HADOOP_VERSION: ${{ matrix.hadoop }}
run: tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
run: |
wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
wget https://repo.osgeo.org/repository/release/javax/media/jai_core/${JAI_CORE_VERSION}/jai_core-${JAI_CORE_VERSION}.jar
wget https://repo.osgeo.org/repository/release/javax/media/jai_codec/${JAI_CODEC_VERSION}/jai_codec-${JAI_CODEC_VERSION}.jar
wget https://repo.osgeo.org/repository/release/javax/media/jai_imageio/${JAI_IMAGEIO_VERSION}/jai_imageio-${JAI_IMAGEIO_VERSION}.jar
tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
mv -v jai_core-${JAI_CORE_VERSION}.jar spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/jars/
mv -v jai_codec-${JAI_CODEC_VERSION}.jar spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/jars/
mv -v jai_imageio-${JAI_IMAGEIO_VERSION}.jar spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/jars/
- run: sudo apt-get -y install python3-pip python-dev-is-python3
- run: sudo pip3 install -U setuptools
- run: sudo pip3 install -U wheel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@
*/
package org.apache.sedona.common.raster;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.sun.media.jai.rmi.ColorModelState;
import com.sun.media.jai.util.ImageUtil;
import it.geosolutions.jaiext.range.NoDataContainer;
import org.apache.sedona.common.raster.serde.AWTRasterSerializer;
import org.apache.sedona.common.raster.serde.KryoUtil;
import org.apache.sedona.common.utils.RasterUtils;

import javax.media.jai.JAI;
Expand Down Expand Up @@ -48,7 +56,7 @@
* object is being disposed, it tries to connect to the remote server. However, there is no remote server in deep-copy
* mode, so the dispose() method throws a java.net.SocketException.
*/
public final class DeepCopiedRenderedImage implements RenderedImage, Serializable {
public final class DeepCopiedRenderedImage implements RenderedImage, Serializable, KryoSerializable {
private transient RenderedImage source;
private int minX;
private int minY;
Expand All @@ -69,7 +77,7 @@ public final class DeepCopiedRenderedImage implements RenderedImage, Serializabl
private Rectangle imageBounds;
private transient Raster imageRaster;

DeepCopiedRenderedImage() {
public DeepCopiedRenderedImage() {
this.sampleModel = null;
this.colorModel = null;
this.sources = null;
Expand All @@ -87,57 +95,54 @@ private DeepCopiedRenderedImage(RenderedImage source, boolean checkDataBuffer) {
this.properties = null;
if (source == null) {
throw new IllegalArgumentException("source cannot be null");
} else {
SampleModel sm = source.getSampleModel();
if (sm != null && SerializerFactory.getSerializer(sm.getClass()) == null) {
throw new IllegalArgumentException("sample model object is not serializable");
} else {
ColorModel cm = source.getColorModel();
if (cm != null && SerializerFactory.getSerializer(cm.getClass()) == null) {
throw new IllegalArgumentException("color model object is not serializable");
} else {
if (checkDataBuffer) {
Raster ras = source.getTile(source.getMinTileX(), source.getMinTileY());
if (ras != null) {
DataBuffer db = ras.getDataBuffer();
if (db != null && SerializerFactory.getSerializer(db.getClass()) == null) {
throw new IllegalArgumentException("data buffer object is not serializable");
}
}
}

this.source = source;
if (source instanceof RemoteImage) {
throw new IllegalArgumentException("RemoteImage is not supported");
}
this.minX = source.getMinX();
this.minY = source.getMinY();
this.width = source.getWidth();
this.height = source.getHeight();
this.minTileX = source.getMinTileX();
this.minTileY = source.getMinTileY();
this.numXTiles = source.getNumXTiles();
this.numYTiles = source.getNumYTiles();
this.tileWidth = source.getTileWidth();
this.tileHeight = source.getTileHeight();
this.tileGridXOffset = source.getTileGridXOffset();
this.tileGridYOffset = source.getTileGridYOffset();
this.sampleModel = source.getSampleModel();
this.colorModel = source.getColorModel();
this.sources = new Vector<>();
this.sources.add(source);
this.properties = new Hashtable<>();
String[] propertyNames = source.getPropertyNames();
if (propertyNames != null) {
for (String propertyName : propertyNames) {
this.properties.put(propertyName, source.getProperty(propertyName));
}
}

this.imageBounds = new Rectangle(this.minX, this.minY, this.width, this.height);
}
SampleModel sm = source.getSampleModel();
if (sm != null && SerializerFactory.getSerializer(sm.getClass()) == null) {
throw new IllegalArgumentException("sample model object is not serializable");
}
ColorModel cm = source.getColorModel();
if (cm != null && SerializerFactory.getSerializer(cm.getClass()) == null) {
throw new IllegalArgumentException("color model object is not serializable");
}
if (checkDataBuffer) {
Raster ras = source.getTile(source.getMinTileX(), source.getMinTileY());
if (ras != null) {
DataBuffer db = ras.getDataBuffer();
if (db != null && SerializerFactory.getSerializer(db.getClass()) == null) {
throw new IllegalArgumentException("data buffer object is not serializable");
}
}
}

this.source = source;
if (source instanceof RemoteImage) {
throw new IllegalArgumentException("RemoteImage is not supported");
}
this.minX = source.getMinX();
this.minY = source.getMinY();
this.width = source.getWidth();
this.height = source.getHeight();
this.minTileX = source.getMinTileX();
this.minTileY = source.getMinTileY();
this.numXTiles = source.getNumXTiles();
this.numYTiles = source.getNumYTiles();
this.tileWidth = source.getTileWidth();
this.tileHeight = source.getTileHeight();
this.tileGridXOffset = source.getTileGridXOffset();
this.tileGridYOffset = source.getTileGridYOffset();
this.sampleModel = source.getSampleModel();
this.colorModel = source.getColorModel();
this.sources = new Vector<>();
this.sources.add(source);
this.properties = new Hashtable<>();
String[] propertyNames = source.getPropertyNames();
if (propertyNames != null) {
for (String propertyName : propertyNames) {
this.properties.put(propertyName, source.getProperty(propertyName));
}
}

this.imageBounds = new Rectangle(this.minX, this.minY, this.width, this.height);
}

@Override
Expand Down Expand Up @@ -325,10 +330,54 @@ public int getWidth() {
return this.width;
}

/**
 * Java serialization hook. After the default (non-transient) fields, the stream receives three
 * objects in this order: the color model state, the serializable property table, and the
 * raster state.
 *
 * @param out the object stream to write to
 * @throws IOException if writing to the stream fails
 */
@SuppressWarnings("unchecked")
private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();

    Hashtable<String, Object> serializableProps = getSerializableProperties();
    out.writeObject(SerializerFactory.getState(this.colorModel, null));
    out.writeObject(serializableProps);

    // Use the raster of the live source image when there is one; otherwise fall back to
    // the raster held in imageRaster.
    Raster rasterToWrite = (this.source != null) ? RasterUtils.getRaster(this.source) : imageRaster;
    out.writeObject(SerializerFactory.getState(rasterToWrite, null));
}

/**
 * Java serialization hook. Reads the default fields, then the color model state, the property
 * table and the raster state, in that order, and finally rewrites the tile layout so that the
 * image is described as a single tile backed by {@code imageRaster}.
 *
 * @param in the object stream to read from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a class of a serialized object cannot be found
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    this.source = null;
    in.defaultReadObject();

    SerializableState colorModelState = (SerializableState) in.readObject();
    this.colorModel = (ColorModel) colorModelState.getObject();

    this.properties = (Hashtable<String, Object>) in.readObject();
    for (String propertyName : this.properties.keySet()) {
        Object stored = this.properties.get(propertyName);
        if (!(stored instanceof SingleValueNoDataContainer)) {
            continue;
        }
        // Restore the value of GC_NODATA property as a NoDataContainer object.
        SingleValueNoDataContainer singleValueContainer = (SingleValueNoDataContainer) stored;
        this.properties.put(propertyName, new NoDataContainer(singleValueContainer.singleValue));
    }

    SerializableState rasterState = (SerializableState) in.readObject();
    this.imageRaster = (Raster) rasterState.getObject();

    // The deserialized rendered image contains only one tile (imageRaster), so the sample
    // model and the tile grid are updated to describe exactly that tile.
    this.sampleModel = this.imageRaster.getSampleModel();
    this.numXTiles = 1;
    this.numYTiles = 1;
    this.minTileX = 0;
    this.minTileY = 0;
    this.tileWidth = this.width;
    this.tileHeight = this.height;
    this.tileGridXOffset = minX;
    this.tileGridYOffset = minY;
}

@SuppressWarnings("unchecked")
private Hashtable<String, Object> getSerializableProperties() {
// Prepare the properties for serialization. Non-serializable properties won't be serialized.
Hashtable<String, Object> propertyTable = this.properties;
boolean propertiesCloned = false;
Expand All @@ -350,25 +399,54 @@ private void writeObject(ObjectOutputStream out) throws IOException {
}
}
}
return propertyTable;
}

out.writeObject(SerializerFactory.getState(this.colorModel, null));
out.writeObject(propertyTable);
/**
 * Registers {@link ColorModelState} with the given Kryo instance, using Kryo's
 * {@link JavaSerializer} as its serializer.
 *
 * @param kryo the Kryo instance to register the class with
 */
public static void registerKryo(Kryo kryo) {
    JavaSerializer colorModelStateSerializer = new JavaSerializer();
    kryo.register(ColorModelState.class, colorModelStateSerializer);
}

private static final AWTRasterSerializer awtRasterSerializer = new AWTRasterSerializer();

/**
 * Kryo serialization hook. Writes, in order: minX, minY, width and height as ints; the
 * serializable property table and the color model state (each through
 * KryoUtil.writeObjectWithLength); and finally the raster through awtRasterSerializer.
 *
 * @param kryo the Kryo instance handed to the nested writers
 * @param output the Kryo output to write to
 */
@Override
public void write(Kryo kryo, Output output) {
// write basic properties
output.writeInt(minX);
output.writeInt(minY);
output.writeInt(width);
output.writeInt(height);

// write properties
Hashtable<String, Object> propertyTable = getSerializableProperties();
KryoUtil.writeObjectWithLength(kryo, output, propertyTable);

// write color model
SerializableState colorModelState = SerializerFactory.getState(this.colorModel, null);
KryoUtil.writeObjectWithLength(kryo, output, colorModelState);

// write raster
Raster serializedRaster;
if (this.source != null) {
// NOTE(review): the next two statements redeclare serializedRaster and use `out`, which is not
// a parameter of this method; they look like pre-change lines carried over from the diff view
// (same for the `out.writeObject` line in the else branch) - confirm against the real file.
Raster serializedRaster = RasterUtils.getRaster(this.source);
out.writeObject(SerializerFactory.getState(serializedRaster, null));
serializedRaster = RasterUtils.getRaster(this.source);
} else {
out.writeObject(SerializerFactory.getState(imageRaster, null));
serializedRaster = imageRaster;
}
awtRasterSerializer.write(kryo, output, serializedRaster);
}

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
this.source = null;
in.defaultReadObject();

SerializableState cmState = (SerializableState)in.readObject();
this.colorModel = (ColorModel)cmState.getObject();
this.properties = (Hashtable<String, Object>)in.readObject();
@Override
public void read(Kryo kryo, Input input) {
// read basic properties
minX = input.readInt();
minY = input.readInt();
width = input.readInt();
height = input.readInt();
imageBounds = new Rectangle(minX, minY, width, height);

// read properties
input.readInt(); // skip the length of the property table
properties = kryo.readObject(input, Hashtable.class);
for (String key : this.properties.keySet()) {
Object value = this.properties.get(key);
// Restore the value of GC_NODATA property as a NoDataContainer object.
Expand All @@ -377,8 +455,14 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.properties.put(key, new NoDataContainer(noDataContainer.singleValue));
}
}
SerializableState rasState = (SerializableState)in.readObject();
this.imageRaster = (Raster)rasState.getObject();

// read color model
input.readInt(); // skip the length of the color model state
ColorModelState cmState = kryo.readObject(input, ColorModelState.class);
this.colorModel = (ColorModel) cmState.getObject();

// read raster
this.imageRaster = awtRasterSerializer.read(kryo, input, Raster.class);

// The deserialized rendered image contains only one tile (imageRaster). We need to update
// the sample model and tile properties to reflect this.
Expand All @@ -387,6 +471,10 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.tileHeight = this.height;
this.numXTiles = 1;
this.numYTiles = 1;
this.minTileX = 0;
this.minTileY = 0;
this.tileGridXOffset = minX;
this.tileGridYOffset = minY;
}

/**
Expand Down

0 comments on commit 63a1de0

Please sign in to comment.