Merge pull request #2088 from IBM/robin-new-buffer
Issue #2049: replace byte buffer streams with a more efficient implementation
punktilious committed Mar 16, 2021
2 parents 0636671 + 37582c9 commit 7ccfdf5
Showing 11 changed files with 549 additions and 56 deletions.
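
The new class at the center of this change, com.ibm.fhir.persistence.util.InputOutputByteStream, is not itself shown in this view, but the call sites below pin down the contract it has to satisfy: one growable byte array, an outputStream() that appends into it, an inputStream() that reads the bytes written so far without the defensive copy that ByteArrayOutputStream.toByteArray() makes, plus size(), reset(), and two constructors (initial capacity, and wrap-an-existing-array). A minimal sketch under those assumptions, not the implementation as shipped:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

/**
 * Minimal sketch of the contract the diff relies on, not the class as
 * shipped in the PR. One shared byte[]; an OutputStream that appends
 * into it; InputStreams that read the bytes written so far in place.
 */
public class InputOutputByteStream {
    private byte[] buffer;
    private int length; // number of valid bytes written so far

    public InputOutputByteStream(int initialCapacity) {
        this.buffer = new byte[initialCapacity];
    }

    /** Adopt an existing array, e.g. bytes fetched from a BLOB column. */
    public InputOutputByteStream(byte[] data, int length) {
        this.buffer = data;
        this.length = length;
    }

    public int size() {
        return length;
    }

    /** Logically empty the buffer; the backing array is kept for reuse. */
    public void reset() {
        length = 0;
    }

    /** An OutputStream that appends into the shared array, growing it as needed. */
    public OutputStream outputStream() {
        return new OutputStream() {
            @Override
            public void write(int b) {
                if (length == buffer.length) {
                    buffer = Arrays.copyOf(buffer, Math.max(8, buffer.length * 2));
                }
                buffer[length++] = (byte) b;
            }
        };
    }

    /** A read view over the bytes written so far, with no defensive copy. */
    public InputStream inputStream() {
        return new ByteArrayInputStream(buffer, 0, length);
    }
}
```

The trade-off is aliasing: the streams are views over live memory, so a part must be fully consumed (uploaded) before reset() is called, which is exactly how uploadPart() below sequences it.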
@@ -6,9 +6,8 @@

package com.ibm.fhir.bulkdata.jbatch.export.fast;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
@@ -50,6 +49,7 @@
import com.ibm.fhir.persistence.ResourcePayload;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.search.date.DateTimeHandler;

/**
@@ -147,7 +147,7 @@ public class ResourcePayloadReader extends AbstractItemReader {
private long txEndTime;

// The buffer space we collect data into until we hit the threshold to push to COS
private ByteArrayOutputStream outputStream;
private InputOutputByteStream ioBuffer;

// The sum of the multi-part sizes for parts that have been uploaded
private long currentObjectSize;
@@ -186,7 +186,7 @@ public class ResourcePayloadReader extends AbstractItemReader {
*/
public ResourcePayloadReader() {
super();
this.outputStream = new ByteArrayOutputStream(this.initialBufferSize);
this.ioBuffer = new InputOutputByteStream(this.initialBufferSize);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Max resources Per Object: " + resourcesPerObject);
logger.fine("Part Upload Trigger Size: " + partUploadTriggerSize);
@@ -253,7 +253,7 @@ public void open(Serializable checkpoint) throws Exception {

// Just in case the framework tries to reopen an existing instance,
// make sure we start with an empty output stream
this.outputStream.reset();
this.ioBuffer.reset();
}

// Transient user data is required to signal completion of this partition
@@ -403,11 +403,12 @@ public Boolean processPayload(ResourcePayload t) {
completeCurrentUpload();
}

// Accumulate the payload in the outputStream buffer
if (this.outputStream.size() > 0) {
this.outputStream.write(NDJSON_LINE_SEPARATOR);
// Accumulate the payload in the output buffer
OutputStream outputStream = ioBuffer.outputStream();
if (this.ioBuffer.size() > 0) {
outputStream.write(NDJSON_LINE_SEPARATOR);
}
this.currentObjectSize += t.transferTo(this.outputStream);
this.currentObjectSize += t.transferTo(outputStream);
this.currentObjectResourceCount++;

// upload now if we have reached the Goldilocks threshold size for a part
@@ -461,7 +462,7 @@ private void uploadWhenReady() throws Exception {
// the total number of parts), but not so large that the
// upload would take too long and exceed our transaction
// timeout.
if (this.outputStream.size() > this.partUploadTriggerSize) {
if (this.ioBuffer.size() > this.partUploadTriggerSize) {
uploadPart();
}
}
@@ -474,15 +475,15 @@ private void uploadPart() throws Exception {
// S3 API: Part number must be an integer between 1 and 10000
int currentObjectPartNumber = uploadedParts.size() + 1;
if (logger.isLoggable(Level.FINE)) {
logger.fine(logPrefix() + " Uploading part# " + currentObjectPartNumber + " ["+ outputStream.size() + " bytes] for uploadId '" + uploadId + "'");
logger.fine(logPrefix() + " Uploading part# " + currentObjectPartNumber + " ["+ ioBuffer.size() + " bytes] for uploadId '" + uploadId + "'");
}

byte[] buffer = outputStream.toByteArray();
InputStream is = new ByteArrayInputStream(buffer);
// The ioBuffer can provide us with an InputStream without having to copy the byte-buffer
InputStream is = ioBuffer.inputStream();
PartETag uploadResult = BulkDataUtils.multiPartUpload(cosClient, cosBucketName, currentObjectName,
uploadId, is, buffer.length, currentObjectPartNumber);
uploadId, is, ioBuffer.size(), currentObjectPartNumber);
uploadedParts.add(uploadResult);
outputStream.reset();
ioBuffer.reset();
}
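
Schematically, the saving per uploaded part (ioBuffer and outputStream as in the method above; the commented lines are the ones this commit deletes):

```java
// Before: one full copy of the accumulated part just to obtain a stream
//   byte[] buffer = outputStream.toByteArray();        // allocates + copies size() bytes
//   InputStream is = new ByteArrayInputStream(buffer);

// After: a read view over the buffer's own backing array, no copy;
// the subsequent reset() reuses that same array for the next part
InputStream is = ioBuffer.inputStream();
```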

/**
@@ -494,7 +495,7 @@ private void completeCurrentUpload() throws Exception {
}

// upload any final amount of data we have in the buffer
if (this.outputStream.size() > 0) {
if (this.ioBuffer.size() > 0) {
logger.fine(logPrefix() + " uploading final part for '" + this.uploadId + "'");
uploadPart();
}
@@ -49,6 +49,7 @@
import com.ibm.fhir.persistence.jdbc.util.ResourceTypesCache;
import com.ibm.fhir.persistence.jdbc.util.ResourceTypesCacheUpdater;
import com.ibm.fhir.persistence.jdbc.util.SqlQueryData;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.schema.control.FhirSchemaConstants;

/**
@@ -66,6 +67,15 @@ public class ResourceDAOImpl extends FHIRDbDAOImpl implements ResourceDAO {

public static final String DEFAULT_VALUE_REINDEX_TSTAMP = "1970-01-01 00:00:00";

// column indices for all our resource reading queries
public static final int IDX_RESOURCE_ID = 1;
public static final int IDX_LOGICAL_RESOURCE_ID = 2;
public static final int IDX_VERSION_ID = 3;
public static final int IDX_LAST_UPDATED = 4;
public static final int IDX_IS_DELETED = 5;
public static final int IDX_DATA = 6;
public static final int IDX_LOGICAL_ID = 7;

// Read the current version of the resource
private static final String SQL_READ = "SELECT R.RESOURCE_ID, R.LOGICAL_RESOURCE_ID, R.VERSION_ID, R.LAST_UPDATED, R.IS_DELETED, R.DATA, LR.LOGICAL_ID " +
"FROM %s_RESOURCES R, %s_LOGICAL_RESOURCES LR WHERE " +
@@ -244,12 +254,15 @@ protected Resource createDTO(ResultSet resultSet) throws FHIRPersistenceDataAcce
Resource resource = new Resource();

try {
resource.setData(resultSet.getBytes("DATA"));
resource.setId(resultSet.getLong("RESOURCE_ID"));
resource.setLastUpdated(resultSet.getTimestamp("LAST_UPDATED"));
resource.setLogicalId(resultSet.getString("LOGICAL_ID"));
resource.setVersionId(resultSet.getInt("VERSION_ID"));
resource.setDeleted(resultSet.getString("IS_DELETED").equals("Y") ? true : false);
byte[] payloadData = resultSet.getBytes(IDX_DATA);
if (payloadData != null) {
resource.setDataStream(new InputOutputByteStream(payloadData, payloadData.length));
}
resource.setId(resultSet.getLong(IDX_RESOURCE_ID));
resource.setLastUpdated(resultSet.getTimestamp(IDX_LAST_UPDATED));
resource.setLogicalId(resultSet.getString(IDX_LOGICAL_ID));
resource.setVersionId(resultSet.getInt(IDX_VERSION_ID));
resource.setDeleted(resultSet.getString(IDX_IS_DELETED).equals("Y") ? true : false);
} catch (Throwable e) {
FHIRPersistenceDataAccessException fx = new FHIRPersistenceDataAccessException("Failure creating Resource DTO.");
throw severe(log, fx, e);
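
On the read path the same class wraps the array handed back by the JDBC driver. The two-arg constructor is assumed here (as in the sketch at the top) to adopt the array rather than copy it, so a fetched payload costs the driver's one allocation and nothing more:

```java
// Illustration only, relying on the assumed adopt-not-copy constructor
byte[] payloadData = new byte[] { 31, -117, 8 };   // e.g. the first bytes of a gzip stream
InputOutputByteStream buf = new InputOutputByteStream(payloadData, payloadData.length);
System.out.println(buf.size());                    // 3
System.out.println(buf.inputStream().read());      // 31, read straight from payloadData
```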
@@ -556,13 +569,13 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
stmt.setString(2, resource.getLogicalId());

// Check for large objects, and branch around it.
boolean large = FhirSchemaConstants.STORED_PROCEDURE_SIZE_LIMIT < resource.getData().length;
boolean large = FhirSchemaConstants.STORED_PROCEDURE_SIZE_LIMIT < resource.getDataStream().size();
if (large) {
// Outside of the normal flow we have a BIG JSON or XML
stmt.setNull(3, Types.BLOB);
} else {
// Normal Flow, we set the data
stmt.setBytes(3, resource.getData());
stmt.setBinaryStream(3, resource.getDataStream().inputStream());
}

lastUpdated = resource.getLastUpdated();
@@ -582,7 +595,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
String largeStmtString = String.format(LARGE_BLOB, resource.getResourceType());
try (PreparedStatement ps = connection.prepareStatement(largeStmtString)) {
// Use the long id to update the record in the database with the large object.
ps.setBytes(1, resource.getData());
ps.setBinaryStream(1, resource.getDataStream().inputStream());
ps.setLong(2, versionedResourceRowId);
long dbCallStartTime2 = System.nanoTime();
int numberOfRows = -1;
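
Switching from setBytes to setBinaryStream means the driver pulls the payload from the buffer instead of requiring a materialized byte[]. The two-argument setBinaryStream(int, InputStream) overload used above is JDBC 4.0, with the driver reading to end-of-stream; for a driver that wants an explicit length, the buffer already knows it:

```java
// Equivalent bind with an explicit length, should a driver require one
stmt.setBinaryStream(3, resource.getDataStream().inputStream(),
        resource.getDataStream().size());
```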
@@ -8,6 +8,7 @@

import static com.ibm.fhir.persistence.jdbc.JDBCConstants.UTC;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -122,7 +123,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramet
long resourceId = this.storeResource(resource.getResourceType(),
parameters,
resource.getLogicalId(),
resource.getData(),
resource.getDataStream().inputStream(),
lastUpdated,
resource.isDeleted(),
sourceKey,
@@ -196,7 +197,7 @@
* @return the resource_id for the entry we created
* @throws Exception
*/
public long storeResource(String tablePrefix, List<ExtractedParameterValue> parameters, String p_logical_id, byte[] p_payload, Timestamp p_last_updated, boolean p_is_deleted,
public long storeResource(String tablePrefix, List<ExtractedParameterValue> parameters, String p_logical_id, InputStream p_payload, Timestamp p_last_updated, boolean p_is_deleted,
String p_source_key, Integer p_version, Connection conn, ParameterDAO parameterDao) throws Exception {

final String METHODNAME = "storeResource() for " + tablePrefix + " resource";
@@ -413,7 +414,7 @@ public long storeResource(String tablePrefix, List<ExtractedParameterValue> para
stmt.setLong(1, v_resource_id);
stmt.setLong(2, v_logical_resource_id);
stmt.setInt(3, v_insert_version);
stmt.setBytes(4, p_payload);
stmt.setBinaryStream(4, p_payload);
stmt.setTimestamp(5, p_last_updated, UTC);
stmt.setString(6, p_is_deleted ? "Y" : "N");
stmt.executeUpdate();
@@ -8,20 +8,25 @@

import java.sql.Timestamp;

import com.ibm.fhir.persistence.util.InputOutputByteStream;

/**
* This class defines the Data Transfer Object representing a row in the FHIR Resource table.
*/
public class Resource {

private long id;
private String logicalId;
private int versionId;
private String resourceType;
private Timestamp lastUpdated;
private byte[] data;

// The buffer holding the payload data
private InputOutputByteStream dataStream;

private boolean deleted;


public Resource() {
super();
}
@@ -66,27 +71,32 @@ public void setVersionId(int versionId) {
this.versionId = versionId;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
this.data = data;
}

public boolean isDeleted() {
return deleted;
}

public void setDeleted(boolean deleted) {
this.deleted = deleted;
}

@Override
public String toString() {
return "Resource [id=" + id + ", logicalId=" + logicalId + ", versionId=" + versionId + ", resourceType="
+ resourceType + ", lastUpdated=" + lastUpdated + ", deleted=" + deleted + "]";
}

/**
* @return the dataStream
*/
public InputOutputByteStream getDataStream() {
return dataStream;
}

/**
* @param dataStream the dataStream to set
*/
public void setDataStream(InputOutputByteStream dataStream) {
this.dataStream = dataStream;
}
}

@@ -13,8 +13,6 @@
import static com.ibm.fhir.model.type.String.string;
import static com.ibm.fhir.model.util.ModelSupport.getResourceType;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
@@ -143,6 +141,7 @@
import com.ibm.fhir.persistence.jdbc.util.SqlQueryData;
import com.ibm.fhir.persistence.jdbc.util.TimestampPrefixedUUID;
import com.ibm.fhir.persistence.util.FHIRPersistenceUtil;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.persistence.util.LogicalIdentityProvider;
import com.ibm.fhir.schema.control.FhirSchemaConstants;
import com.ibm.fhir.search.SearchConstants;
@@ -168,6 +167,7 @@
public class FHIRPersistenceJDBCImpl implements FHIRPersistence, SchemaNameSupplier {
private static final String CLASSNAME = FHIRPersistenceJDBCImpl.class.getName();
private static final Logger log = Logger.getLogger(CLASSNAME);
private static final int DATA_BUFFER_INITIAL_SIZE = 10*1024; // 10KiB

protected static final String TXN_JNDI_NAME = "java:comp/UserTransaction";
public static final String TRX_SYNCH_REG_JNDI_NAME = "java:comp/TransactionSynchronizationRegistry";
@@ -340,7 +340,8 @@ public <T extends Resource> SingleResourceResult<T> create(FHIRPersistenceContex
final String METHODNAME = "create";
log.entering(CLASSNAME, METHODNAME);

ByteArrayOutputStream stream = new ByteArrayOutputStream();
// Most resources are well under 10K after being serialized and compressed
InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);
String logicalId;

// We need to update the meta in the resource, so we need a modifiable version
@@ -380,10 +381,10 @@ public <T extends Resource> SingleResourceResult<T> create(FHIRPersistenceContex
resourceDTO.setResourceType(updatedResource.getClass().getSimpleName());

// Serialize and compress the Resource
GZIPOutputStream zipStream = new GZIPOutputStream(stream);
GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
FHIRGenerator.generator( Format.JSON, false).generate(updatedResource, zipStream);
zipStream.finish();
resourceDTO.setData(stream.toByteArray());
resourceDTO.setDataStream(ioStream);
zipStream.close();

// The DAO objects are now created on-the-fly (not expensive to construct) and
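
This serialize-and-compress sequence appears three times in this file (create here, and update and delete below), each time writing gzip output straight into the reusable buffer with no intermediate byte[]. Factored out, it would look roughly like this; the helper name is invented, and the FHIRGenerator/Format package paths are assumed from this file's imports:

```java
import java.util.zip.GZIPOutputStream;

import com.ibm.fhir.model.format.Format;           // package path assumed
import com.ibm.fhir.model.generator.FHIRGenerator; // package path assumed
import com.ibm.fhir.model.resource.Resource;
import com.ibm.fhir.persistence.util.InputOutputByteStream;

public class PayloadSerializer {
    private static final int DATA_BUFFER_INITIAL_SIZE = 10 * 1024; // 10KiB, as above

    /** Render a resource as gzipped JSON into a reusable buffer. */
    static InputOutputByteStream serialize(Resource resource) throws Exception {
        InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);
        GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
        FHIRGenerator.generator(Format.JSON, false).generate(resource, zipStream);
        zipStream.finish(); // write the gzip trailer so the buffer holds a complete stream
        zipStream.close();
        return ioStream;
    }
}
```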
@@ -496,7 +497,7 @@ public <T extends Resource> SingleResourceResult<T> update(FHIRPersistenceContex

Class<? extends Resource> resourceType = resource.getClass();
com.ibm.fhir.persistence.jdbc.dto.Resource existingResourceDTO;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);

// Resources are immutable, so we need a new builder to update it (since R4)
Resource.Builder resultResourceBuilder = resource.toBuilder();
@@ -571,10 +572,10 @@ public <T extends Resource> SingleResourceResult<T> update(FHIRPersistenceContex
resourceDTO.setResourceType(updatedResource.getClass().getSimpleName());

// Serialize and compress the Resource
GZIPOutputStream zipStream = new GZIPOutputStream(stream);
GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
FHIRGenerator.generator(Format.JSON, false).generate(updatedResource, zipStream);
zipStream.finish();
resourceDTO.setData(stream.toByteArray());
resourceDTO.setDataStream(ioStream);
zipStream.close();

// Persist the Resource DTO.
@@ -809,7 +810,7 @@ public <T extends Resource> SingleResourceResult<T> delete(FHIRPersistenceContex

com.ibm.fhir.persistence.jdbc.dto.Resource existingResourceDTO = null;
T existingResource = null;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);

Resource.Builder resourceBuilder;

@@ -863,10 +864,10 @@ public <T extends Resource> SingleResourceResult<T> delete(FHIRPersistenceContex
resourceDTO.setVersionId(newVersionNumber);

// Serialize and compress the Resource
GZIPOutputStream zipStream = new GZIPOutputStream(stream);
GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
FHIRGenerator.generator(Format.JSON, false).generate(updatedResource, zipStream);
zipStream.finish();
resourceDTO.setData(stream.toByteArray());
resourceDTO.setDataStream(ioStream);
zipStream.close();

Timestamp timestamp = FHIRUtilities.convertToTimestamp(lastUpdated.getValue());
@@ -1759,7 +1760,7 @@ private <T extends Resource> T convertResourceDTO(com.ibm.fhir.persistence.jdbc.
InputStream in = null;
try {
if (resourceDTO != null) {
in = new GZIPInputStream(new ByteArrayInputStream(resourceDTO.getData()));
in = new GZIPInputStream(resourceDTO.getDataStream().inputStream());
if (elements != null) {
// parse/filter the resource using elements
resource = FHIRParser.parser(Format.JSON).as(FHIRJsonParser.class).parseAndFilter(in, elements);
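
The read side is the mirror image: the GZIPInputStream now sits directly on the buffer's InputStream, where it previously wrapped a ByteArrayInputStream over a copied array. A condensed sketch of the unfiltered path (helper name invented, package paths assumed as above):

```java
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

import com.ibm.fhir.model.format.Format;       // package path assumed
import com.ibm.fhir.model.parser.FHIRParser;   // package path assumed
import com.ibm.fhir.model.resource.Resource;
import com.ibm.fhir.persistence.util.InputOutputByteStream;

public class PayloadDeserializer {
    /** Parse a resource back out of the gzipped buffer, with no extra copy. */
    static Resource deserialize(InputOutputByteStream dataStream) throws Exception {
        try (InputStream in = new GZIPInputStream(dataStream.inputStream())) {
            return FHIRParser.parser(Format.JSON).parse(in);
        }
    }
}
```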
@@ -108,7 +108,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
stmt = connection.prepareCall(stmtString);
stmt.setString(1, resource.getResourceType());
stmt.setString(2, resource.getLogicalId());
stmt.setBytes(3, resource.getData());
stmt.setBinaryStream(3, resource.getDataStream().inputStream());

lastUpdated = resource.getLastUpdated();
stmt.setTimestamp(4, lastUpdated, UTC);