Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Commit

Permalink
[BAHIR-91] Upgrade Flink version to 1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tzulitai committed Mar 1, 2017
1 parent a830077 commit d1e0466
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 6 deletions.
1 change: 1 addition & 0 deletions flink-connector-activemq/pom.xml
Expand Up @@ -85,6 +85,7 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
Expand Down
Expand Up @@ -19,7 +19,10 @@

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
Expand Down Expand Up @@ -97,6 +100,22 @@ public void before() throws Exception {
amqSource = new AMQSource<>(config);
amqSource.setRuntimeContext(createRuntimeContext());
amqSource.open(new Configuration());
amqSource.initializeState(new FunctionInitializationContext() {
@Override
public boolean isRestored() {
return false;
}

@Override
public OperatorStateStore getOperatorStateStore() {
return mock(OperatorStateStore.class);
}

@Override
public KeyedStateStore getKeyedStateStore() {
return mock(KeyedStateStore.class);
}
});
}

private RuntimeContext createRuntimeContext() {
Expand Down
Expand Up @@ -22,14 +22,15 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -51,7 +52,7 @@ public class ActiveMQConnectorITCase {
public static final int MESSAGES_NUM = 10000;
public static final String QUEUE_NAME = "queue";
public static final String TOPIC_NAME = "topic";
private static ForkableFlinkMiniCluster flink;
private static LocalFlinkMiniCluster flink;
private static int flinkPort;

@BeforeClass
Expand All @@ -63,7 +64,7 @@ public static void beforeClass() {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

flink = new ForkableFlinkMiniCluster(flinkConfig, false);
flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();

flinkPort = flink.getLeaderRPCPort();
Expand Down Expand Up @@ -211,9 +212,19 @@ public void run() {
while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
Thread.sleep(100);
Random random = new Random();
long checkpointId = random.nextLong();
final long checkpointId = random.nextLong();
synchronized (sourceContext.getCheckpointLock()) {
source.snapshotState(checkpointId, System.currentTimeMillis());
source.snapshotState(new FunctionSnapshotContext() {
@Override
public long getCheckpointId() {
return checkpointId;
}

@Override
public long getCheckpointTimestamp() {
return System.currentTimeMillis();
}
});
source.notifyCheckpointComplete(checkpointId);
}
}
Expand Down
11 changes: 11 additions & 0 deletions flink-connector-redis/pom.xml
Expand Up @@ -74,4 +74,15 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<inherited>true</inherited>
<extensions>true</extensions>
</plugin>
</plugins>
</build>
</project>
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -90,7 +90,7 @@
<log4j.version>1.2.17</log4j.version>

<!-- Spark version -->
<flink.version>1.1.1</flink.version>
<flink.version>1.2.0</flink.version>

<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
Expand Down

0 comments on commit d1e0466

Please sign in to comment.