Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion flink-ml-parent/flink-ml-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,31 @@ under the License.
<version>1.10-SNAPSHOT</version>
</parent>

<artifactId>flink-ml-lib</artifactId>
<artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
<name>flink-ml-lib</name>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.ml.common;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
* The MLEnvironment stores the necessary context in Flink.
* Each MLEnvironment will be associated with a unique ID.
* The operations associated with the same MLEnvironment ID
* will share the same Flink job context.
*
* <p>Both MLEnvironment ID and MLEnvironment can only be retrieved from MLEnvironmentFactory.
*
* @see ExecutionEnvironment
* @see StreamExecutionEnvironment
* @see BatchTableEnvironment
* @see StreamTableEnvironment
*/
public class MLEnvironment {
private ExecutionEnvironment env;
private StreamExecutionEnvironment streamEnv;
private BatchTableEnvironment batchTableEnv;
private StreamTableEnvironment streamTableEnv;

/**
* Construct with null that the class can load the environment in the `get` method.
*/
public MLEnvironment() {
this(null, null, null, null);
}

/**
* Construct with the batch environment and the the batch table environment given by user.
*
* <p>The env can be null which will be loaded in the `get` method.
*
* @param batchEnv the ExecutionEnvironment
* @param batchTableEnv the BatchTableEnvironment
*/
public MLEnvironment(
ExecutionEnvironment batchEnv,
BatchTableEnvironment batchTableEnv) {
this(batchEnv, batchTableEnv, null, null);
}

/**
* Construct with the stream environment and the the stream table environment given by user.
*
* <p>The env can be null which will be loaded in the `get` method.
*
* @param streamEnv the StreamExecutionEnvironment
* @param streamTableEnv the StreamTableEnvironment
*/
public MLEnvironment(
StreamExecutionEnvironment streamEnv,
StreamTableEnvironment streamTableEnv) {
this(null, null, streamEnv, streamTableEnv);
}

/**
* Construct with env given by user.
*
* <p>The env can be null which will be loaded in the `get` method.
*
* @param batchEnv the ExecutionEnvironment
* @param batchTableEnv the BatchTableEnvironment
* @param streamEnv the StreamExecutionEnvironment
* @param streamTableEnv the StreamTableEnvironment
*/
public MLEnvironment(
ExecutionEnvironment batchEnv,
BatchTableEnvironment batchTableEnv,
StreamExecutionEnvironment streamEnv,
StreamTableEnvironment streamTableEnv) {
this.env = batchEnv;
this.batchTableEnv = batchTableEnv;
this.streamEnv = streamEnv;
this.streamTableEnv = streamTableEnv;
}

/**
* Get the ExecutionEnvironment.
* if the ExecutionEnvironment has not been set, it initial the ExecutionEnvironment
* with default Configuration.
*
* @return the batch {@link ExecutionEnvironment}
*/
public ExecutionEnvironment getExecutionEnvironment() {
if (null == env) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, it would be good make the MLEnvironment immutable. i.e. set the values in the constructor instead of doing it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have been added the Constructor that use the environment as its parameters

env = ExecutionEnvironment.getExecutionEnvironment();
}
return env;
}

/**
* Get the StreamExecutionEnvironment.
* if the StreamExecutionEnvironment has not been set, it initial the StreamExecutionEnvironment
* with default Configuration.
*
* @return the {@link StreamExecutionEnvironment}
*/
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
if (null == streamEnv) {
streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set the private instance variables in the constructor? This way all the instance variables can be final.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, as we discussed offline, for depending an unfixed bug, we will use current implementation as a temp workaround.

}
return streamEnv;
}

/**
* Get the BatchTableEnvironment.
* if the BatchTableEnvironment has not been set, it initial the BatchTableEnvironment
* with default Configuration.
*
* @return the {@link BatchTableEnvironment}
*/
public BatchTableEnvironment getBatchTableEnvironment() {
if (null == batchTableEnv) {
batchTableEnv = BatchTableEnvironment.create(getExecutionEnvironment());
}
return batchTableEnv;
}

/**
* Get the StreamTableEnvironment.
* if the StreamTableEnvironment has not been set, it initial the StreamTableEnvironment
* with default Configuration.
*
* @return the {@link StreamTableEnvironment}
*/
public StreamTableEnvironment getStreamTableEnvironment() {
if (null == streamTableEnv) {
streamTableEnv = StreamTableEnvironment.create(getStreamExecutionEnvironment());
}
return streamTableEnv;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.ml.common;

import java.util.HashMap;

/**
* Factory to get the MLEnvironment using a MLEnvironmentId.
*
* <p>The following code snippet shows how to interact with MLEnvironmentFactory.
* <pre>
* {@code
* long mlEnvId = MLEnvironmentFactory.getNewMLEnvironmentId();
* MLEnvironment mlEnv = MLEnvironmentFactory.get(mlEnvId);
* }
* </pre>
*/
public class MLEnvironmentFactory {

/**
* The default MLEnvironmentId.
*/
public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;

/**
* A monotonically increasing id for the MLEnvironments.
* Each id uniquely identifies an MLEnvironment.
*/
private static Long nextId = 1L;

/**
* Map that hold the MLEnvironment and use the MLEnvironmentId as its key.
*/
private static final HashMap<Long, MLEnvironment> map = new HashMap<>();

static {
map.put(DEFAULT_ML_ENVIRONMENT_ID, new MLEnvironment());
}

/**
* Get the MLEnvironment using a MLEnvironmentId.
*
* @param mlEnvId the MLEnvironmentId
* @return the MLEnvironment
*/
public static synchronized MLEnvironment get(Long mlEnvId) {
if (!map.containsKey(mlEnvId)) {
throw new IllegalArgumentException(
String.format("Cannot find MLEnvironment for MLEnvironmentId %s." +
" Did you get the MLEnvironmentId by calling getNewMLEnvironmentId?", mlEnvId));
}

return map.get(mlEnvId);
}

/**
* Get the MLEnvironment use the default MLEnvironmentId.
*
* @return the default MLEnvironment.
*/
public static synchronized MLEnvironment getDefault() {
return get(DEFAULT_ML_ENVIRONMENT_ID);
}

/**
* Create a unique MLEnvironment id and register a new MLEnvironment in the factory.
*
* @return the MLEnvironment id.
*/
public static synchronized Long getNewMLEnvironmentId() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the MLEnvironment immutable, may be we should take TableEnvironment / StreamExecutionEnvironment / ExecutionEnvironment as input parameter, then pass them to the MLEnvironment constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added

return registerMLEnvironment(new MLEnvironment());
}

/**
* Register a new MLEnvironment to the factory and return a new MLEnvironment id.
*
* @param env the MLEnvironment that will be stored in the factory.
* @return the MLEnvironment id.
*/
public static synchronized Long registerMLEnvironment(MLEnvironment env) {
map.put(nextId, env);
return nextId++;
}

/**
* Remove the MLEnvironment using the MLEnvironmentId.
*
* @param mlEnvId the id.
* @return the removed MLEnvironment
*/
public static synchronized MLEnvironment remove(Long mlEnvId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would this API be called? seems like it is not necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MLEnviromentFactory can create multi MLEnviroment, if some of them only used in a time slot, user can use this API to release the resource. In most cases, user just use default MLEnviroment. This API is rarely used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one minor bug here. The default MLEnvironment should never be removed. I'll fix this when checking in the code. Thanks.

return map.remove(mlEnvId);
}
}
Loading