CSVInputFormat.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.crunch.io.text.csv;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.google.common.annotations.VisibleForTesting;

/**
* A {@link FileInputFormat} for use specifically with CSV files. This input
* format deals with the fact that CSV files can potentially have multiple lines
* within fields which should all be treated as one record.
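 * <p/>
 * For example, the following is one logical CSV record even though it spans
 * three physical lines (illustrative data, not taken from this codebase):
 * <pre>
 * 1,"a quoted field
 * that continues across
 * two line breaks",2
 * </pre>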
*/
public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implements Configurable {
@VisibleForTesting
protected int bufferSize;
@VisibleForTesting
protected String inputFileEncoding;
@VisibleForTesting
protected char openQuoteChar;
@VisibleForTesting
protected char closeQuoteChar;
@VisibleForTesting
protected char escapeChar;
@VisibleForTesting
protected int maximumRecordSize;
private Configuration configuration;
/**
   * This method is used by Crunch to get an instance of {@link CSVRecordReader}.
*
* @param split
* the {@link InputSplit} that will be assigned to the record reader
* @param context
   *          the {@link TaskAttemptContext} for the job
   * @return an instance of {@link CSVRecordReader} created using the configured
   *         buffer size, file encoding, quote and escape characters, and
   *         maximum record size.
*/
@Override
public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
return new CSVRecordReader(this.bufferSize, this.inputFileEncoding, this.openQuoteChar, this.closeQuoteChar,
this.escapeChar, this.maximumRecordSize);
  }

/**
   * A method used by Crunch to calculate the splits for each file. This will
   * split each CSV file at the end of a valid CSV record. The default split
   * size is 64 MB, but this can be reconfigured by setting the
   * "csv.inputsplitsize" option in the job configuration.
*
* @param job
* the {@link JobContext} for the current job.
   * @return a List containing the calculated splits for all input files.
* @throws IOException
* if an error occurs while accessing HDFS
*/
@Override
public List<InputSplit> getSplits(final JobContext job) throws IOException {
final long splitSize = job.getConfiguration().getLong(CSVFileSource.INPUT_SPLIT_SIZE, 67108864);
final List<InputSplit> splits = new ArrayList<InputSplit>();
final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
FSDataInputStream inputStream = null;
Configuration config = job.getConfiguration();
CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(config);
try {
for (final Path path : paths) {
FileSystem fileSystem = path.getFileSystem(config);
CompressionCodec codec = compressionCodecFactory.getCodec(path);
        if (codec == null) {
          // If the file is not compressed, calculate record-aligned splits for it
          inputStream = fileSystem.open(path);
          splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
        } else {
          // The file is compressed and cannot be split, so add a single split covering the entire file
          splits.add(new FileSplit(path, 0, Long.MAX_VALUE, new String[0]));
        }
}
return splits;
} finally {
if (inputStream != null) {
inputStream.close();
}
}
  }

@Override
public Configuration getConf() {
return configuration;
  }

@Override
public void setConf(final Configuration conf) {
configuration = conf;
configure();
  }

/**
   * This method will read the configuration options that were set in
   * {@link CSVFileSource}'s private getBundle() method.
*/
public void configure() {
inputFileEncoding = this.configuration.get(CSVFileSource.CSV_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING);
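    // If no explicit maximum record size is set, fall back to the configured
    // input split size, and finally to the reader's default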
    maximumRecordSize = this.configuration.getInt(CSVFileSource.MAXIMUM_RECORD_SIZE,
        this.configuration.getInt(CSVFileSource.INPUT_SPLIT_SIZE, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE));
    closeQuoteChar = this.configuration.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR,
        String.valueOf(CSVLineReader.DEFAULT_QUOTE_CHARACTER)).charAt(0);
    openQuoteChar = this.configuration.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR,
        String.valueOf(CSVLineReader.DEFAULT_QUOTE_CHARACTER)).charAt(0);
    escapeChar = this.configuration.get(CSVFileSource.CSV_ESCAPE_CHAR,
        String.valueOf(CSVLineReader.DEFAULT_ESCAPE_CHARACTER)).charAt(0);
    bufferSize = this.configuration.getInt(CSVFileSource.CSV_BUFFER_SIZE, CSVLineReader.DEFAULT_BUFFER_SIZE);
  }

/**
* In summary, this method will start at the beginning of the file, seek to
* the position corresponding to the desired split size, seek to the end of
* the line that contains that position, then attempt to seek until the
* CSVLineReader indicates that the current position is no longer within a CSV
   * record. Then, it will mark that position for a split and repeat the
   * process.
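   * <p/>
   * As an illustration (all byte offsets here are hypothetical): with a 64 MB
   * split size, the first split starts at byte 0, the reader seeks to byte
   * 67108864, advances past the partial line and any multi-line record to the
   * next record boundary (say, byte 67110000), and ends the split there; the
   * next split then begins at byte 67110000.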
*/
@VisibleForTesting
protected List<FileSplit> getSplitsForFile(final long splitSize, final long fileSize, final Path fileName,
final FSDataInputStream inputStream) throws IOException {
final List<FileSplit> splitsList = new ArrayList<FileSplit>();
long splitStart;
long currentPosition = 0;
boolean endOfFile = false;
while (!endOfFile) {
// Set the start of this split to the furthest read point in the file
splitStart = currentPosition;
// Skip a number of bytes equal to the desired split size to avoid parsing
      // every CSV line, which greatly increases the run time
currentPosition = splitStart + splitSize;
      // The input stream cannot seek past the EOF, so clamp the position to the end of the file
if (currentPosition >= fileSize) {
currentPosition = fileSize;
endOfFile = true;
final FileSplit fileSplit = new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[]{});
splitsList.add(fileSplit);
break;
}
// Every time we seek to the new approximate split point,
// we need to create a new CSVLineReader around the stream.
inputStream.seek(currentPosition);
final CSVLineReader csvLineReader = new CSVLineReader(inputStream, this.bufferSize, this.inputFileEncoding,
this.openQuoteChar, this.closeQuoteChar, this.escapeChar, this.maximumRecordSize);
// This line is potentially garbage because we most likely just sought to
// the middle of a line. Read the rest of the line and leave it for the
// previous split. Then reset the multi-line CSV record boolean, because
// the partial line will have a very high chance of falsely triggering the
// class-wide multi-line logic.
currentPosition += csvLineReader.readFileLine(new Text());
csvLineReader.resetMultiLine();
// Now, we may still be in the middle of a multi-line CSV record.
currentPosition += csvLineReader.readFileLine(new Text());
// If we are, read until we are not.
while (csvLineReader.isInMultiLine()) {
final int bytesRead = csvLineReader.readFileLine(new Text());
// End of file
if (bytesRead <= 0) {
break;
}
currentPosition += bytesRead;
}
// We're out of the multi-line CSV record, so it's safe to end the
// previous split.
splitsList.add(new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[]{}));
}
return splitsList;
}
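
  /**
   * A file is splittable only if it is not compressed; compressed files are
   * instead read as a single split (see {@link #getSplits(JobContext)}).
   */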
@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
return codec == null;
}
}