package com.amazonaws.bigdatablog.edba;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.ClusterState;
import com.amazonaws.services.elasticmapreduce.model.ClusterSummary;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.ListClustersRequest;
import com.amazonaws.services.elasticmapreduce.model.ListClustersResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepState;
import com.amazonaws.services.elasticmapreduce.model.Tag;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.StringUtils;
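/**
 * Container for the Lambda functions of the EMR-based data processing pipeline:
 * input validation/normalization, audit tracking of validated files, EMR job
 * criteria checks and step submission, and EMR step monitoring. Each public
 * method is deployed as its own Lambda handler; an illustrative handler string
 * (adjust to your deployment) would be
 * com.amazonaws.bigdatablog.edba.LambdaContainer::validateAndNormalizeInputData.
 */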
public class LambdaContainer {
	// Validation/Conversion Layer Lambda function
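	/**
	 * Reads each S3 object referenced in the event, parses it as tab-delimited
	 * data (lines starting with '-' are treated as comments), drops a non-numeric
	 * header row if present, and writes the normalized CSV back to the same
	 * bucket under the "validated/" prefix.
	 */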
	public void validateAndNormalizeInputData(S3Event event, Context ctx) throws Exception {
		AmazonS3 s3Client = new AmazonS3Client();
		List<S3EventNotificationRecord> notificationRecords = event.getRecords();
		for (S3EventNotificationRecord record : notificationRecords) {
			String eventFileName = record.getS3().getObject().getKey();
			S3Object s3Object = s3Client.getObject(new GetObjectRequest(record.getS3().getBucket().getName(), eventFileName));
			InputStream inputFileStream = s3Object.getObjectContent();
			CSVParser fileParser = new CSVParser(new InputStreamReader(inputFileStream), CSVFormat.TDF.withCommentMarker('-'));
			List<CSVRecord> records = fileParser.getRecords();
			// Drop the first record only if it contains non-numeric characters, i.e. a header
			// row. Joining the fields avoids matching on CSVRecord.toString(), whose brackets
			// and separators always contain non-digit characters.
			if (!records.isEmpty() && String.join("", records.get(0)).matches(".*[^0-9].*")) {
				records.remove(0);
			}
			StringWriter writer = new StringWriter();
			CSVPrinter printer = new CSVPrinter(writer, CSVFormat.DEFAULT.withRecordSeparator(System.getProperty("line.separator")));
			printer.printRecords(records);
			printer.flush();
			byte[] normalizedBytes = writer.toString().getBytes("utf-8");
			InputStream readableDataStream = new ByteArrayInputStream(normalizedBytes);
			// Setting the content length up front keeps the SDK from buffering the whole stream.
			ObjectMetadata metadata = new ObjectMetadata();
			metadata.setContentLength(normalizedBytes.length);
			s3Client.putObject(record.getS3().getBucket().getName(), "validated/" + eventFileName + ".csv", readableDataStream, metadata);
			printer.close();
			fileParser.close();
			readableDataStream.close();
		}
	}
	// Tracking Input Layer Lambda function
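	/**
	 * Records each validated file in the audit table via a batched insert; the
	 * SQL is read from the sql.auditValidatedFile property.
	 */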
	public void auditValidatedFile(S3Event event, Context ctx) throws Exception {
		Connection conn = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props);
		List<S3EventNotificationRecord> notificationRecords = event.getRecords();
		PreparedStatement ps = conn.prepareStatement(props.getProperty("sql.auditValidatedFile"));
		for (S3EventNotificationRecord record : notificationRecords) {
			String fileURL = record.getS3().getBucket().getName() + "/" + record.getS3().getObject().getKey();
			ps.setString(1, fileURL);
			ps.setString(2, "VALIDATED");
			ps.setString(3, "VALIDATED");
			ps.addBatch();
		}
		ps.executeBatch();
		ps.close();
		conn.close();
	}
	// EMR Job Criteria Check and Submission Lambda function
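	/**
	 * Scans the job configuration table and, for each job whose input files are
	 * newer than its last run, meet the minimum file count, and pass any
	 * additional SQL criteria, submits a Spark step to a randomly chosen active
	 * tagged cluster and records the cluster/step id against the job config.
	 */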
	public void checkConditionStatusAndFireEMRStep() throws Exception {
		Connection conn = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props);
		Statement conditionFetchStmt = conn.createStatement();
		ResultSet rs = conditionFetchStmt.executeQuery(props.getProperty("sql.conditionFetch"));
		PreparedStatement updateJobConfigPS = conn.prepareStatement(props.getProperty("sql.updateJobConfigStatus"));
		Statement jobInputFilesMinTimestampStmt = conn.createStatement();
		Statement updateSubmittedJobsStmt = conn.createStatement();
		List<String> activeClusters = getActiveTaggedClusters();
		String clusterId = null;
		while (rs.next()) {
			System.out.println("job_input_pattern :: " + rs.getString("job_input_pattern"));
			System.out.println("sql.jobInputFilesMinTSAndCount :: " + props.getProperty("sql.jobInputFilesMinTSAndCount"));
			ResultSet conditionQueryResult = jobInputFilesMinTimestampStmt.executeQuery(props.getProperty("sql.jobInputFilesMinTSAndCount") + " " + rs.getString("job_input_pattern"));
			conditionQueryResult.next();
			if (conditionQueryResult.getTimestamp("min_lvt").after(rs.getTimestamp("last_run_timestamp"))
					&& conditionQueryResult.getInt("file_count") >= rs.getInt("job_min_file_count")
					&& isAdditionalCriteriaPassed(rs.getString("job_addl_criteria"), conn)) {
				// Pick one of the active tagged clusters at random to spread the load.
				clusterId = activeClusters.get(new Random().nextInt(activeClusters.size()));
				String jobId = fireEMRJob(rs.getString("job_params"), clusterId);
				updateJobConfigPS.setString(1, clusterId + ":" + jobId);
				updateJobConfigPS.setString(2, rs.getString("job_config_id"));
				updateJobConfigPS.addBatch();
				updateSubmittedJobsStmt.addBatch(props.getProperty("sql.updateSubmittedJobsJSON").replaceAll("\\?", rs.getString("job_config_id")) + " " + rs.getString("job_input_pattern"));
			}
		}
		updateJobConfigPS.executeBatch();
		updateSubmittedJobsStmt.executeBatch();
		updateSubmittedJobsStmt.close();
		updateJobConfigPS.close();
		jobInputFilesMinTimestampStmt.close();
		conditionFetchStmt.close();
		conn.close();
	}
	// EMR Job Monitor Lambda function
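	/**
	 * Polls EMR for the status of all open steps and persists COMPLETED or
	 * FAILED states back to the job configuration table.
	 */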
	public void monitorEMRStep() throws Exception {
		Connection conn = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props);
		ResultSet openStepsRS = conn.createStatement().executeQuery(props.getProperty("sql.retrieveOpenSteps"));
		AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
		DescribeStepRequest stepReq = new DescribeStepRequest();
		PreparedStatement ps = conn.prepareStatement(props.getProperty("sql.updateStepStatus"));
		while (openStepsRS.next()) {
			stepReq.setClusterId(openStepsRS.getString("cluster_id"));
			stepReq.setStepId(openStepsRS.getString("step_id"));
			String stepState = emr.describeStep(stepReq).getStep().getStatus().getState();
			if (stepState.equals(StepState.COMPLETED.toString())) {
				ps.setString(1, StepState.COMPLETED.toString());
			} else if (stepState.equals(StepState.FAILED.toString())) {
				ps.setString(1, StepState.FAILED.toString());
			} else {
				// Step is still pending or running; skip it so an unset parameter
				// is never added to the batch.
				continue;
			}
			ps.setString(2, openStepsRS.getString("job_config_id"));
			ps.addBatch();
		}
		ps.executeBatch();
		ps.close();
		conn.close();
	}
	// Adds an EMR step
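	/**
	 * Submits a Spark step (command-runner.jar with the comma-separated params)
	 * to the given cluster and returns the new step id.
	 */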
	protected String fireEMRJob(String paramsStr, String clusterId) {
		StepFactory stepFactory = new StepFactory();
		AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
		emr.setRegion(Region.getRegion(Regions.fromName(System.getenv("AWS_REGION"))));
		// Note: this Application config is only needed when launching a cluster;
		// it is never attached to the AddJobFlowSteps request below.
		Application sparkConfig = new Application()
				.withName("Spark");
		String[] params = paramsStr.split(",");
		// Note: this debugging step is defined but never added to the request;
		// include it in withSteps(...) if step-level debugging is wanted.
		StepConfig enabledebugging = new StepConfig()
				.withName("Enable debugging")
				.withActionOnFailure("TERMINATE_JOB_FLOW")
				.withHadoopJarStep(stepFactory.newEnableDebuggingStep());
		HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs(params);
		final StepConfig sparkStep = new StepConfig()
				.withName("Spark Step")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(sparkStepConf);
		AddJobFlowStepsRequest request = new AddJobFlowStepsRequest(clusterId)
				.withSteps(sparkStep);
		AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
		return result.getStepIds().get(0);
	}
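	/**
	 * Returns the ids of all WAITING clusters carrying the tag key configured
	 * under the edba.cluster.tag.key property.
	 */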
	protected List<String> getActiveTaggedClusters() throws Exception {
		AmazonElasticMapReduceClient emrClient = new AmazonElasticMapReduceClient();
		List<String> waitingClusters = new ArrayList<String>();
		ListClustersResult clusterResult = emrClient.listClusters(new ListClustersRequest().withClusterStates(ClusterState.WAITING));
		DescribeClusterRequest specificTagDescribe = new DescribeClusterRequest();
		specificTagDescribe.putCustomQueryParameter("Cluster.Tags", null);
		for (ClusterSummary cluster : clusterResult.getClusters()) {
			System.out.println("list cluster id " + cluster.getId());
			List<Tag> tagList = emrClient.describeCluster(specificTagDescribe.withClusterId(cluster.getId())).getCluster().getTags();
			for (Tag tag : tagList) {
				if (tag.getKey().equals(props.getProperty("edba.cluster.tag.key"))) {
					waitingClusters.add(cluster.getId());
				}
			}
		}
		return waitingClusters;
	}
	/**
	 * Checks whether the additional criteria query returns a non-empty ResultSet.
	 * A null or empty query is treated as "no additional criteria" and passes.
	 */
	protected boolean isAdditionalCriteriaPassed(String sql, Connection conn) throws Exception {
		if (StringUtils.isNullOrEmpty(sql)) {
			return true;
		}
		ResultSet rs = conn.createStatement().executeQuery(sql);
		return rs.next(); // an empty ResultSet means the criteria did not pass
	}
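	// Expected keys in edba_lambda_config.properties, as referenced throughout this
	// class (the values below are illustrative placeholders, not the project's
	// actual configuration):
	//   url                            = jdbc:mysql://<host>:3306/<db>?user=...&password=...
	//   sql.auditValidatedFile         = INSERT ... (parameters: file URL, status, status)
	//   sql.conditionFetch             = SELECT ... job configurations to evaluate
	//   sql.jobInputFilesMinTSAndCount = SELECT min_lvt, file_count ... (input pattern appended)
	//   sql.updateJobConfigStatus      = UPDATE ... (parameters: cluster:step id, job_config_id)
	//   sql.updateSubmittedJobsJSON    = UPDATE ... ('?' replaced by job_config_id, pattern appended)
	//   sql.retrieveOpenSteps          = SELECT cluster_id, step_id, job_config_id ...
	//   sql.updateStepStatus           = UPDATE ... (parameters: step state, job_config_id)
	//   edba.cluster.tag.key           = <EMR tag key identifying eligible clusters>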
	static Properties props = null;
	static {
		try {
			props = new Properties();
			props.load(LambdaContainer.class.getResourceAsStream("/edba_lambda_config.properties"));
		} catch (Exception ce) {
			ce.printStackTrace();
		}
	}
}