From 63ce030aabb6c739bcbc26d5e84eaf381f4df507 Mon Sep 17 00:00:00 2001 From: Simon Bear Date: Thu, 24 Jun 2021 01:30:42 +1000 Subject: [PATCH] WIP AVRO versions of CSV exports --- .../ala/images/WebServiceController.groovy | 117 ++++++++- .../au/org/ala/images/ImageService.groovy | 240 +++++++++++++++++- 2 files changed, 349 insertions(+), 8 deletions(-) diff --git a/grails-app/controllers/au/org/ala/images/WebServiceController.groovy b/grails-app/controllers/au/org/ala/images/WebServiceController.groovy index 975467c..f23c519 100644 --- a/grails-app/controllers/au/org/ala/images/WebServiceController.groovy +++ b/grails-app/controllers/au/org/ala/images/WebServiceController.groovy @@ -1747,6 +1747,26 @@ class WebServiceController { bos.close() } + @ApiOperation( + value = "Export Avro of entire image catalogue", + nickname = "exportAvro", + produces = "application/octet-stream", + consumes = "application/json", + httpMethod = "GET", + tags = ["Export"] + ) + @ApiResponses([ + @ApiResponse(code = 200, message = "OK"), + @ApiResponse(code = 405, message = "Method Not Allowed. Only GET is allowed")] + ) + def exportAvro(){ + response.setHeader("Content-disposition", "attachment;filename=images-export.avro") + response.contentType = "application/octet-stream" + def os = response.outputStream + imageService.exportAvro(os) + os.flush() + } + @ApiOperation( value = "Export CSV of URL to imageIdentifier mappings", nickname = "exportMapping", @@ -1769,6 +1789,26 @@ class WebServiceController { bos.close() } + @ApiOperation( + value = "Export Avro of URL to imageIdentifier mappings", + nickname = "exportMappingAvro", + produces = "application/octet-stream", + consumes = "application/json", + httpMethod = "GET", + tags = ["Export"] + ) + @ApiResponses([ + @ApiResponse(code = 200, message = "OK"), + @ApiResponse(code = 405, message = "Method Not Allowed. Only GET is allowed")] + ) + def exportMappingAvro(){ + response.setHeader("Content-disposition", "attachment;filename=image-mapping.avro") + response.contentType = "application/octet-stream" + def os = response.outputStream + imageService.exportMappingAvro(os) + os.flush() + } + @ApiOperation( value = "Export CSV of URL to imageIdentifier mappings", nickname = "exportDatasetMapping/{dataResourceUid}", @@ -1788,7 +1828,7 @@ class WebServiceController { ]) def exportDatasetMapping(){ if (!params.id){ - renderResults([success: false, message: "Failed to store image!"], 400) + renderResults([success: false, message: "id param is missing"], 400) } else { response.setHeader("Content-disposition", "attachment;filename=image-mapping-${params.id}.csv.gz") response.contentType = "application/gzip" @@ -1799,6 +1839,34 @@ class WebServiceController { } } + @ApiOperation( + value = "Export Avro of URL to imageIdentifier mappings", + nickname = "exportDatasetMappingAvro/{dataResourceUid}", + produces = "application/octet-stream", + consumes = "application/json", + httpMethod = "GET", + tags = ["Export"] + ) + @ApiResponses([ + @ApiResponse(code = 200, message = "OK"), + @ApiResponse(code = 400, message = "Missing dataResourceUid parameter"), + @ApiResponse(code = 405, message = "Method Not Allowed. Only GET is allowed")] + ) + @ApiImplicitParams([ + @ApiImplicitParam(name = "dataResourceUid", paramType = "path", required = true, value = "Data resource UID", dataType = "string") + ]) + def exportDatasetMappingAvro(){ + if (!params.id){ + renderResults([success: false, message: "id param is missing"], 400) + } else { + response.setHeader("Content-disposition", "attachment;filename=image-mapping-${params.id}.avro") + response.contentType = "application/octet-stream" + def os = response.outputStream + imageService.exportDatasetMappingAvro(params.id, os) + os.flush() + } + } + @ApiOperation( value = "Export CSV of URL to imageIdentifier mappings", notes = """Exports the following fields in CSV: @@ -1835,7 +1903,7 @@ class WebServiceController { ]) def exportDataset(){ if (!params.id){ - renderResults([success: false, message: "Failed to store image!"], 400) + renderResults([success: false, message: "id param is missing"], 400) } else { response.setHeader("Content-disposition", "attachment;filename=image-export-${params.id}.csv.gz") response.contentType = "application/gzip" @@ -1845,4 +1913,49 @@ class WebServiceController { bos.close() } } + + @ApiOperation( + value = "Export Avro of URL to imageIdentifier mappings", + notes = """Exports the following fields in Avro: + image_identifier as "imageID" + identifier + audience + contributor + created + creator + description + format + license + publisher + references + rightsHolder + source + title + type + """, + nickname = "exportDatasetAvro/{dataResourceUid}", + produces = "application/octet-stream", + consumes = "application/json", + httpMethod = "GET", + tags = ["Export"] + ) + @ApiResponses([ + @ApiResponse(code = 200, message = "OK"), + @ApiResponse(code = 400, message = "Missing dataResourceUid parameter"), + @ApiResponse(code = 405, message = "Method Not Allowed. Only GET is allowed")] + ) + @ApiImplicitParams([ + @ApiImplicitParam(name = "dataResourceUid", paramType = "path", required = true, value = "Data resource UID", dataType = "string") + ]) + def exportDatasetAvro(){ + if (!params.id){ + renderResults([success: false, message: "id param is missing"], 400) + } else { + response.setHeader("Content-disposition", "attachment;filename=image-export-${params.id}.avro") + response.contentType = "application/octet-stream" + def os = response.outputStream + imageService.exportDatasetAvro(params.id, os) + os.flush() + } + } } \ No newline at end of file diff --git a/grails-app/services/au/org/ala/images/ImageService.groovy b/grails-app/services/au/org/ala/images/ImageService.groovy index 4d4e4e5..2980aa1 100644 --- a/grails-app/services/au/org/ala/images/ImageService.groovy +++ b/grails-app/services/au/org/ala/images/ImageService.groovy @@ -9,8 +9,16 @@ import com.opencsv.RFC4180ParserBuilder import grails.gorm.transactions.Transactional import grails.orm.HibernateCriteriaBuilder import groovy.transform.Synchronized +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import groovyx.gpars.GParsPool import okhttp3.HttpUrl +import org.apache.avro.SchemaBuilder +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.avro.io.DatumWriter import org.apache.commons.codec.binary.Base64 import org.apache.commons.imaging.Imaging import org.apache.commons.imaging.common.ImageMetadata @@ -31,15 +39,27 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.web.multipart.MultipartFile import java.sql.Connection +import java.sql.Date import java.sql.PreparedStatement import java.sql.ResultSet +import java.sql.ResultSetMetaData import java.sql.SQLException +import java.sql.Timestamp +import java.sql.Types import java.text.SimpleDateFormat import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.locks.ReentrantLock class ImageService { + static final String EXPORT_IMAGES_SQL = """SELECT * FROM export_images;""" + static final String EXPORT_MAPPING_SQL = """SELECT * FROM export_mapping;""" + static final String EXPORT_DATASET_MAPPING_SQL = """select * from export_dataset_mapping(?)""" + static final String EXPORT_DATASET_SQL = """select * from export_dataset(?)""" + + static final String DEFAULT_DATE_FORMAT = "dd-MMM-yyyy"; + static final String DEFAULT_TIMESTAMP_FORMAT = "dd-MMM-yyyy HH:mm:ss"; + def dataSource def imageStoreService def tagService @@ -1437,7 +1457,11 @@ class ImageService { * @return */ def exportCSV(OutputStream outputStream) { - eachRowToCSV(outputStream.newWriter('UTF-8'), """SELECT * FROM export_images;""") + eachRowToCSV(outputStream.newWriter('UTF-8'), EXPORT_IMAGES_SQL) + } + + def exportAvro(OutputStream outputStream) { + eachRowToAvro(outputStream, EXPORT_IMAGES_SQL) } /** @@ -1447,7 +1471,11 @@ class ImageService { * @return */ def exportMappingCSV(OutputStream outputStream) { - eachRowToCSV(outputStream.newWriter('UTF-8'), """SELECT * FROM export_mapping;""") + eachRowToCSV(outputStream.newWriter('UTF-8'), EXPORT_MAPPING_SQL) + } + + def exportMappingAvro(OutputStream outputStream) { + eachRowToAvro(outputStream, EXPORT_MAPPING_SQL) } /** @@ -1457,11 +1485,19 @@ class ImageService { * @return */ def exportDatasetMappingCSV(String datasetID, OutputStream outputStream) { - eachRowToCSV(outputStream.newWriter('UTF-8'), """select * from export_dataset_mapping(?)""", [datasetID], ',', '\\') + eachRowToCSV(outputStream.newWriter('UTF-8'), EXPORT_DATASET_MAPPING_SQL, [datasetID], ',', '\\') + } + + def exportDatasetMappingAvro(String datasetID, OutputStream outputStream) { + eachRowToAvro(outputStream, EXPORT_DATASET_MAPPING_SQL, [datasetID]) } def exportDatasetCSV(String datasetID, OutputStream outputStream) { - eachRowToCSV(outputStream.newWriter('UTF-8'), """select * from export_dataset(?)""", [datasetID]) + eachRowToCSV(outputStream.newWriter('UTF-8'), EXPORT_DATASET_SQL, [datasetID]) + } + + def exportDatasetAvro(String datasetID, OutputStream outputStream) { + eachRowToAvro(outputStream, EXPORT_DATASET_SQL, [datasetID]) } /** @@ -1490,6 +1526,199 @@ class ImageService { .build()) .build() + eachRowTo(sql, params) { rs -> + csvWriter.writeAll(rs, true) + } + writer.flush() + } + + /** + * Pass the results of the SQL query through a function that turns the query result metadata into an AVRO schema + * and then each row in the result set becomes a record in the resulting file. The AVRO file is written to the + * given OutputStream but the OutputStream is not closed. + * + * @param outputStream The output stream to write the AVRO file to + * @param sql The SQL query to run + * @param params The parameters for the SQL query + */ + private def eachRowToAvro(OutputStream outputStream, String sql, List params = []) { + DataFileWriter dataFileWriter = null + + eachRowTo(sql, params) {rs -> + def schema = avroSchema(rs) + + DatumWriter avroWriter = new GenericDatumWriter(schema) + dataFileWriter = new DataFileWriter(avroWriter) + dataFileWriter.create(schema, outputStream) + + ResultSetMetaData metadata = rs.getMetaData() + while (rs.next()) { + def rb = new GenericRecordBuilder(schema) + for (int i = 1; i <= metadata.getColumnCount(); i++) { + def label = metadata.getColumnLabel(i) + def value = getColumnValue(rs, metadata.getColumnType(i), i, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT) + rb.set(label, value) + } + dataFileWriter.append(rb.build()) + } + } + dataFileWriter?.flush() + } + + /** + * Helper function that turns the metadata from a ResultSet into an AVRO schema + * @param rs The SQL result set + * @return The AVRO schema for the result set + */ + private def avroSchema(ResultSet rs) { + ResultSetMetaData metadata = rs.getMetaData() + def schemaAssembler = SchemaBuilder.builder() + .record("record") + .fields() + for (int i = 1; i <= metadata.getColumnCount(); i++) { + String label = metadata.getColumnLabel(i) + int type = metadata.getColumnType(i) + switch (type) { + case Types.BOOLEAN: + schemaAssembler.optionalBoolean(label) + break; + case Types.DECIMAL: + case Types.REAL: + case Types.NUMERIC: + case Types.DOUBLE: + schemaAssembler.optionalDouble(label) + break; + case Types.FLOAT: + schemaAssembler.optionalFloat(label) + break + case Types.BIGINT: + schemaAssembler.optionalLong(label) + break + case Types.INTEGER: + case Types.TINYINT: + case Types.SMALLINT: + schemaAssembler.optionalInt(label) + break + case Types.BLOB: + schemaAssembler.optionalBytes(label) + break + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + case Types.NCLOB: + case Types.CLOB: + case Types.NVARCHAR: + case Types.NCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + case Types.VARCHAR: + case Types.CHAR: + default: + // This takes care of Types.BIT, Types.JAVA_OBJECT, and anything + // unknown. + schemaAssembler.optionalString(label) + } + } + + return schemaAssembler.endRecord() + } + + /** + * Helper function that turns a column value into the appropriate type for the AVRO record. + * @param rs The result set + * @param colType The Java Result Set Type for the given colIndex + * @param colIndex The column index in the current row in the Result Set + * @param dateFormatString The date format string for any Dates + * @param timestampFormatString The timestamp format string for any Timestamps + * @return A primitive object that can be written to an AVRO record + */ + private Object getColumnValue(ResultSet rs, int colType, int colIndex, String dateFormatString, String timestampFormatString) { + + def value + + switch (colType) { + case Types.BOOLEAN: + value = rs.getBoolean(colIndex); + break + case Types.DECIMAL: + case Types.REAL: + case Types.NUMERIC: + BigDecimal d = rs.getBigDecimal(colIndex) + value = d.doubleValue() + break + case Types.DOUBLE: + value = rs.getDouble(colIndex) + break + case Types.FLOAT: + value = rs.getFloat(colIndex) + break + case Types.BIGINT: + value = rs.getLong(colIndex) + break + case Types.INTEGER: + case Types.TINYINT: + case Types.SMALLINT: + value = rs.getInt(colIndex) + break + case Types.DATE: + value = handleDate(rs.getDate(colIndex), dateFormatString); + break + case Types.TIME: + def time = rs.getTime(colIndex) + value = time ? Objects.toString(time) : null + break + case Types.TIMESTAMP: + value = handleTimestamp(rs.getTimestamp(colIndex), timestampFormatString); + break + case Types.NCLOB: + value = rs.getNClob(colIndex)?.characterStream?.text + break + case Types.CLOB: + value = rs.getClob(colIndex)?.characterStream?.text + break + case Types.NVARCHAR: + case Types.NCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + case Types.VARCHAR: + case Types.CHAR: + value = rs.getString(colIndex) + break + default: + // This takes care of Types.BIT, Types.JAVA_OBJECT, and anything + // unknown. + // TODO Array types? + def obj = rs.getObject(colIndex) + value = obj ? Objects.toString(obj) : null + } + + + if (rs.wasNull() || value == null) { + value = null + } + + return value + } + + private String handleDate(java.sql.Date date, String dateFormatString) throws SQLException { + SimpleDateFormat df = new SimpleDateFormat(dateFormatString) + return date == null ? null : df.format(date) + } + + protected String handleTimestamp(Timestamp timestamp, String timestampFormatString) { + SimpleDateFormat timeFormat = new SimpleDateFormat(timestampFormatString); + return timestamp == null ? null : timeFormat.format(timestamp); + } + + /** + * Runs a SQL query and then runs the passed in closure with the ResultSet for the closure. + * + * @param sql The SQL query + * @param params The parameters for the query + * @param c The cloure to receive a single java.sql.ResultSet as a parameter + */ + private def eachRowTo(String sql, List params, + @ClosureParams(value = SimpleType, options= 'java.sql.ResultSet' ) Closure c) { Connection conn = null PreparedStatement st = null ResultSet rs = null @@ -1508,7 +1737,7 @@ class ImageService { } rs = st.executeQuery() - csvWriter.writeAll(rs, true) + c(rs) } catch (SQLException e) { log.warn("Failed to execute: $sql because: ${e.message}") throw e @@ -1535,7 +1764,6 @@ class ImageService { log.debug("Caught exception closing connection: ${e.message} - continuing"); } } - writer.flush() }