Skip to content

Commit

Permalink
Changes for concurrent putinto in cdc streaming app.
Browse files Browse the repository at this point in the history
  • Loading branch information
supriya committed Aug 18, 2018
1 parent cdf37ad commit 35329e7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
Empty file.
67 changes: 55 additions & 12 deletions dtests/src/test/java/io/snappydata/hydra/cluster/SnappyTest.java
Expand Up @@ -127,6 +127,11 @@ public class SnappyTest implements Serializable {
public static int connPoolType = SnappyConnectionPoolPrms.getConnPoolType(connPool);
private static HydraThreadLocal localconnection = new HydraThreadLocal();

public static int finalStart = SnappyCDCPrms.getInitEndRange() + 1;
public static int finalEnd = SnappyCDCPrms.getInitEndRange() + 10;



/**
* (String) APP_PROPS to set dynamically
*/
Expand Down Expand Up @@ -172,6 +177,9 @@ public static synchronized void HydraTask_initializeSnappyTest() {
snappyTest.generateConfig("primaryLocatorHost");
snappyTest.generateConfig("primaryLocatorPort");
}



}
}
}
Expand Down Expand Up @@ -2085,21 +2093,40 @@ public void executeSparkJob(Vector jobClassNames, String logFileName) {
userAppArgs = userAppArgs + " " + dynamicAppProps.get(getMyTid());
}
if (SnappyCDCPrms.getIsCDC()) {
int finalStart = SnappyCDCPrms.getInitEndRange() + 1;
int finalEnd = SnappyCDCPrms.getInitEndRange() + 1000;
Log.getLogWriter().info("Start range and end range : " + finalStart + " & " + finalEnd);
String appName = SnappyCDCPrms.getAppName();
if (appName.equals("CDCIngestionApp2")) {
int tempFinalStart = (Integer) SnappyBB.getBB().getSharedMap().get("finalStartRange");
int tempEndRange = (Integer) SnappyBB.getBB().getSharedMap().get("finalEndRange");
Log.getLogWriter().info("For second Ingestion New Start range and end range : " + tempFinalStart + " & " + tempEndRange);
userAppArgs = tempFinalStart + " " + tempEndRange + " " + userAppArgs;
SnappyBB.getBB().getSharedMap().put("finalStartRange", tempEndRange + 1);
SnappyBB.getBB().getSharedMap().put("finalEndRange", tempEndRange + 1000);
int BBfinalStart2 = (Integer) SnappyBB.getBB().getSharedMap().get("START_RANGE_APP2");
int BBfinalEnd2 = (Integer) SnappyBB.getBB().getSharedMap().get("END_RANGE_APP2");
int finalStart2,finalEnd2;
if(BBfinalStart2 == 0 || BBfinalEnd2 == 0) {
finalStart2 = finalStart;
finalEnd2 = finalEnd;
}
else {
finalStart2 = BBfinalStart2;
finalEnd2 = BBfinalEnd2;
}
userAppArgs = finalStart2 + " " + finalEnd2 + " " + userAppArgs;
Log.getLogWriter().info("For CDCIngestionApp2 app New Start range and end range : " + finalStart2 + " & " + finalEnd2 + " and args = " + userAppArgs);
SnappyBB.getBB().getSharedMap().put("START_RANGE_APP2", finalEnd2 + 1);
SnappyBB.getBB().getSharedMap().put("END_RANGE_APP2", finalEnd2 + 100);
} else if (appName.equals("CDCIngestionApp1")) {
userAppArgs = userAppArgs + " " + finalStart + " " + finalEnd;
SnappyBB.getBB().getSharedMap().put("finalStartRange", finalStart);
SnappyBB.getBB().getSharedMap().put("finalEndRange", finalEnd);
int BBfinalStart1 = (Integer) SnappyBB.getBB().getSharedMap().get("START_RANGE_APP1");
int BBfinalEnd1 = (Integer) SnappyBB.getBB().getSharedMap().get("END_RANGE_APP1");
int finalStart1,finalEnd1;
if(BBfinalStart1 == 0 || BBfinalEnd1 == 0) {
finalStart1 = finalStart;
finalEnd1 = finalEnd;
}
else {
finalStart1 = BBfinalStart1;
finalEnd1 = BBfinalEnd1;
}
userAppArgs = finalStart1 + " " + finalEnd1 + " " + userAppArgs;
Log.getLogWriter().info("For CDCIngestionApp1 app New Start range and end range : " + finalStart1 + " & " + finalEnd1 + " and args = " + userAppArgs);
SnappyBB.getBB().getSharedMap().put("START_RANGE_APP1", finalEnd1 + 1);
SnappyBB.getBB().getSharedMap().put("END_RANGE_APP1", finalEnd1 +100);

}
else if(appName.equals("BulkDeleteApp")){
commonArgs = " --conf spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError" +
Expand Down Expand Up @@ -2151,6 +2178,22 @@ else if(appName.equals("BulkDeleteApp")){
}
}

public static void HydraTask_InitializeBB(){
try{
Log.getLogWriter().info("InsideHydraTask_InitializeBB ");
int startR = SnappyCDCPrms.getInitStartRange();
int endR = SnappyCDCPrms.getInitEndRange();
SnappyBB.getBB().getSharedMap().put("START_RANGE_APP1", startR);
SnappyBB.getBB().getSharedMap().put("END_RANGE_APP1", endR);
SnappyBB.getBB().getSharedMap().put("START_RANGE_APP2", startR + 5000000);
SnappyBB.getBB().getSharedMap().put("END_RANGE_APP2", 10 + (startR + 5000000));
Log.getLogWriter().info("Finishe HydraTask_InitializeBB ");
}
catch(Exception e){
Log.getLogWriter().info("HydraTask_InitializeBB exception " + e.getMessage());
}
}

protected static String getAbsoluteJarLocation(String jarPath, final String jarName) {
String absoluteJarPath = null;
File baseDir = new File(jarPath);
Expand Down

0 comments on commit 35329e7

Please sign in to comment.