Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SEDONA-406] Raster deserializer for PySpark #1281

Merged
merged 3 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ on:

env:
MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=60
JAI_CORE_VERSION: "1.1.3"
JAI_CODEC_VERSION: "1.1.3"
JAI_IMAGEIO_VERSION: "1.1"

jobs:
build:
Expand Down Expand Up @@ -111,11 +114,15 @@ jobs:
- env:
SPARK_VERSION: ${{ matrix.spark }}
HADOOP_VERSION: ${{ matrix.hadoop }}
run: wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
- env:
SPARK_VERSION: ${{ matrix.spark }}
HADOOP_VERSION: ${{ matrix.hadoop }}
run: tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
run: |
wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
wget https://repo.osgeo.org/repository/release/javax/media/jai_core/${JAI_CORE_VERSION}/jai_core-${JAI_CORE_VERSION}.jar
wget https://repo.osgeo.org/repository/release/javax/media/jai_codec/${JAI_CODEC_VERSION}/jai_codec-${JAI_CODEC_VERSION}.jar
wget https://repo.osgeo.org/repository/release/javax/media/jai_imageio/${JAI_IMAGEIO_VERSION}/jai_imageio-${JAI_IMAGEIO_VERSION}.jar
tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
mv -v jai_core-${JAI_CORE_VERSION}.jar spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/jars/
mv -v jai_codec-${JAI_CODEC_VERSION}.jar spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/jars/
mv -v jai_imageio-${JAI_IMAGEIO_VERSION}.jar spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/jars/
- run: sudo apt-get -y install python3-pip python-dev-is-python3
- run: sudo pip3 install -U setuptools
- run: sudo pip3 install -U wheel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@
*/
package org.apache.sedona.common.raster;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.sun.media.jai.rmi.ColorModelState;
import com.sun.media.jai.util.ImageUtil;
import it.geosolutions.jaiext.range.NoDataContainer;
import org.apache.sedona.common.raster.serde.AWTRasterSerializer;
import org.apache.sedona.common.raster.serde.KryoUtil;
import org.apache.sedona.common.utils.RasterUtils;

