Skip to content

Commit

Permalink
[FLINK-2692] Untangle CsvInputFormat
Browse files Browse the repository at this point in the history
This closes #1266
  • Loading branch information
zentol committed Nov 18, 2015
1 parent fc6fec7 commit bd61f2d
Show file tree
Hide file tree
Showing 18 changed files with 573 additions and 599 deletions.
Expand Up @@ -20,6 +20,7 @@
import backtype.storm.topology.IRichBolt;

import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand Down Expand Up @@ -119,7 +120,7 @@ private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvir
// read the text file from given input path
PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
.getForObject(new Sentence(""));
return env.createInput(new CsvInputFormat<Sentence>(new Path(
return env.createInput(new PojoCsvInputFormat<Sentence>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
sourceType);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import backtype.storm.tuple.Fields;

import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -122,7 +123,7 @@ private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutio
// read the text file from given input path
TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
.getForObject(new Tuple1<String>(""));
return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
sourceType);
Expand Down

This file was deleted.

0 comments on commit bd61f2d

Please sign in to comment.