Implement Batch Jobs using Google Drive
m-mohr committed Jul 30, 2024
1 parent 7fcfd38 commit 2a16251
Showing 17 changed files with 424 additions and 79 deletions.
3 changes: 1 addition & 2 deletions src/api/files.js
@@ -94,8 +94,7 @@ export default class FilesAPI {
size: newFileStat.size,
modified: Utils.getISODateTime(newFileStat.mtime)
});
}
catch (e) {
} catch (e) {
if (this.context.debug) {
console.error(e);
}
5 changes: 4 additions & 1 deletion src/api/jobs.js
@@ -54,6 +54,9 @@ export default class JobsAPI {
if (!req.user._id) {
throw new Errors.AuthenticationRequired();
}

// Start the task monitor to keep the user's task statuses up to date
this.context.processingContext(req.user).startTaskMonitor();
}

async getJobs(req, res) {
@@ -246,7 +249,7 @@ export default class JobsAPI {
if (this.storage.isFieldEditable(key)) {
switch(key) {
case 'process': {
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user));
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user, job));
pg.allowUndefinedParameters(false);
promises.push(pg.validate());
break;
2 changes: 1 addition & 1 deletion src/api/services.js
@@ -118,7 +118,7 @@ export default class ServicesAPI {
if (this.storage.isFieldEditable(key)) {
switch(key) {
case 'process': {
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user));
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user, service));
// ToDo 1.0: Correctly handle service parameters #79

pg.allowUndefinedParameters(false);
promises.push(pg.validate());
66 changes: 50 additions & 16 deletions src/api/worker/batchjob.js
@@ -21,38 +21,69 @@ export default async function run(config, storage, user, query) {
logger.info("Starting batch job");
await storage.updateJobStatus(query, 'running');

const context = config.processingContext(user);
const context = config.processingContext(user, job);
const pg = new ProcessGraph(job.process, context, logger);
await pg.execute();

const computeTasks = pg.getResults().map(async (datacube) => {
const response = await GeeResults.retrieve(context, datacube, logger);
const params = datacube.getOutputFormatParameters();
const filename = (params.name || String(Utils.generateHash())) + GeeResults.getFileExtension(datacube, config);
const filepath = storage.getJobFile(job._id, filename);
logger.debug("Storing result to: " + filepath);
await fse.ensureDir(path.dirname(filepath));
await new Promise((resolve, reject) => {
const writer = fse.createWriteStream(filepath);
response.data.pipe(writer);
writer.on('error', reject);
writer.on('close', resolve);
});
return { filepath, datacube };
const computeTasks = pg.getResults().map(async (dc) => {
const format = config.getOutputFormat(dc.getOutputFormat());
const datacube = format.preprocess(GeeResults.BATCH, context, dc, logger);

if (format.canExport()) {
const tasks = await format.export(context.ee, datacube, context.getResource());
storage.addTasks(job, tasks);
context.startTaskMonitor();
const jobfolder = storage.getJobFolder(job._id);
await fse.ensureDir(path.dirname(jobfolder));
const filepath = await new Promise((resolve, reject) => {
const interval = setInterval(async () => {
const updatedJob = await storage.getById(job._id, job.user_id);
if (!updatedJob) {
clearInterval(interval);
return reject(new Error("Job was deleted"));
}
if (['canceled', 'error', 'finished'].includes(updatedJob.status)) {
clearInterval(interval);
// todo: resolve google drive URLs
resolve([]); //job.googleDriveResults
}
}, 10000);
});
return { filepath, datacube };
}
else {
const response = await format.retrieve(context.ee, datacube);
const params = datacube.getOutputFormatParameters();
const filename = (params.name || String(Utils.generateHash())) + GeeResults.getFileExtension(datacube, config);
const filepath = storage.getJobFile(job._id, filename);
await fse.ensureDir(path.dirname(filepath));
await new Promise((resolve, reject) => {
const writer = fse.createWriteStream(filepath);
response.data.pipe(writer);
writer.on('error', reject);
writer.on('close', resolve);
});
return { filepath, datacube };
}
});

await Promise.all(computeTasks);

const results = [];
for (const task of computeTasks) {
results.push(await task);
const { filepath, datacube } = await task;
if (Array.isArray(filepath)) {
filepath.forEach(fp => results.push({ filepath: fp, datacube }));
}
else {
results.push({ filepath, datacube });
}
}

const item = await createSTAC(storage, job, results);
const stacpath = storage.getJobFile(job._id, 'stac.json');
await fse.writeJSON(stacpath, item, {spaces: 2});

logger.info("Finished");
// todo: set to error if any task failed
await storage.updateJobStatus(query, 'finished');
} catch(e) {
logger.error(e);
@@ -78,6 +109,9 @@ async function createSTAC(storage, job, results) {
let endTime = null;
const extents = [];
for(const { filepath, datacube } of results) {
if (!filepath) {
continue;
}
const filename = path.basename(filepath);
const stat = await fse.stat(filepath);
let asset = {
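The `// todo: resolve google drive URLs` in the polling loop above is left open in this commit. A minimal sketch of how it might be done once the tasks have finished, assuming the `googleapis` package and an authorized `auth` client; `resolveGoogleDriveResults` is a hypothetical name, not part of the repository:

```js
import { google } from 'googleapis';

// Hypothetical helper: list the files that Export.image.toDrive wrote
// into the Drive folder named after the job id (see gtiff.js below).
async function resolveGoogleDriveResults(auth, jobId) {
  const drive = google.drive({ version: 'v3', auth });

  // Export.image.toDrive creates/uses a folder whose name is the job id.
  const folders = await drive.files.list({
    q: `mimeType = 'application/vnd.google-apps.folder' and name = '${jobId}' and trashed = false`,
    fields: 'files(id)'
  });
  const folder = folders.data.files[0];
  if (!folder) {
    return [];
  }

  // Collect the download links of the exported files.
  const files = await drive.files.list({
    q: `'${folder.id}' in parents and trashed = false`,
    fields: 'files(name, webContentLink)'
  });
  return files.data.files.map(file => file.webContentLink);
}
```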
6 changes: 5 additions & 1 deletion src/api/worker/sync.js
@@ -25,7 +25,11 @@ export default async function run(config, user, id, process, log_level) {
if (pg.getResults().length > 1) {
logger.warn("Multiple results can't be processed in synchronous mode. Only the result from the result node will be returned.");
}
return await GeeResults.retrieve(context, resultNode.getResult(), logger);

const dc = resultNode.getResult();
const format = config.getOutputFormat(dc.getOutputFormat());
const dc2 = format.preprocess(GeeResults.SYNC, context, dc, logger);
return await format.retrieve(context.ee, dc2);
}

export async function getResultLogs(user_id, id, log_level) {
6 changes: 4 additions & 2 deletions src/api/worker/webservice.js
@@ -9,7 +9,7 @@ export default async function run(config, storage, user, query, xyz) {

try {
const rect = storage.calculateXYZRect(...xyz);
const context = config.processingContext(user);
const context = config.processingContext(user, service);
// Set the user id to the id of the user who stored the service.
// See https://github.com/Open-EO/openeo-earthengine-driver/issues/19
context.setUserId(service.user_id);
@@ -29,7 +29,9 @@ export default async function run(config, storage, user, query, xyz) {
dc.setOutputFormat('png');
}

return await GeeResults.retrieve(context, dc, logger);
const format = config.getOutputFormat(dc.getOutputFormat());
const dc2 = format.preprocess(GeeResults.SERVICE, context, dc, logger);
return await format.retrieve(context.ee, dc2);
} catch(e) {
logger.error(e);
throw e;
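Both workers (and the non-exporting branch of `batchjob.js`) now share the same three steps: look up the format, `preprocess` for the current mode, `retrieve` a stream. A minimal sketch of how this could be factored into a shared helper; `retrieveResult` is a hypothetical name, not part of this commit:

```js
import GeeResults from "../processes/utils/results.js";

// Hypothetical helper wrapping the recurring format pipeline.
// mode is one of GeeResults.SYNC, GeeResults.SERVICE or GeeResults.BATCH.
async function retrieveResult(config, context, dc, mode, logger) {
  const format = config.getOutputFormat(dc.getOutputFormat());
  const dc2 = format.preprocess(mode, context, dc, logger);
  return await format.retrieve(context.ee, dc2);
}
```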
5 changes: 3 additions & 2 deletions src/formats/bitmap.js
@@ -3,6 +3,7 @@ import GeeResults from "../processes/utils/results.js";
import DataCube from "../datacube/datacube.js";
import Utils from "../utils/utils.js";
import FileFormat, { EPSGCODE_PARAMETER, SIZE_PARAMETER } from "./fileformat.js";
import HttpUtils from "../utils/http.js";

export const EPSGCODE_PARAMETER_BITMAP = Object.assign({}, EPSGCODE_PARAMETER);
EPSGCODE_PARAMETER_BITMAP.default = 4326;
@@ -96,7 +97,7 @@ export default class BitmapLike extends FileFormat {
return renderer === 'filmstrip';
}

preprocess(context, dc, logger) {
preprocess(mode, context, dc, logger) {
const ee = context.ee;
const parameters = dc.getOutputFormatParameters();

Expand Down Expand Up @@ -175,7 +176,7 @@ export default class BitmapLike extends FileFormat {
reject('Download URL provided by Google Earth Engine is empty.');
}
else {
resolve(url);
resolve(HttpUtils.stream(url));
}
});
});
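`src/utils/http.js` itself is not shown in this diff. Since callers pipe `response.data` into a write stream (see `batchjob.js` above), `HttpUtils.stream` presumably performs an HTTP GET with a streaming response body. A sketch of the assumed shape, using axios:

```js
import axios from 'axios';

// Assumed shape of HttpUtils.stream (the real implementation is not part
// of this diff): fetch the URL and return a response whose `data` is a
// readable stream that callers can pipe into a file.
const HttpUtils = {
  async stream(url) {
    return await axios({
      method: 'get',
      url,
      responseType: 'stream'
    });
  }
};

export default HttpUtils;
```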
12 changes: 8 additions & 4 deletions src/formats/fileformat.js
@@ -16,7 +16,7 @@ export const SIZE_PARAMETER = {

export const SCALE_PARAMETER = {
type: 'number',
description: 'Scale of the image in meters per pixel.',
description: 'Scale of the image in meters per pixel. Defaults to native resolution in batch jobs, and 100 otherwise.',
default: 100,
minimum: 1
};
@@ -81,15 +81,19 @@ export default class FileFormat {
};
}

preprocess(context, dc/*, logger*/) {
preprocess(mode, context, dc/*, logger*/) {
return dc;
}

async retrieve(/*ee, dc*/) {
async retrieve(/*ee, dc */) {
throw new Error('Not implemented');
}

async export(/*ee, dc*/) {
canExport() {
return false;
}

async export(/*ee, dc */) {
throw new Error('Not implemented');
}

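With `canExport()` and `export()` added, `preprocess(mode, …)` / `retrieve()` / `export()` form the contract a format implementation overrides. An illustrative skeleton of a subclass against this interface (the class is hypothetical, not from the repository):

```js
import FileFormat from './fileformat.js';

// Illustrative skeleton: a format that streams results for synchronous
// requests and web services, and opts into batch export via tasks.
export default class ExampleFormat extends FileFormat {
  preprocess(mode, context, dc /*, logger*/) {
    // Adjust the datacube for the given mode
    // (GeeResults.SYNC / SERVICE / BATCH) and return it.
    return dc;
  }

  async retrieve(ee, dc) {
    // Must return a response whose `data` property is a readable stream,
    // e.g. by resolving HttpUtils.stream(url).
    throw new Error('Not implemented');
  }

  canExport() {
    // Signal that export() is available for batch jobs.
    return true;
  }

  async export(ee, dc, job) {
    // Start Earth Engine export tasks and return one descriptor per task,
    // in the shape batchjob.js stores via storage.addTasks().
    return [{ taskId: 'example', imageId: 'example', error: null }];
  }
}
```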
3 changes: 2 additions & 1 deletion src/formats/gif.js
@@ -1,3 +1,4 @@
import HttpUtils from "../utils/http.js";
import Utils from "../utils/utils.js";
import BitmapLike from "./bitmap.js";

@@ -53,7 +54,7 @@ export default class GifFormat extends BitmapLike {
reject('Download URL provided by Google Earth Engine is empty.');
}
else {
resolve(url);
resolve(HttpUtils.stream(url));
}
});
});
102 changes: 84 additions & 18 deletions src/formats/gtiff.js
@@ -2,28 +2,25 @@ import GeeResults from "../processes/utils/results.js";
import DataCube from "../datacube/datacube.js";
import Utils from "../utils/utils.js";
import FileFormat, { EPSGCODE_PARAMETER, SCALE_PARAMETER } from "./fileformat.js";
import GeeProcessing from "../processes/utils/processing.js";
import HttpUtils from "../utils/http.js";

export default class GTiffFormat extends FileFormat {
constructor(title = 'GeoTiff', parameters = {}) {
super(title, parameters);
super(title, parameters, "Cloud-optimized in batch jobs, not cloud-optimized otherwise.");
this.addParameter('scale', SCALE_PARAMETER);
this.addParameter('epsgCode', EPSGCODE_PARAMETER);
this.addParameter('zipped', {
type: 'boolean',
description: 'Pack the GeoTiff files into ZIP files, one file per band.',
default: false
});
}

getGisDataTypes() {
return ['raster'];
}

getFileExtension(parameters) {
return parameters.zipped ? '.zip' : '.tiff';
getFileExtension(/*parameters*/) {
return '.tiff';
}

preprocess(context, dc, logger) {
preprocess(mode, context, dc, logger) {
const ee = context.ee;
const parameters = dc.getOutputFormatParameters();
const dc2 = new DataCube(ee, dc);
@@ -33,7 +30,8 @@ export default class GTiffFormat extends FileFormat {
if (dc2.hasT()) {
dc2.dimT().drop();
}
return dc2.setData(GeeResults.toImageOrCollection(ee, logger, dc.getData()));
const allowMultiple = (mode === GeeResults.BATCH);
return dc2.setData(GeeResults.toImageOrCollection(ee, logger, dc.getData(), allowMultiple));
}

async retrieve(ee, dc) {
@@ -46,18 +44,13 @@ export default class GTiffFormat extends FileFormat {
crs = Utils.crsToString(dc.getCrs());
}

const format = parameters.zipped ? 'ZIPPED_GEO_TIFF' : 'GEO_TIFF';

const data = dc.getData();
if (data instanceof ee.ImageCollection) {
// todo: implement
}
else if (data instanceof ee.Image) {
if (data instanceof ee.Image) {
const eeOpts = {
scale: parameters.scale || 100,
region,
crs,
format
format: 'GEO_TIFF'
};
return await new Promise((resolve, reject) => {
data.getDownloadURL(eeOpts, (url, err) => {
@@ -68,12 +61,85 @@ export default class GTiffFormat extends FileFormat {
reject('Download URL provided by Google Earth Engine is empty.');
}
else {
resolve(url);
resolve(HttpUtils.stream(url));
}
});
});
}
else {
throw new Error('Only single images are supported in this processing mode for GeoTIFF.');
}
}

canExport() {
return true;
}

async export(ee, dc, job) {
const parameters = dc.getOutputFormatParameters();

let region = null;
let crs = null;
if (dc.hasXY()) {
region = Utils.bboxToGeoJson(dc.getSpatialExtent());
crs = Utils.crsToString(dc.getCrs());
}

const data = ee.ImageCollection(dc.getData());
const imageList = data.toList(data.size());
const imgCount = await GeeProcessing.evaluate(imageList.size());
const tasks = [];
for (let i = 0; i < imgCount; i++) {
let taskId = null;
let error = null;
let imageId;
try {
const image = ee.Image(imageList.get(i));
imageId = await GeeProcessing.evaluate(image.id());

let crsTransform, scale;
if (parameters.scale > 0) {
scale = parameters.scale;
}
else {
const projection = await GeeProcessing.evaluate(image.projection());
crsTransform = projection.transform;
}

const task = ee.batch.Export.image.toDrive({
image,
description: job.description,
folder: job._id,
fileNamePrefix: imageId,
skipEmptyTiles: true,
crs,
crsTransform,
region,
scale,
fileFormat: 'GeoTIFF',
formatOptions: {
cloudOptimized: true,
// noData: NaN
}
});
taskId = await new Promise((resolve, reject) => {
task.start(
() => resolve(task.id),
(message) => reject(new Error(message))
)
});
} catch (e) {
error = e.message;
} finally {
tasks.push({
taskId,
imageId,
error
});
}
}

return tasks;
}

}
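`export()` returns one `{ taskId, imageId, error }` record per image, which `batchjob.js` hands to `storage.addTasks(job, tasks)` before starting the task monitor. The monitor itself is not part of this diff; a hypothetical polling step for a single task, using the long-standing `ee.data.getTaskStatus` call:

```js
// Hypothetical check of a single export task (the actual monitor started
// by startTaskMonitor() is not shown in this diff).
function checkTask(ee, task, onDone) {
  if (!task.taskId) {
    // The task never started; the error was recorded by export().
    return onDone('error', task.error);
  }
  ee.data.getTaskStatus(task.taskId, (statuses, err) => {
    if (err) {
      return onDone('error', err);
    }
    const state = statuses[0].state;
    if (state === 'COMPLETED') {
      onDone('finished');
    }
    else if (state === 'FAILED' || state === 'CANCELLED') {
      onDone('error', statuses[0].error_message);
    }
    // Any other state (READY, RUNNING, ...) means: poll again later.
  });
}
```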