Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split >5GB Files automatically #21

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import static com.ibm.stocator.fs.swift.SwiftConstants.PUBLIC;
import static com.ibm.stocator.fs.swift.SwiftConstants.BLOCK_SIZE;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_BLOCK_SIZE_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.OBJECT_SIZE;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_OBJECT_SIZE_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_PROJECT_ID_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_USER_ID_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.FMODE_AUTOMATIC_DELETE_PROPERTY;
Expand Down Expand Up @@ -83,6 +85,9 @@ public static Properties initialize(URI uri, Configuration conf) throws IOExcept
false);
Utils.updateProperty(conf, prefix2D, prefix, BLOCK_SIZE, props, SWIFT_BLOCK_SIZE_PROPERTY,
false);

Utils.updateProperty(conf, prefix2D, prefix, OBJECT_SIZE, props, SWIFT_OBJECT_SIZE_PROPERTY,
false);
Utils.updateProperty(conf, prefix2D, prefix, FMODE_DELETE_TEMP_DATA, props,
FMODE_AUTOMATIC_DELETE_PROPERTY, false);
Utils.updateProperty(conf, prefix2D, prefix, PUBLIC, props, SWIFT_PUBLIC_PROPERTY, false);
Expand Down
28 changes: 23 additions & 5 deletions src/main/java/com/ibm/stocator/fs/swift/SwiftAPIClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_CONTAINER_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_PUBLIC_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_BLOCK_SIZE_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_OBJECT_SIZE_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_PROJECT_ID_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_USER_ID_PROPERTY;
import static com.ibm.stocator.fs.swift.SwiftConstants.FMODE_AUTOMATIC_DELETE_PROPERTY;
Expand Down Expand Up @@ -130,7 +131,16 @@ public class SwiftAPIClient implements IStoreClient {
private final int pageListSize = 100;

/*
* support for different schema models
* Maximum size for a Swift Object
* Default is 5GB
*/
private long maxObjectSize;
private static final long DEFAULT_MAX_OBJECT_SIZE = 5L * 1024 * 1024 * 1024;

private final long bufferSize = 65536;

/*
* Support for different schema models
*/
private String schemaProvided;

Expand Down Expand Up @@ -167,11 +177,19 @@ public void initiate() throws IOException {
cachedSparkJobsStatus = new HashMap<String, Boolean>();
schemaProvided = conf.get("fs.swift.schema", Constants.SWIFT2D);
Properties props = ConfigurationHandler.initialize(filesystemURI, conf);

AccountConfig config = new AccountConfig();
fModeAutomaticDelete = "true".equals(props.getProperty(FMODE_AUTOMATIC_DELETE_PROPERTY,
"false"));
blockSize = Long.valueOf(props.getProperty(SWIFT_BLOCK_SIZE_PROPERTY,
"128")).longValue() * 1024 * 1024L;
"128")) * 1024 * 1024;
maxObjectSize = Long.valueOf(props.getProperty(SWIFT_OBJECT_SIZE_PROPERTY,
"5120")) * 1024 * 1024;
if (maxObjectSize > DEFAULT_MAX_OBJECT_SIZE || maxObjectSize < 0) {
LOG.warn("Maximum object size cannot be set to {} bytes, setting to default value of {}.",
maxObjectSize, DEFAULT_MAX_OBJECT_SIZE);
maxObjectSize = DEFAULT_MAX_OBJECT_SIZE;
}
String authMethod = props.getProperty(SWIFT_AUTH_METHOD_PROPERTY);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.WRAP_ROOT_VALUE, true);
Expand Down Expand Up @@ -430,7 +448,7 @@ public FileStatus[] list(String hostName, Path path, boolean fullListing) throws
continue;
} else {
// if we here - data created by spark and job completed successfully
// however there be might parts of failed tasks that were not aborted
// however there might be parts of failed tasks that were not aborted
// we need to make sure there are no failed attempts
if (nameWithoutTaskID(tmp.getName())
.equals(nameWithoutTaskID(previousElement.getName()))) {
Expand Down Expand Up @@ -498,8 +516,8 @@ public FSDataOutputStream createObject(String objName, String contentType,
URL url = new URL(mJossAccount.getAccessURL() + "/" + objName);
LOG.debug("PUT {}. Content-Type : {}", url.toString(), contentType);
try {
return new FSDataOutputStream(new SwiftOutputStream(mJossAccount, url, contentType, metadata),
statistics);
return new FSDataOutputStream(new SwiftOutputStream(mJossAccount, url, contentType,
metadata, maxObjectSize), statistics);
} catch (IOException e) {
LOG.error(e.getMessage());
throw e;
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/ibm/stocator/fs/swift/SwiftConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class SwiftConstants {
public static final String BLOCK_SIZE = ".block.size";
public static final String SWIFT_BLOCK_SIZE_PROPERTY = Constants.FS_SWIFT2D + BLOCK_SIZE;

public static final String OBJECT_SIZE = ".object.size";
public static final String SWIFT_OBJECT_SIZE_PROPERTY = Constants.FS_SWIFT2D + OBJECT_SIZE;

public static final String FMODE_DELETE_TEMP_DATA = ".failure.mode.delete";
public static final String FMODE_AUTOMATIC_DELETE_PROPERTY = Constants.FS_SWIFT2D
+ FMODE_DELETE_TEMP_DATA;
Expand Down
80 changes: 75 additions & 5 deletions src/main/java/com/ibm/stocator/fs/swift/SwiftOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;
import java.net.ProtocolException;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,6 +60,18 @@ public class SwiftOutputStream extends OutputStream {
*/
private HttpURLConnection mHttpCon;

/*
* Maximum size to be written to a Swift object before a new object is created.
*/
private long maxSplitSize;
private long totalBytesWritten = 0L;
private int splitCount = 0;
private static final int INT_BYTE_SIZE = 4;

private JossAccount jossAccount;
private String content;
private Map<String, String> requestProperties;

/*
* Access url
*/
Expand All @@ -73,9 +87,13 @@ public class SwiftOutputStream extends OutputStream {
* @throws IOException if error
*/
public SwiftOutputStream(JossAccount account, URL url, String contentType,
Map<String, String> metadata) throws IOException {
Map<String, String> metadata, long maxObjectSize) throws IOException {
mUrl = url;
HttpURLConnection httpCon = createConnection(account, url, contentType, metadata);
jossAccount = account;
content = contentType;
maxSplitSize = maxObjectSize;
requestProperties = metadata;
HttpURLConnection httpCon = createConnection(account, url, contentType, requestProperties);
try {
mOutputStream = httpCon.getOutputStream();
mHttpCon = httpCon;
Expand All @@ -84,7 +102,7 @@ public SwiftOutputStream(JossAccount account, URL url, String contentType,
LOG.warn(e.getMessage());
LOG.warn("Retry attempt. Re-authenticate");
account.authenticate();
httpCon = createConnection(account, url, contentType, metadata);
httpCon = createConnection(account, url, contentType, requestProperties);
mOutputStream = httpCon.getOutputStream();
mHttpCon = httpCon;
} catch (IOException e) {
Expand All @@ -105,10 +123,10 @@ public SwiftOutputStream(JossAccount account, URL url, String contentType,
*/
private HttpURLConnection createConnection(JossAccount account, URL url, String contentType,
Map<String, String> metadata) throws IOException {

HttpURLConnection newHttpCon = (HttpURLConnection) url.openConnection();
newHttpCon.setDoOutput(true);
newHttpCon.setRequestMethod("PUT");

newHttpCon.addRequestProperty("X-Auth-Token",account.getAuthToken());
newHttpCon.addRequestProperty("Content-Type", contentType);
if (metadata != null && !metadata.isEmpty()) {
Expand All @@ -129,21 +147,37 @@ private HttpURLConnection createConnection(JossAccount account, URL url, String

@Override
public void write(int b) throws IOException {
if (totalBytesWritten + INT_BYTE_SIZE >= maxSplitSize) {
splitFileUpload();
totalBytesWritten = 0;
}
mOutputStream.write(b);
totalBytesWritten += INT_BYTE_SIZE;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (totalBytesWritten + len >= maxSplitSize) {
splitFileUpload();
totalBytesWritten = 0;
}
mOutputStream.write(b, off, len);
totalBytesWritten += len;
}

@Override
public void write(byte[] b) throws IOException {
if (totalBytesWritten + b.length >= maxSplitSize) {
splitFileUpload();
totalBytesWritten = 0;
}
mOutputStream.write(b);
totalBytesWritten += b.length;
}

@Override
public void close() throws IOException {
LOG.trace("{} bytes written", totalBytesWritten);
mOutputStream.close();
InputStream is = null;
try {
Expand Down Expand Up @@ -178,4 +212,40 @@ public void close() throws IOException {
public void flush() throws IOException {
mOutputStream.flush();
}

/*
* Closes the old stream and sets up a new stream
*/
private void splitFileUpload() throws IOException {
mOutputStream.close();
URL oldURL = mHttpCon.getURL();
String prevSplitName = oldURL.getPath();
String currSplitName;
if (splitCount == 0) {
Pattern p = Pattern.compile("part-\\d\\d\\d\\d\\d-");
Matcher m = p.matcher(prevSplitName);
if (m.find()) {
currSplitName = new StringBuilder(prevSplitName).insert(m.end(),
"split-" + String.format("%05d", ++splitCount) + "-").toString();
} else {
currSplitName = new StringBuilder(prevSplitName).append("-split-"
+ String.format("%05d", ++splitCount)).toString();
}
} else {
currSplitName = prevSplitName.replaceAll("split-\\d\\d\\d\\d\\d",
"split-" + String.format("%05d", ++splitCount));
}
URL newURL = new URL(oldURL.getProtocol() + "://" + oldURL.getAuthority() + currSplitName);
try {
mHttpCon.disconnect();
HttpURLConnection newConn = createConnection(jossAccount, newURL, content,
requestProperties);
mOutputStream = newConn.getOutputStream();
mHttpCon = newConn;
} catch (IOException e) {
LOG.error(e.getMessage());
throw e;
}
}

}
36 changes: 36 additions & 0 deletions src/test/java/com/ibm/stocator/fs/swift2d/TestSwiftOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@

import java.io.IOException;
import java.text.MessageFormat;
import java.util.Properties;

import com.ibm.stocator.fs.swift.ConfigurationHandler;
import com.ibm.stocator.fs.common.ObjectStoreGlobFilter;
import com.ibm.stocator.fs.common.ObjectStoreGlobber;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

import static com.ibm.stocator.fs.swift.SwiftConstants.SWIFT_OBJECT_SIZE_PROPERTY;

public class TestSwiftOperations extends SwiftBaseTest {

protected byte[] data = SwiftTestUtils.generateDataset(getBlockSize() * 2, 0, 255);
Expand Down Expand Up @@ -83,6 +90,35 @@ public void testDataObject() throws Exception {
Assert.assertTrue(0 == stats.length);
}

@Test
public void testParitionedWrite() throws IOException {
Assume.assumeNotNull(getFs());
Configuration conf = new Configuration();
Properties props = ConfigurationHandler.initialize(new Path(getBaseURI()).toUri(), conf);
long maxObjectSize = Long.valueOf(props.getProperty(SWIFT_OBJECT_SIZE_PROPERTY,
"5")) * 1024 * 1024;

// Check object size is set in core-site.xml
Assume.assumeTrue(maxObjectSize != 0);

int expectedSplits = 5;

byte[] dataToSplit = SwiftTestUtils.generateDataset((int) maxObjectSize, 0, 255);
Path output = new Path(getBaseURI() + "/testDir/splits");
FSDataOutputStream stream = fs.create(output);
for (int i = 0; i < expectedSplits; i++) {
stream.write(dataToSplit);
}
stream.close();

FileStatus[] results = getFs().listStatus(output);
Assert.assertEquals(expectedSplits, results.length);
for (FileStatus status : results) {
Assert.assertTrue(status.getLen() <= maxObjectSize);
}
getFs().delete(output, false);
}

@Test
public void testFileExists() throws IOException {
Path testFile = new Path(getBaseURI() + "/testFile");
Expand Down