import javax.media.jai.JAI;
Expand Down Expand Up @@ -48,7 +56,7 @@
* object is being disposed, it tries to connect to the remote server. However, there is no remote server in deep-copy
* mode, so the dispose() method throws a java.net.SocketException.
*/
public final class DeepCopiedRenderedImage implements RenderedImage, Serializable {
public final class DeepCopiedRenderedImage implements RenderedImage, Serializable, KryoSerializable {
private transient RenderedImage source;
private int minX;
private int minY;
Expand All @@ -69,7 +77,7 @@ public final class DeepCopiedRenderedImage implements RenderedImage, Serializabl
private Rectangle imageBounds;
private transient Raster imageRaster;

DeepCopiedRenderedImage() {
public DeepCopiedRenderedImage() {
this.sampleModel = null;
this.colorModel = null;
this.sources = null;
Expand All @@ -87,57 +95,54 @@ private DeepCopiedRenderedImage(RenderedImage source, boolean checkDataBuffer) {
this.properties = null;
if (source == null) {
throw new IllegalArgumentException("source cannot be null");
} else {
SampleModel sm = source.getSampleModel();
if (sm != null && SerializerFactory.getSerializer(sm.getClass()) == null) {
throw new IllegalArgumentException("sample model object is not serializable");
} else {
ColorModel cm = source.getColorModel();
if (cm != null && SerializerFactory.getSerializer(cm.getClass()) == null) {
throw new IllegalArgumentException("color model object is not serializable");
} else {
if (checkDataBuffer) {
Raster ras = source.getTile(source.getMinTileX(), source.getMinTileY());
if (ras != null) {
DataBuffer db = ras.getDataBuffer();
if (db != null && SerializerFactory.getSerializer(db.getClass()) == null) {
throw new IllegalArgumentException("data buffer object is not serializable");
}
}
}

this.source = source;
if (source instanceof RemoteImage) {
throw new IllegalArgumentException("RemoteImage is not supported");
}
this.minX = source.getMinX();
this.minY = source.getMinY();
this.width = source.getWidth();
this.height = source.getHeight();
this.minTileX = source.getMinTileX();
this.minTileY = source.getMinTileY();
this.numXTiles = source.getNumXTiles();
this.numYTiles = source.getNumYTiles();
this.tileWidth = source.getTileWidth();
this.tileHeight = source.getTileHeight();
this.tileGridXOffset = source.getTileGridXOffset();
this.tileGridYOffset = source.getTileGridYOffset();
this.sampleModel = source.getSampleModel();
this.colorModel = source.getColorModel();
this.sources = new Vector<>();
this.sources.add(source);
this.properties = new Hashtable<>();
String[] propertyNames = source.getPropertyNames();
if (propertyNames != null) {
for (String propertyName : propertyNames) {
this.properties.put(propertyName, source.getProperty(propertyName));
}
}

this.imageBounds = new Rectangle(this.minX, this.minY, this.width, this.height);
}
SampleModel sm = source.getSampleModel();
if (sm != null && SerializerFactory.getSerializer(sm.getClass()) == null) {
throw new IllegalArgumentException("sample model object is not serializable");
}
ColorModel cm = source.getColorModel();
if (cm != null && SerializerFactory.getSerializer(cm.getClass()) == null) {
throw new IllegalArgumentException("color model object is not serializable");
}
if (checkDataBuffer) {
Raster ras = source.getTile(source.getMinTileX(), source.getMinTileY());
if (ras != null) {
DataBuffer db = ras.getDataBuffer();
if (db != null && SerializerFactory.getSerializer(db.getClass()) == null) {
throw new IllegalArgumentException("data buffer object is not serializable");
}
}
}

this.source = source;
if (source instanceof RemoteImage) {
throw new IllegalArgumentException("RemoteImage is not supported");
}
this.minX = source.getMinX();
this.minY = source.getMinY();
this.width = source.getWidth();
this.height = source.getHeight();
this.minTileX = source.getMinTileX();
this.minTileY = source.getMinTileY();
this.numXTiles = source.getNumXTiles();
this.numYTiles = source.getNumYTiles();
this.tileWidth = source.getTileWidth();
this.tileHeight = source.getTileHeight();
this.tileGridXOffset = source.getTileGridXOffset();
this.tileGridYOffset = source.getTileGridYOffset();
this.sampleModel = source.getSampleModel();
this.colorModel = source.getColorModel();
this.sources = new Vector<>();
this.sources.add(source);
this.properties = new Hashtable<>();
String[] propertyNames = source.getPropertyNames();
if (propertyNames != null) {
for (String propertyName : propertyNames) {
this.properties.put(propertyName, source.getProperty(propertyName));
}
}

this.imageBounds = new Rectangle(this.minX, this.minY, this.width, this.height);
}

@Override
Expand Down Expand Up @@ -325,10 +330,54 @@ public int getWidth() {
return this.width;
}

/**
 * Java serialization hook. After the default (non-transient) fields, writes three
 * objects in this order: the serializable state of the color model, the table of
 * serializable properties, and the serializable state of the image raster.
 *
 * <p>The raster is obtained from {@code source} when it is available; otherwise the
 * cached {@code imageRaster} is written.
 *
 * @param out the stream to write this object to
 * @throws IOException if writing to the stream fails
 */
@SuppressWarnings("unchecked")
private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();

    // The property table is prepared before anything else is written after the defaults.
    Hashtable<String, Object> serializableProps = getSerializableProperties();

    SerializableState colorModelState = SerializerFactory.getState(this.colorModel, null);
    out.writeObject(colorModelState);
    out.writeObject(serializableProps);

    // Choose the raster to persist: taken from the source when there is one,
    // otherwise the raster already held by this object.
    Raster rasterToWrite =
            (this.source == null) ? this.imageRaster : RasterUtils.getRaster(this.source);
    out.writeObject(SerializerFactory.getState(rasterToWrite, null));
}

/**
 * Java serialization hook. Reads the default fields, then the color model state,
 * the property table, and the raster state, in that order.
 *
 * <p>Property values stored as {@code SingleValueNoDataContainer} are converted back
 * to {@code NoDataContainer}. Because the restored image holds a single raster, the
 * sample model and tile layout fields are reset to describe one tile covering the
 * whole image.
 *
 * @param in the stream to read this object from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a class of a serialized object cannot be found
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    this.source = null;
    in.defaultReadObject();

    // Color model comes first in the stream, wrapped in a SerializableState.
    SerializableState colorModelState = (SerializableState) in.readObject();
    this.colorModel = (ColorModel) colorModelState.getObject();

    // Property table comes next.
    this.properties = (Hashtable<String, Object>) in.readObject();
    for (String propertyKey : this.properties.keySet()) {
        Object stored = this.properties.get(propertyKey);
        if (!(stored instanceof SingleValueNoDataContainer)) {
            continue;
        }
        // Restore the value of GC_NODATA property as a NoDataContainer object.
        SingleValueNoDataContainer holder = (SingleValueNoDataContainer) stored;
        this.properties.put(propertyKey, new NoDataContainer(holder.singleValue));
    }

    // Raster state is the last object in the stream.
    SerializableState rasterState = (SerializableState) in.readObject();
    this.imageRaster = (Raster) rasterState.getObject();

    // The deserialized rendered image contains only one tile (imageRaster), so the
    // sample model and the tile grid are updated to reflect this.
    this.sampleModel = this.imageRaster.getSampleModel();
    this.numXTiles = 1;
    this.numYTiles = 1;
    this.minTileX = 0;
    this.minTileY = 0;
    this.tileWidth = this.width;
    this.tileHeight = this.height;
    this.tileGridXOffset = this.minX;
    this.tileGridYOffset = this.minY;
}

@SuppressWarnings("unchecked")
private Hashtable<String, Object> getSerializableProperties() {
// Prepare serialize properties. non-serializable properties won't be serialized.
Hashtable<String, Object> propertyTable = this.properties;
boolean propertiesCloned = false;
Expand All @@ -350,25 +399,54 @@ private void writeObject(ObjectOutputStream out) throws IOException {
}
}
}
return propertyTable;
}

out.writeObject(SerializerFactory.getState(this.colorModel, null));
out.writeObject(propertyTable);
public static void registerKryo(Kryo kryo) {
kryo.register(ColorModelState.class, new JavaSerializer());
}

private static final AWTRasterSerializer awtRasterSerializer = new AWTRasterSerializer();

@Override
public void write(Kryo kryo, Output output) {
// write basic properties
output.writeInt(minX);
output.writeInt(minY);
output.writeInt(width);
output.writeInt(height);

// write properties
Hashtable<String, Object> propertyTable = getSerializableProperties();
KryoUtil.writeObjectWithLength(kryo, output, propertyTable);

// write color model
SerializableState colorModelState = SerializerFactory.getState(this.colorModel, null);
KryoUtil.writeObjectWithLength(kryo, output, colorModelState);

// write raster
Raster serializedRaster;
if (this.source != null) {
Raster serializedRaster = RasterUtils.getRaster(this.source);
out.writeObject(SerializerFactory.getState(serializedRaster, null));
serializedRaster = RasterUtils.getRaster(this.source);
} else {
out.writeObject(SerializerFactory.getState(imageRaster, null));
serializedRaster = imageRaster;
}
awtRasterSerializer.write(kryo, output, serializedRaster);
}

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
this.source = null;
in.defaultReadObject();

SerializableState cmState = (SerializableState)in.readObject();
this.colorModel = (ColorModel)cmState.getObject();
this.properties = (Hashtable<String, Object>)in.readObject();
@Override
public void read(Kryo kryo, Input input) {
// read basic properties
minX = input.readInt();
minY = input.readInt();
width = input.readInt();
height = input.readInt();
imageBounds = new Rectangle(minX, minY, width, height);

// read properties
input.readInt(); // skip the length of the property table
properties = kryo.readObject(input, Hashtable.class);
for (String key : this.properties.keySet()) {
Object value = this.properties.get(key);
// Restore the value of GC_NODATA property as a NoDataContainer object.
Expand All @@ -377,8 +455,14 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.properties.put(key, new NoDataContainer(noDataContainer.singleValue));
}
}
SerializableState rasState = (SerializableState)in.readObject();
this.imageRaster = (Raster)rasState.getObject();

// read color model
input.readInt(); // skip the length of the color model state
ColorModelState cmState = kryo.readObject(input, ColorModelState.class);
this.colorModel = (ColorModel) cmState.getObject();

// read raster
this.imageRaster = awtRasterSerializer.read(kryo, input, Raster.class);

// The deserialized rendered image contains only one tile (imageRaster). We need to update
// the sample model and tile properties to reflect this.
Expand All @@ -387,6 +471,10 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.tileHeight = this.height;
this.numXTiles = 1;
this.numYTiles = 1;
this.minTileX = 0;
this.minTileY = 0;
this.tileGridXOffset = minX;
this.tileGridYOffset = minY;
}

/**
Expand Down
Loading
Loading