Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.schema;

import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.utils.Predicates;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Selectors for filtering tables. */
public class Selectors {

private List<Selector> selectors;

private Selectors() {}

/**
* A {@link Selector} that determines whether a table identified by a given {@link TableId} is
* to be included.
*/
private static class Selector {
private final Predicate<String> namespacePred;
private final Predicate<String> schemaNamePred;
private final Predicate<String> tableNamePred;

public Selector(String namespace, String schemaName, String tableName) {
this.namespacePred =
namespace == null ? (namespacePred) -> false : Predicates.includes(namespace);
this.schemaNamePred =
schemaName == null
? (schemaNamePred) -> false
: Predicates.includes(schemaName);
this.tableNamePred =
tableName == null ? (tableNamePred) -> false : Predicates.includes(tableName);
}

public boolean isMatch(TableId tableId) {

String namespace = tableId.getNamespace();
String schemaName = tableId.getSchemaName();

if (namespace == null || namespace.isEmpty()) {
if (schemaName == null || schemaName.isEmpty()) {
return tableNamePred.test(tableId.getTableName());
}
return schemaNamePred.test(tableId.getSchemaName())
&& tableNamePred.test(tableId.getTableName());
}
return namespacePred.test(tableId.getNamespace())
&& schemaNamePred.test(tableId.getSchemaName())
&& tableNamePred.test(tableId.getTableName());
}
}

/** Match the {@link TableId} against the {@link Selector}s. * */
public boolean isMatch(TableId tableId) {
for (Selector selector : selectors) {
if (selector.isMatch(tableId)) {
return true;
}
}
return false;
}

/** Builder for {@link Selectors}. */
public static class SelectorsBuilder {

private List<Selector> selectors;

public SelectorsBuilder includeTables(String tableInclusions) {

if (tableInclusions == null || tableInclusions.isEmpty()) {
throw new IllegalArgumentException(
"Invalid table inclusion pattern cannot be null or empty");
}

List<Selector> selectors = new ArrayList<>();
Set<String> tableSplitSet =
Predicates.setOf(
tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str);
for (String tableSplit : tableSplitSet) {
Set<String> tableIdSet =
Predicates.setOf(
tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str);
Iterator<String> iterator = tableIdSet.iterator();
if (tableIdSet.size() == 1) {
selectors.add(new Selector(null, null, iterator.next()));
} else if (tableIdSet.size() == 2) {
selectors.add(new Selector(null, iterator.next(), iterator.next()));
} else if (tableIdSet.size() == 3) {
selectors.add(new Selector(iterator.next(), iterator.next(), iterator.next()));
} else {
throw new IllegalArgumentException(
"Invalid table inclusion pattern: " + tableInclusions);
}
}
this.selectors = selectors;
return this;
}

public Selectors build() {
Selectors selectors = new Selectors();
selectors.selectors = this.selectors;
return selectors;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.schema;

import com.ververica.cdc.common.event.TableId;

import java.util.function.Predicate;

/** A filter for tables. */
@FunctionalInterface
public interface TableFilter {

/** Determines whether the given table should be included. */
boolean isIncluded(TableId tableId);

/** Creates a {@link TableFilter} from the given predicate. */
static TableFilter fromPredicate(Predicate<TableId> predicate) {
return predicate::test;
}

/** Creates a {@link TableFilter} that includes all tables. */
static TableFilter includeAll() {
return t -> true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.text;

/** An exception representing a problem during parsing of text. */
public class ParsingException extends RuntimeException {

private static final long serialVersionUID = 1L;

private final Position position;

/** @param position the position of the error; never null */
public ParsingException(Position position) {
super();
this.position = position;
}

/**
* @param position the position of the error; never null
* @param message the message
* @param cause the underlying cause
*/
public ParsingException(Position position, String message, Throwable cause) {
super(message, cause);
this.position = position;
}

/**
* @param position the position of the error; never null
* @param message the message
*/
public ParsingException(Position position, String message) {
super(message);
this.position = position;
}

public Position getPosition() {
return position;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.text;

/**
* A class that represents the position of a particular character in terms of the lines and columns
* of a character sequence.
*/
public final class Position {

/** The position is used when there is no content. */
public static final Position EMPTY_CONTENT_POSITION = new Position(-1, 1, 0);

private final int line;
private final int column;
private final int indexInContent;

public Position(int indexInContent, int line, int column) {
this.indexInContent = indexInContent < 0 ? -1 : indexInContent;
this.line = line;
this.column = column;

assert this.line > 0;
assert this.column >= 0;
// make sure that negative index means an EMPTY_CONTENT_POSITION
assert this.indexInContent >= 0 || this.line == 1 && this.column == 0;
}

/**
* Get the 0-based index of this position in the content character array.
*
* @return the index; never negative except for the first position in an empty content.
*/
public int index() {
return indexInContent;
}

/**
* Get the 1-based column number of the character.
*
* @return the column number; always positive
*/
public int column() {
return column;
}

/**
* Get the 1-based line number of the character.
*
* @return the line number; always positive
*/
public int line() {
return line;
}

@Override
public boolean equals(Object obj) {
return super.equals(obj);
}

@Override
public int hashCode() {
return indexInContent;
}

@Override
public String toString() {
return "" + indexInContent + ':' + line + ':' + column;
}

/**
* Return a new position that is the addition of this position and that supplied.
*
* @param position the position to add to this object; may not be null
* @return the combined position
*/
public Position add(Position position) {
if (this.index() < 0) {
return position.index() < 0 ? EMPTY_CONTENT_POSITION : position;
}

if (position.index() < 0) {
return this;
}

int index = this.index() + position.index();
int line = position.line() + this.line() - 1;
int column = this.line() == 1 ? this.column() + position.column() : this.column();

return new Position(index, line, column);
}
}
Loading