Skip to content

Commit b02b6e1

Browse files
author
Prashanth Govindarajan
authored
Improve csv parsing (#5711)
* Part 2 of TextFieldParser. Next up is hooking up ReadCsv to use TextFieldParser
* Make LoadCsv use TextFieldParser
* More unit tests
* cleanup
* Address feedback
* Last bit of feedback
* Remove extra var
* Remove duplicate file
* Rename strings.resx to Strings.resx
* rename the designer.cs file too
1 parent 2d3fd3d commit b02b6e1

File tree

7 files changed

+1156
-187
lines changed

7 files changed

+1156
-187
lines changed

src/Microsoft.Data.Analysis/DataFrame.IO.cs

Lines changed: 98 additions & 103 deletions
Original file line number | Diff line number | Diff line change
@@ -172,7 +172,7 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
172172
return ret;
173173
}
174174

175-
private static DataFrame ReadCsvLinesIntoDataFrame(IEnumerable<string> lines,
175+
private static DataFrame ReadCsvLinesIntoDataFrame(WrappedStreamReaderOrStringReader wrappedReader,
176176
char separator = ',', bool header = true,
177177
string[] columnNames = null, Type[] dataTypes = null,
178178
long numberOfRowsToRead = -1, int guessRows = 10, bool addIndexColumn = false
@@ -183,140 +183,139 @@ private static DataFrame ReadCsvLinesIntoDataFrame(IEnumerable<string> lines,
183183
throw new ArgumentException(string.Format(Strings.ExpectedEitherGuessRowsOrDataTypes, nameof(guessRows), nameof(dataTypes)));
184184
}
185185

186-
var linesForGuessType = new List<string[]>();
187-
long rowline = 0;
188-
int numberOfColumns = dataTypes?.Length ?? 0;
189-
190-
if (header == true && numberOfRowsToRead != -1)
186+
List<DataFrameColumn> columns;
187+
string[] fields;
188+
using (var textReader = wrappedReader.GetTextReader())
191189
{
192-
numberOfRowsToRead++;
193-
}
190+
TextFieldParser parser = new TextFieldParser(textReader);
191+
parser.SetDelimiters(separator.ToString());
194192

195-
List<DataFrameColumn> columns;
196-
// First pass: schema and number of rows.
197-
string line = null;
193+
var linesForGuessType = new List<string[]>();
194+
long rowline = 0;
195+
int numberOfColumns = dataTypes?.Length ?? 0;
198196

199-
var enumerator = lines.GetEnumerator();
200-
while (enumerator.MoveNext())
201-
{
202-
line = enumerator.Current;
203-
if ((numberOfRowsToRead == -1) || rowline < numberOfRowsToRead)
197+
if (header == true && numberOfRowsToRead != -1)
198+
{
199+
numberOfRowsToRead++;
200+
}
201+
202+
// First pass: schema and number of rows.
203+
while ((fields = parser.ReadFields()) != null)
204204
{
205-
if (linesForGuessType.Count < guessRows || (header && rowline == 0))
205+
if ((numberOfRowsToRead == -1) || rowline < numberOfRowsToRead)
206206
{
207-
var spl = line.Split(separator);
208-
if (header && rowline == 0)
207+
if (linesForGuessType.Count < guessRows || (header && rowline == 0))
209208
{
210-
if (columnNames == null)
209+
if (header && rowline == 0)
211210
{
212-
columnNames = spl;
211+
if (columnNames == null)
212+
{
213+
columnNames = fields;
214+
}
215+
}
216+
else
217+
{
218+
linesForGuessType.Add(fields);
219+
numberOfColumns = Math.Max(numberOfColumns, fields.Length);
213220
}
214221
}
215-
else
216-
{
217-
linesForGuessType.Add(spl);
218-
numberOfColumns = Math.Max(numberOfColumns, spl.Length);
219-
}
222+
}
223+
++rowline;
224+
if (rowline == guessRows || guessRows == 0)
225+
{
226+
break;
220227
}
221228
}
222-
++rowline;
223-
if (rowline == guessRows || guessRows == 0)
229+
230+
if (rowline == 0)
224231
{
225-
break;
232+
throw new FormatException(Strings.EmptyFile);
226233
}
227-
}
228234

229-
if (rowline == 0)
230-
{
231-
throw new FormatException(Strings.EmptyFile);
232-
}
233-
234-
columns = new List<DataFrameColumn>(numberOfColumns);
235-
// Guesses types or looks up dataTypes and adds columns.
236-
for (int i = 0; i < numberOfColumns; ++i)
237-
{
238-
Type kind = dataTypes == null ? GuessKind(i, linesForGuessType) : dataTypes[i];
239-
columns.Add(CreateColumn(kind, columnNames, i));
235+
columns = new List<DataFrameColumn>(numberOfColumns);
236+
// Guesses types or looks up dataTypes and adds columns.
237+
for (int i = 0; i < numberOfColumns; ++i)
238+
{
239+
Type kind = dataTypes == null ? GuessKind(i, linesForGuessType) : dataTypes[i];
240+
columns.Add(CreateColumn(kind, columnNames, i));
241+
}
240242
}
241243

242244
DataFrame ret = new DataFrame(columns);
243-
line = null;
244245

245246
// Fill values.
246-
enumerator.Reset();
247-
rowline = 0;
248-
while (enumerator.MoveNext() && (numberOfRowsToRead == -1 || rowline < numberOfRowsToRead))
247+
using (var textReader = wrappedReader.GetTextReader())
249248
{
250-
line = enumerator.Current;
251-
var spl = line.Split(separator);
252-
if (header && rowline == 0)
253-
{
254-
// Skips.
255-
}
256-
else
249+
TextFieldParser parser = new TextFieldParser(textReader);
250+
parser.SetDelimiters(separator.ToString());
251+
252+
long rowline = 0;
253+
while ((fields = parser.ReadFields()) != null && (numberOfRowsToRead == -1 || rowline < numberOfRowsToRead))
257254
{
258-
ret.Append(spl, inPlace: true);
255+
if (header && rowline == 0)
256+
{
257+
// Skips.
258+
}
259+
else
260+
{
261+
ret.Append(fields, inPlace: true);
262+
}
263+
++rowline;
259264
}
260-
++rowline;
261-
}
262265

263-
if (addIndexColumn)
264-
{
265-
PrimitiveDataFrameColumn<int> indexColumn = new PrimitiveDataFrameColumn<int>("IndexColumn", columns[0].Length);
266-
for (int i = 0; i < columns[0].Length; i++)
266+
if (addIndexColumn)
267267
{
268-
indexColumn[i] = i;
268+
PrimitiveDataFrameColumn<int> indexColumn = new PrimitiveDataFrameColumn<int>("IndexColumn", columns[0].Length);
269+
for (int i = 0; i < columns[0].Length; i++)
270+
{
271+
indexColumn[i] = i;
272+
}
273+
columns.Insert(0, indexColumn);
269274
}
270-
columns.Insert(0, indexColumn);
271-
}
272-
return ret;
273-
}
274275

275-
private class CsvLines : IEnumerable<string>
276-
{
277-
private CsvLineEnumerator enumerator;
278-
public CsvLines(CsvLineEnumerator csvLineEnumerator)
279-
{
280-
enumerator = csvLineEnumerator;
281276
}
282277

283-
public IEnumerator<string> GetEnumerator() => enumerator;
284-
285-
IEnumerator IEnumerable.GetEnumerator() => enumerator;
278+
return ret;
286279
}
287280

288-
private class CsvLineEnumerator : IEnumerator<string>
281+
private class WrappedStreamReaderOrStringReader
289282
{
290-
private StreamReader streamReader;
291-
private string currentLine;
292-
private long streamStartPosition;
293-
public CsvLineEnumerator(StreamReader csvStream)
294-
{
295-
streamStartPosition = csvStream.BaseStream.Position;
296-
streamReader = csvStream;
297-
currentLine = null;
298-
}
299-
300-
public string Current => currentLine;
301-
302-
object IEnumerator.Current => currentLine;
283+
private Stream _stream;
284+
private long _initialPosition;
285+
private Encoding _encoding;
286+
private string _csvString;
303287

304-
public void Dispose()
288+
public WrappedStreamReaderOrStringReader(Stream stream, Encoding encoding)
305289
{
306-
throw new NotImplementedException();
290+
_stream = stream;
291+
_initialPosition = stream.Position;
292+
_encoding = encoding;
293+
_csvString = null;
307294
}
308295

309-
public bool MoveNext()
296+
public WrappedStreamReaderOrStringReader(string csvString)
310297
{
311-
currentLine = streamReader.ReadLine();
312-
return currentLine != null;
298+
_csvString = csvString;
299+
_initialPosition = 0;
300+
_encoding = null;
301+
_stream = null;
313302
}
314303

315-
public void Reset()
304+
// Returns a new TextReader. If the wrapped object is a stream, the stream is reset to its initial position.
305+
public TextReader GetTextReader()
316306
{
317-
streamReader.DiscardBufferedData();
318-
streamReader.BaseStream.Seek(streamStartPosition, SeekOrigin.Begin);
307+
if (_stream != null)
308+
{
309+
_stream.Seek(_initialPosition, SeekOrigin.Begin);
310+
return new StreamReader(_stream, _encoding, detectEncodingFromByteOrderMarks: true, DefaultStreamReaderBufferSize, leaveOpen: true);
311+
}
312+
else
313+
{
314+
return new StringReader(_csvString);
315+
}
316+
319317
}
318+
320319
}
321320

322321
/// <summary>
@@ -336,8 +335,8 @@ public static DataFrame LoadCsvFromString(string csvString,
336335
string[] columnNames = null, Type[] dataTypes = null,
337336
long numberOfRowsToRead = -1, int guessRows = 10, bool addIndexColumn = false)
338337
{
339-
string[] lines = csvString.Split(new[] { Environment.NewLine }, StringSplitOptions.None);
340-
return ReadCsvLinesIntoDataFrame(lines, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
338+
WrappedStreamReaderOrStringReader wrappedStreamReaderOrStringReader = new WrappedStreamReaderOrStringReader(csvString);
339+
return ReadCsvLinesIntoDataFrame(wrappedStreamReaderOrStringReader, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
341340
}
342341

343342
/// <summary>
@@ -369,12 +368,8 @@ public static DataFrame LoadCsv(Stream csvStream,
369368
throw new ArgumentException(string.Format(Strings.ExpectedEitherGuessRowsOrDataTypes, nameof(guessRows), nameof(dataTypes)));
370369
}
371370

372-
using (var streamReader = new StreamReader(csvStream, encoding ?? Encoding.UTF8, detectEncodingFromByteOrderMarks: true, DefaultStreamReaderBufferSize, leaveOpen: true))
373-
{
374-
CsvLineEnumerator linesEnumerator = new CsvLineEnumerator(streamReader);
375-
IEnumerable<string> lines = new CsvLines(linesEnumerator);
376-
return ReadCsvLinesIntoDataFrame(lines, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
377-
}
371+
WrappedStreamReaderOrStringReader wrappedStreamReaderOrStringReader = new WrappedStreamReaderOrStringReader(csvStream, encoding ?? Encoding.UTF8);
372+
return ReadCsvLinesIntoDataFrame(wrappedStreamReaderOrStringReader, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
378373
}
379374

380375
/// <summary>

0 commit comments

Comments
 (0)