Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flink-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<artifactId>flink-python</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions flink-dist/src/main/assemblies/opt.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@

<!-- Python -->
<file>
<source>../flink-python/target/flink-python_${scala.binary.version}-${project.version}.jar</source>
<source>../flink-python/target/flink-python-${project.version}.jar</source>
<outputDirectory>opt</outputDirectory>
<destName>flink-python_${scala.binary.version}-${project.version}.jar</destName>
<destName>flink-python-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>

Expand Down
2 changes: 1 addition & 1 deletion flink-docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<artifactId>flink-python</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/flink-python-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<artifactId>flink-python</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion flink-python/apache-flink-libraries/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def find_file_path(pattern):
LIB_PATH = os.path.join(FLINK_HOME, "lib")
OPT_PATH = os.path.join(FLINK_HOME, "opt")
OPT_PYTHON_JAR_NAME = os.path.basename(
find_file_path(os.path.join(OPT_PATH, "flink-python_*.jar")))
find_file_path(os.path.join(OPT_PATH, "flink-python*.jar")))
OPT_SQL_CLIENT_JAR_NAME = os.path.basename(
find_file_path(os.path.join(OPT_PATH, "flink-sql-client*.jar")))
LICENSES_PATH = os.path.join(FLINK_HOME, "licenses")
Expand Down
26 changes: 1 addition & 25 deletions flink-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ under the License.
<relativePath>..</relativePath>
</parent>

<artifactId>flink-python_${scala.binary.version}</artifactId>
<artifactId>flink-python</artifactId>
<name>Flink : Python</name>

<packaging>jar</packaging>
Expand Down Expand Up @@ -72,12 +72,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
Expand Down Expand Up @@ -470,24 +464,6 @@ under the License.
</execution>
</executions>
</plugin>
<!-- This is only a temporary solution until FLINK-22872 is fixed. -->
<!-- It compiles `org.apache.flink.table.legacyutils` containing code from the old planner. -->
<!-- We should not start adding more Scala code. Please remove this as soon as possible. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
Scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4 changes: 2 additions & 2 deletions flink-python/pyflink/table/tests/test_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def test_timestamps_from_extractor(self):
'rowtime.timestamps.class':
'org.apache.flink.table.legacyutils.CustomExtractor',
'rowtime.timestamps.serialized':
'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvctj'
'ZLTGK9XvxAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvcl4'
'ozwVLIwG6AgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
'50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
'AACdHM'}
self.assertEqual(expected, properties)
Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/testing/test_case_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ def assert_equals(cls, actual, expected):
@classmethod
def to_py_list(cls, actual):
py_list = []
for i in range(0, actual.length()):
py_list.append(actual.apply(i))
for i in range(0, actual.size()):
py_list.append(actual.get(i))
return py_list

@classmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.legacyutils;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.functions.AggregateFunction;

/** {@link AggregateFunction} for {@link Byte}. */
public class ByteMaxAggFunction extends AggregateFunction<Byte, MaxAccumulator<Byte>> {

private static final long serialVersionUID = 1233840393767061909L;

@Override
public MaxAccumulator<Byte> createAccumulator() {
final MaxAccumulator<Byte> acc = new MaxAccumulator<>();
resetAccumulator(acc);
return acc;
}

public void accumulate(MaxAccumulator<Byte> acc, Byte value) {
if (value != null) {
if (!acc.f1 || Byte.compare(acc.f0, value) < 0) {
acc.f0 = value;
acc.f1 = true;
}
}
}

@Override
public Byte getValue(MaxAccumulator<Byte> acc) {
if (acc.f1) {
return acc.f0;
} else {
return null;
}
}

public void merge(MaxAccumulator<Byte> acc, Iterable<MaxAccumulator<Byte>> its) {
its.forEach(
a -> {
if (a.f1) {
accumulate(acc, a.f0);
}
});
}

public void resetAccumulator(MaxAccumulator<Byte> acc) {
acc.f0 = 0;
acc.f1 = false;
}

@Override
public TypeInformation<MaxAccumulator<Byte>> getAccumulatorType() {
return new TupleTypeInfo(
MaxAccumulator.class,
BasicTypeInfo.BYTE_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.legacyutils;

import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.types.Row;

/** A watermark assigner that throws an exception if a watermark is requested. */
public class CustomAssigner extends PunctuatedWatermarkAssigner {
private static final long serialVersionUID = -4900176786361416000L;

@Override
public Watermark getWatermark(Row row, long timestamp) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.legacyutils;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;

/** A timestamp extractor that looks for the SQL_TIMESTAMP "ts" field. */
public class CustomExtractor extends TimestampExtractor {
private static final long serialVersionUID = 6784900460276023738L;

private final String field = "ts";

@Override
public String[] getArgumentFields() {
return new String[] {field};
}

@Override
public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
throw new ValidationException(
String.format(
"Field 'ts' must be of type Timestamp but is of type %s.",
argumentFieldTypes[0]));
}
}

@Override
public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
ResolvedFieldReference fieldAccess = fieldAccesses[0];
Preconditions.checkArgument(fieldAccess.resultType() == Types.SQL_TIMESTAMP);
FieldReferenceExpression fieldReferenceExpr =
new FieldReferenceExpression(
fieldAccess.name(),
TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
0,
fieldAccess.fieldIndex());
return ApiExpressionUtils.unresolvedCall(
BuiltInFunctionDefinitions.CAST,
fieldReferenceExpr,
ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.legacyutils;

import org.apache.flink.api.java.tuple.Tuple2;

/** Utility class to make working with tuples more readable. */
public class MaxAccumulator<T> extends Tuple2<T, Boolean> {
private static final long serialVersionUID = 6089142148200600733L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.legacyutils;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import org.junit.Assert;

/**
* Testing scalar function to verify that lifecycle methods are called in the expected order and
* only once.
*/
public class RichFunc0 extends ScalarFunction {
private static final long serialVersionUID = 931156471687322386L;

private boolean openCalled = false;
private boolean closeCalled = false;

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
if (openCalled) {
Assert.fail("Open called more than once.");
} else {
openCalled = true;
}
if (closeCalled) {
Assert.fail("Close called before open.");
}
}

public void eval(int index) {
if (!openCalled) {
Assert.fail("Open was not called before eval.");
}
if (closeCalled) {
Assert.fail("Close called before eval.");
}
}

@Override
public void close() throws Exception {
super.close();
if (closeCalled) {
Assert.fail("Close called more than once.");
} else {
closeCalled = true;
}
if (!openCalled) {
Assert.fail("Open was not called before close.");
}
}
}
Loading