Commit

Upload accumulated data (local file) to amazon s3 opensearch-project#1048

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
deepaksahu562 committed Feb 28, 2023
1 parent 2e7d91f commit 67a3fbb
Showing 3 changed files with 140 additions and 0 deletions.
@@ -13,6 +13,7 @@
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -139,6 +140,7 @@ public void localFileAccumulator() {
LOG.info(
"Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec",
byteCount, eventCount, eventCollectionDuration);
new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate();
} catch (Exception e) {
LOG.error("Exception while storing recoreds into Local-file", e);
} finally {
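The hunk above hands the flushed MapDB event set to the new LocalFileAccumulator together with the sink service and configuration. As a rough sketch of the surrounding flow (illustrative only, not part of this commit; batchedEvents and the in-memory MapDB store are assumptions, and it reuses the DBMaker, Serializer and Event imports visible in the hunk plus org.mapdb.DB):

final DB db = DBMaker.memoryDB().make();
final NavigableSet<String> localFileEventSet =
        db.treeSet("local-file-events", Serializer.STRING).createOrOpen();

for (final Event event : batchedEvents) {                 // batchedEvents: assumed input batch
    localFileEventSet.add(event.toJsonString() + "\n");   // one JSON record per line
}

// Hand the accumulated records to the new accumulator, as the added line above does.
new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate();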
LocalFileAccumulator.java
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.accumulator;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.NavigableSet;
import java.util.Optional;

import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex;
import org.opensearch.dataprepper.plugins.sink.S3SinkConfig;
import org.opensearch.dataprepper.plugins.sink.S3SinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.waiters.S3Waiter;

/**
* Upload accumulated data (local file) to Amazon S3.
*/
public class LocalFileAccumulator implements SinkAccumulator {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileAccumulator.class);
private static final int MAX_RETRY = 3;
private final S3SinkService s3SinkService;
private final S3SinkConfig s3SinkConfig;

private final NavigableSet<String> localFileEventSet;
private String fileAbsolutePath = null;
private String localFileName = null;

/**
* Creates an accumulator for one local staging file.
*
* @param localFileEventSet accumulated records to be written to the local file
* @param s3SinkService provides the S3 client used for the upload
* @param s3SinkConfig sink configuration (bucket name, key path prefix, file name pattern)
*/
public LocalFileAccumulator(final NavigableSet<String> localFileEventSet, final S3SinkService s3SinkService,
final S3SinkConfig s3SinkConfig) {
this.localFileEventSet = localFileEventSet;
this.s3SinkService = s3SinkService;
this.s3SinkConfig = s3SinkConfig;
}

@Override
public void doAccumulate() {
boolean retry = Boolean.FALSE;
localFileName = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern());
File file = new File(localFileName);
int retryCount = MAX_RETRY;

try (BufferedWriter writer = new BufferedWriter(new FileWriter(localFileName))) {
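// Write every accumulated record to the local staging file before uploading it.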
for (String event : localFileEventSet) {
writer.write(event);
}
fileAbsolutePath = file.getAbsoluteFile().toString();
writer.flush();
LOG.info("data stored in local file {}", fileAbsolutePath);
do {
retry = !fileSaveToS3();
if (retryCount == 0) {
retry = Boolean.FALSE;
LOG.warn("Maximum retry count 3 reached, Unable to store {} into Amazon S3", localFileName);
}
if (retry) {
LOG.info("Retry : {}", (MAX_RETRY - --retryCount));
Thread.sleep(5000);
}
} while (retry);
} catch (IOException e) {
LOG.error("Events unable to save into local file : {}", localFileName, e);
} catch (Exception e) {
LOG.error("Unable to store {} into Amazon S3", localFileName, e);
} finally {
try {
// Skip cleanup if the local file was never created (e.g. the FileWriter failed).
if (fileAbsolutePath != null) {
boolean isLocalFileDeleted = Files.deleteIfExists(Paths.get(fileAbsolutePath));
if (isLocalFileDeleted) {
LOG.info("Local file deleted successfully {}", fileAbsolutePath);
} else {
LOG.warn("Local file not deleted {}", fileAbsolutePath);
}
}
} catch (IOException e) {
LOG.error("Unable to delete local file {}", fileAbsolutePath, e);
}
}
}

private boolean fileSaveToS3() {
final String bucketName = s3SinkConfig.getBucketName();
final String path = s3SinkConfig.getKeyPathPrefix();
final String key = path + "/" + localFileName;
boolean isFileSaveToS3 = Boolean.FALSE;
try {
S3Client client = s3SinkService.getS3Client();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucketName).key(key).acl("public-read")
.build();
client.putObject(request, RequestBody.fromFile(new File(fileAbsolutePath)));
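// Confirm the upload by waiting until HeadObject reports the key exists in the bucket.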
S3Waiter waiter = client.waiter();
HeadObjectRequest requestWait = HeadObjectRequest.builder().bucket(bucketName).key(key).build();
WaiterResponse<HeadObjectResponse> waiterResponse = waiter.waitUntilObjectExists(requestWait);
Optional<HeadObjectResponse> response = waiterResponse.matched().response();
isFileSaveToS3 = response.isPresent();
} catch (AwsServiceException | SdkClientException e) {
LOG.error("Amazon s3 client Exception : ", e);
}
if (isFileSaveToS3) {
LOG.info("File {} uploaded to Amazon S3 successfully", localFileName);
} else {
LOG.warn("File {} upload to Amazon S3 failed", localFileName);
}
return isFileSaveToS3;
}
}
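fileSaveToS3() uploads the staged file with PutObject and then blocks on the SDK's S3Waiter until a HeadObject call confirms the key is visible. A minimal standalone sketch of that upload-and-verify pattern (illustrative only; the bucket, key and local path are placeholders, default AWS credentials are assumed, and the public-read ACL used in the commit is omitted):

import java.nio.file.Paths;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.waiters.S3Waiter;

public class UploadAndVerify {
    public static void main(String[] args) {
        final String bucket = "my-bucket";               // placeholder bucket name
        final String key = "prefix/local-file.json";     // placeholder object key

        try (S3Client s3 = S3Client.create(); S3Waiter waiter = s3.waiter()) {
            // Upload the staged local file.
            s3.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                    RequestBody.fromFile(Paths.get("/tmp/local-file.json"))); // placeholder path

            // Block until HeadObject reports the object exists, then report the result.
            final boolean uploaded = waiter
                    .waitUntilObjectExists(HeadObjectRequest.builder().bucket(bucket).key(key).build())
                    .matched().response().isPresent();
            System.out.println("Upload confirmed: " + uploaded);
        }
    }
}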
SinkAccumulator.java
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.accumulator;

public interface SinkAccumulator {

void doAccumulate();
}
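SinkAccumulator is the single-method contract the sink calls to flush one accumulated batch; LocalFileAccumulator above implements it. Purely to illustrate the shape of that contract (a hypothetical variant, not part of this commit), another implementation could look like:

package org.opensearch.dataprepper.plugins.sink.accumulator;

import java.util.NavigableSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical example implementation that only logs the accumulated records,
 * shown to illustrate the SinkAccumulator contract.
 */
public class LoggingAccumulator implements SinkAccumulator {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingAccumulator.class);

    private final NavigableSet<String> eventSet;

    public LoggingAccumulator(final NavigableSet<String> eventSet) {
        this.eventSet = eventSet;
    }

    @Override
    public void doAccumulate() {
        eventSet.forEach(record -> LOG.info("Accumulated record: {}", record));
    }
}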
