Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2435] User-defined types in CsvReader #5862

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -221,7 +220,7 @@ protected void setFieldTypesGeneric(Class<?> ... fieldTypes) {
Class<?> type = fieldTypes[i];

if (type != null) {
if (FieldParser.getParserForType(type) == null) {
if (FieldParser.getDefaultParserForType(type) == null && FieldParser.getCustomParserForType(type) == null) {
throw new IllegalArgumentException("The type '" + type.getName() + "' is not supported for the CSV input format.");
}
types.add(type);
Expand Down Expand Up @@ -253,7 +252,7 @@ protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes)
Class<?> type = fieldTypes[i];

if (type != null) {
if (FieldParser.getParserForType(type) == null) {
if (FieldParser.getDefaultParserForType(type) == null && FieldParser.getCustomParserForType(type) == null) {
throw new IllegalArgumentException("The type '" + type.getName()
+ "' is not supported for the CSV input format.");
}
Expand Down Expand Up @@ -285,7 +284,7 @@ protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
throw new IllegalArgumentException("Type for included field " + i + " should not be null.");
} else {
// check if we support parsers for this type
if (FieldParser.getParserForType(type) == null) {
if (FieldParser.getDefaultParserForType(type) == null && FieldParser.getCustomParserForType(type) == null) {
throw new IllegalArgumentException("The type '" + type.getName() + "' is not supported for the CSV input format.");
}
types.add(type);
Expand All @@ -310,13 +309,10 @@ public void open(FileInputSplit split) throws IOException {

for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i] != null) {
Class<? extends FieldParser<?>> parserType = FieldParser.getParserForType(fieldTypes[i]);
if (parserType == null) {
throw new RuntimeException("No parser available for type '" + fieldTypes[i].getName() + "'.");
FieldParser<?> p = FieldParser.getParserInstanceFor(fieldTypes[i]);
if ( p == null ) {
throw new IllegalArgumentException("No parser available for type '" + fieldTypes[i].getName() + "'.");
}

FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);

p.setCharset(getCharset());
if (this.quotedStringParsing) {
if (p instanceof StringParser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,37 @@ public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>.
return tupleInfo;
}

/**
* Resolves a type information for each specified income type and forms an instance of a resulting {@link TupleTypeInfo} type.
* @param incomeTypes tuple fields' types
* @param <X> a resulting type of a tuple, e.g. Tuple1, Tuple2...
* @return A tuple information type, built from the specified income types.
*/
@SuppressWarnings("unchecked")
@PublicEvolving
public static <X extends Tuple> TupleTypeInfo<X> getTupleTypeInfo(Class<?>... incomeTypes) {
if (incomeTypes == null || incomeTypes.length == 0) {
throw new IllegalArgumentException();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the good idea add here message for exception.

}

TypeInformation<?>[] infos = new TypeInformation<?>[incomeTypes.length];
for (int i = 0; i < infos.length; i++) {
Class<?> incomeType = incomeTypes[i];
if (incomeType == null) {
throw new IllegalArgumentException("Type at position " + i + " is null.");
}

TypeInformation<?> info = TypeExtractor.getForClass(incomeType);
if (!info.getTypeClass().equals(GenericTypeInfo.class)) {
infos[i] = info;
} else {
throw new IllegalArgumentException("Either a concrete type cannot be identified or this is a recursive type.");
}
}

return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos);
}

@SuppressWarnings("unchecked")
@PublicEvolving
public static <X extends Tuple> TupleTypeInfo<X> getBasicAndBasicValueTupleTypeInfo(Class<?>... basicTypes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected TypeExtractor() {
// TypeInfoFactory registry
// --------------------------------------------------------------------------------------------

private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
public static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this change for?


/**
* Registers a type information factory globally for a certain type. Every following type extraction
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.types.parser;

import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/**
 * A {@link ParserFactory} backed by a fixed {@link FieldParser} class: it reports that class as
 * its parser type and creates parsers by instantiating it.
 *
 * @param <T> the value type handled by the parsers this factory creates
 */
public class DefaultParserFactory<T> implements ParserFactory<T> {

	/** Parser class reported by {@link #getParserType()} and instantiated by {@link #create()}. */
	private final Class<? extends FieldParser<T>> parserType;

	/**
	 * Creates a factory for the given parser class.
	 *
	 * @param parserType the parser class to hand out and instantiate; checked to be non-null
	 */
	public DefaultParserFactory(Class<? extends FieldParser<T>> parserType) {
		this.parserType = Preconditions.checkNotNull(parserType, "Parser class must be not null.");
	}

	/**
	 * @return the parser class this factory was constructed with
	 */
	@Override
	public Class<? extends FieldParser<T>> getParserType() {
		return this.parserType;
	}

	/**
	 * @return a parser obtained by passing the configured parser class to
	 *         {@code InstantiationUtil.instantiate}
	 */
	@Override
	public FieldParser<T> create() {
		return InstantiationUtil.instantiate(this.parserType, FieldParser.class);
	}
}
160 changes: 113 additions & 47 deletions flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -47,7 +50,9 @@
*/
@PublicEvolving
public abstract class FieldParser<T> {


private static final Logger LOG = LoggerFactory.getLogger(FieldParser.class);

/**
* An enumeration of different types of errors that may occur.
*/
Expand Down Expand Up @@ -89,14 +94,14 @@ public static enum ParseErrorState {
* the state of this parser.
* The start position within the byte array and the array's valid length is given.
* The content of the value is delimited by a field delimiter.
*
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remove these unnecessary changes?

*
* @param bytes The byte array that holds the value.
* @param startPos The index where the field starts
* @param limit The limit unto which the byte contents is valid for the parser. The limit is the
* position one after the last valid byte.
* @param delim The field delimiter character
* @param reuse An optional reusable field to hold the value
*
*
* @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise.
*/
public int resetErrorStateAndParse(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) {
Expand All @@ -122,27 +127,27 @@ protected void resetParserState() {
* Gets the parsed field. This method returns the value parsed by the last successful invocation of
* {@link #parseField(byte[], int, int, byte[], Object)}. If objects are mutable and reused, it will return
* the object instance that was passed to the parse function.
*
*
* @return The latest parsed field.
*/
public abstract T getLastResult();

/**
* Returns an instance of the parsed value type.
*
*
* @return An instance of the parsed value type.
*/
public abstract T createValue();

/**
* Checks if the delimiter starts at the given start position of the byte array.
*
*
* Attention: This method assumes that enough characters follow the start position for the delimiter check!
*
*
* @param bytes The byte array that holds the value.
* @param startPos The index of the byte array where the check for the delimiter starts.
* @param delim The delimiter to check for.
*
*
* @return true if a delimiter starts at the given start position, false otherwise.
*/
public static final boolean delimiterNext(byte[] bytes, int startPos, byte[] delim) {
Expand All @@ -154,7 +159,7 @@ public static final boolean delimiterNext(byte[] bytes, int startPos, byte[] del
}
}
return true;

}

/**
Expand All @@ -177,21 +182,21 @@ public static final boolean endsWithDelimiter(byte[] bytes, int endPos, byte[] d
}
return true;
}

/**
* Sets the error state of the parser. Called by subclasses of the parser to set the type of error
* when failing a parse.
*
*
* @param error The error state to set.
*/
protected void setErrorState(ParseErrorState error) {
this.errorState = error;
}

/**
* Gets the error state of the parser, as a value of the enumeration {@link ParseErrorState}.
* If no error occurred, the error state will be {@link ParseErrorState#NONE}.
*
*
* @return The current error state of the parser.
*/
public ParseErrorState getErrorState() {
Expand Down Expand Up @@ -262,54 +267,115 @@ public void setCharset(Charset charset) {
// --------------------------------------------------------------------------------------------
// Mapping from types to parsers
// --------------------------------------------------------------------------------------------

/**
* Gets the parser for the type specified by the given class. Returns null, if no parser for that class
* Provides an instance of {@link FieldParser} that corresponds to the specified type.
* @param type a field type for which a {@link FieldParser} is needed.
* @return if there is a custom parser for the specified field type - it is returned; then, if there is a default parser
* responsible for the specified type - it is returned; otherwise, {@code null} is returned.
*/
@SuppressWarnings("unchecked")
public static <T> FieldParser<T> getParserInstanceFor(Class<T> type) {
	ParserFactory<T> parserFactory;

	// CUSTOM_PARSERS is a plain HashMap that registerCustomParser mutates while holding its
	// monitor; read it under the same monitor, as getCustomParserForType does, so a lookup
	// never races with a concurrent registration.
	synchronized (CUSTOM_PARSERS) {
		parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
	}

	// A custom parser takes precedence; only fall back to the default when none is registered.
	if (parserFactory == null) {
		parserFactory = (ParserFactory<T>) DEFAULT_PARSERS.get(type);
	}

	// No factory of either kind means the type is unsupported; callers check for null.
	return parserFactory == null ? null : parserFactory.create();
}

/**
* Gets the default parser for the type specified by the given class. Returns null, if no parser for that class
* is known.
*
*
* @param type The class of the type to get the parser for.
* @return The parser for the given type, or null, if no such parser exists.
*/
public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) {
Class<? extends FieldParser<?>> parser = PARSERS.get(type);
if (parser == null) {
public static <T> Class<FieldParser<T>> getDefaultParserForType(Class<T> type) {
ParserFactory<?> parserFactory = DEFAULT_PARSERS.get(type);
if (parserFactory == null) {
return null;
} else {
@SuppressWarnings("unchecked")
Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parser;
Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parserFactory.getParserType();
return typedParser;
}
}

private static final Map<Class<?>, Class<? extends FieldParser<?>>> PARSERS =
new HashMap<Class<?>, Class<? extends FieldParser<?>>>();


/**
 * Looks up the parser class that has been registered as a custom parser for the given type.
 * The lookup is performed while holding the monitor of the custom parser registry.
 *
 * @param type The class of the type to get the parser for.
 * @return The custom parser class for the given type, or null if no custom parser is registered for it.
 */
public static <T> Class<? extends FieldParser<T>> getCustomParserForType(Class<T> type) {
	synchronized (CUSTOM_PARSERS) {
		final ParserFactory<T> registered = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
		return registered != null ? registered.getParserType() : null;
	}
}

private static final Map<Class<?>, ParserFactory<?>> DEFAULT_PARSERS = new HashMap<>();

static {
// basic types
PARSERS.put(Byte.class, ByteParser.class);
PARSERS.put(Short.class, ShortParser.class);
PARSERS.put(Integer.class, IntParser.class);
PARSERS.put(Long.class, LongParser.class);
PARSERS.put(String.class, StringParser.class);
PARSERS.put(Float.class, FloatParser.class);
PARSERS.put(Double.class, DoubleParser.class);
PARSERS.put(Boolean.class, BooleanParser.class);
PARSERS.put(BigDecimal.class, BigDecParser.class);
PARSERS.put(BigInteger.class, BigIntParser.class);
DEFAULT_PARSERS.put(Byte.class, new DefaultParserFactory<>(ByteParser.class));
DEFAULT_PARSERS.put(Short.class, new DefaultParserFactory<>(ShortParser.class));
DEFAULT_PARSERS.put(Integer.class, new DefaultParserFactory<>(IntParser.class));
DEFAULT_PARSERS.put(Long.class, new DefaultParserFactory<>(LongParser.class));
DEFAULT_PARSERS.put(String.class, new DefaultParserFactory<>(StringParser.class));
DEFAULT_PARSERS.put(Float.class, new DefaultParserFactory<>(FloatParser.class));
DEFAULT_PARSERS.put(Double.class, new DefaultParserFactory<>(DoubleParser.class));
DEFAULT_PARSERS.put(Boolean.class, new DefaultParserFactory<>(BooleanParser.class));
DEFAULT_PARSERS.put(BigDecimal.class, new DefaultParserFactory<>(BigDecParser.class));
DEFAULT_PARSERS.put(BigInteger.class, new DefaultParserFactory<>(BigIntParser.class));

// value types
PARSERS.put(ByteValue.class, ByteValueParser.class);
PARSERS.put(ShortValue.class, ShortValueParser.class);
PARSERS.put(IntValue.class, IntValueParser.class);
PARSERS.put(LongValue.class, LongValueParser.class);
PARSERS.put(StringValue.class, StringValueParser.class);
PARSERS.put(FloatValue.class, FloatValueParser.class);
PARSERS.put(DoubleValue.class, DoubleValueParser.class);
PARSERS.put(BooleanValue.class, BooleanValueParser.class);
DEFAULT_PARSERS.put(ByteValue.class, new DefaultParserFactory<>(ByteValueParser.class));
DEFAULT_PARSERS.put(ShortValue.class, new DefaultParserFactory<>(ShortValueParser.class));
DEFAULT_PARSERS.put(IntValue.class, new DefaultParserFactory<>(IntValueParser.class));
DEFAULT_PARSERS.put(LongValue.class, new DefaultParserFactory<>(LongValueParser.class));
DEFAULT_PARSERS.put(StringValue.class, new DefaultParserFactory<>(StringValueParser.class));
DEFAULT_PARSERS.put(FloatValue.class, new DefaultParserFactory<>(FloatValueParser.class));
DEFAULT_PARSERS.put(DoubleValue.class, new DefaultParserFactory<>(DoubleValueParser.class));
DEFAULT_PARSERS.put(BooleanValue.class, new DefaultParserFactory<>(BooleanValueParser.class));

// SQL date/time types
PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
PARSERS.put(java.sql.Date.class, SqlDateParser.class);
PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
DEFAULT_PARSERS.put(java.sql.Time.class, new DefaultParserFactory<>(SqlTimeParser.class));
DEFAULT_PARSERS.put(java.sql.Date.class, new DefaultParserFactory<>(SqlDateParser.class));
DEFAULT_PARSERS.put(java.sql.Timestamp.class, new DefaultParserFactory<>(SqlTimestampParser.class));
}

private static final Map<Class<?>, ParserFactory<?>> CUSTOM_PARSERS = new HashMap<>();

/**
* Registers a user-defined (custom) type with a parser factory for it.
* Custom type parsing precedes default one.
* @param type a user-defined type
* @param factory the type's parser factory.
* @return the registration status: 1 - registration is successful, -1 - otherwise.
*/
public static <T> int registerCustomParser(Class<T> type, ParserFactory<T> factory) {
Preconditions.checkNotNull(type, "The type must be not null.");
Preconditions.checkNotNull(factory, "The factory must be not null.");

synchronized (CUSTOM_PARSERS) {
if (CUSTOM_PARSERS.containsKey(type)) {
LOG.warn("'{}' type is already registered with '{}' parser. Skipping.");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't provide arguments for format message to log.

return -1;
}
CUSTOM_PARSERS.put(type, factory);
return 1;
}
}

}