Skip to content

Commit

Permalink
Dataframe csv datetime (#5834)
Browse files Browse the repository at this point in the history
* Give message with line and column of CSV file if data conversion fails

* Allow parsing of DateTime data in CSV import

* Delete query.json

* Address comments

* Verify contents on date column

* Change to a sample date

Co-authored-by: Derek Diamond <derek@primethought.biz>
  • Loading branch information
Prashanth Govindarajan and derekdiamond committed Jun 4, 2021
1 parent 8374401 commit 8801e40
Show file tree
Hide file tree
Showing 7 changed files with 596 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/Microsoft.Data.Analysis/DataFrame.IO.cs
Expand Up @@ -47,6 +47,13 @@ private static Type GuessKind(int col, List<string[]> read)
++nbline;
continue;
}
bool dateParse = DateTime.TryParse(val, out DateTime dateResult);
if (dateParse)
{
res = DetermineType(nbline == 0, typeof(DateTime), res);
++nbline;
continue;
}

res = DetermineType(nbline == 0, typeof(string), res);
++nbline;
Expand All @@ -71,6 +78,8 @@ private static Type MaxKind(Type a, Type b)
return typeof(float);
if (a == typeof(bool) || b == typeof(bool))
return typeof(bool);
if (a == typeof(DateTime) || b == typeof(DateTime))
return typeof(DateTime);
return typeof(string);
}

Expand Down Expand Up @@ -165,6 +174,10 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
{
ret = new UInt16DataFrameColumn(GetColumnName(columnNames, columnIndex));
}
else if (kind == typeof(DateTime))
{
ret = new PrimitiveDataFrameColumn<DateTime>(GetColumnName(columnNames, columnIndex));
}
else
{
throw new NotSupportedException(nameof(kind));
Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.Data.Analysis/DataFrame.cs
Expand Up @@ -531,6 +531,7 @@ public DataFrame Append(IEnumerable<object> row = null, bool inPlace = false)
if (value != null)
{
value = Convert.ChangeType(value, column.DataType);

if (value is null)
{
throw new ArgumentException(string.Format(Strings.MismatchedValueType, column.DataType), value.GetType().ToString());
Expand Down
314 changes: 314 additions & 0 deletions src/Microsoft.Data.Analysis/DateTimeComputation.cs
@@ -0,0 +1,314 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;

namespace Microsoft.Data.Analysis
{
/// <summary>
/// <see cref="IPrimitiveColumnComputation{T}"/> implementation for <see cref="DateTime"/> columns.
/// Only the ordering-based operations (Min, Max, CumulativeMin, CumulativeMax) are implemented;
/// every other operation throws <see cref="NotSupportedException"/>.
/// </summary>
internal class DateTimeComputation : IPrimitiveColumnComputation<DateTime>
{
    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Abs(PrimitiveColumnContainer<DateTime> column)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void All(PrimitiveColumnContainer<DateTime> column, out bool ret)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Any(PrimitiveColumnContainer<DateTime> column, out bool ret)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Overwrites every element of <paramref name="column"/>, in buffer order, with the largest
    /// value seen up to and including that element. An empty column is left untouched.
    /// </summary>
    /// <param name="column">The container whose buffers are rewritten in place.</param>
    public void CumulativeMax(PrimitiveColumnContainer<DateTime> column)
    {
        DateTime running = default;
        // Seed from the first element actually visited instead of indexing Buffers[0][0],
        // which throws when the column has no buffers or an empty first buffer.
        bool seeded = false;
        for (int b = 0; b < column.Buffers.Count; b++)
        {
            var buffer = column.Buffers[b];
            var mutableBuffer = DataFrameBuffer<DateTime>.GetMutableBuffer(buffer);
            var mutableSpan = mutableBuffer.Span;
            var readOnlySpan = buffer.ReadOnlySpan;
            for (int i = 0; i < readOnlySpan.Length; i++)
            {
                var val = readOnlySpan[i];

                if (!seeded || val > running)
                {
                    running = val;
                    seeded = true;
                }

                mutableSpan[i] = running;
            }
            // Store the mutable buffer back so the column observes the writes.
            column.Buffers[b] = mutableBuffer;
        }
    }

    /// <summary>
    /// Visits the given <paramref name="rows"/> in enumeration order and overwrites each visited
    /// element with the largest value seen so far among the visited rows.
    /// </summary>
    /// <param name="column">The container whose buffers are rewritten in place.</param>
    /// <param name="rows">Row indices into <paramref name="column"/>; an empty sequence is a no-op.</param>
    public void CumulativeMax(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows)
    {
        DateTime running = default;
        bool seeded = false;
        Span<DateTime> span = default;
        long maxCapacity = ReadOnlyDataFrameBuffer<DateTime>.MaxCapacity;
        // Start with an empty [minRange, maxRange) window so the first row always loads its
        // buffer; no buffer is touched when rows is empty.
        long minRange = 0;
        long maxRange = 0;

        // foreach disposes the enumerator, which a bare GetEnumerator() call does not.
        foreach (long rowIndex in rows)
        {
            if (rowIndex < minRange || rowIndex >= maxRange)
            {
                int bufferIndex = (int)(rowIndex / maxCapacity);
                var mutableBuffer = DataFrameBuffer<DateTime>.GetMutableBuffer(column.Buffers[bufferIndex]);
                // Store the mutable buffer back, as the parameterless overload does.
                // NOTE(review): presumably GetMutableBuffer can return a copy when the stored
                // buffer is read-only, in which case writes would otherwise be lost - confirm.
                column.Buffers[bufferIndex] = mutableBuffer;
                span = mutableBuffer.Span;
                minRange = checked(bufferIndex * maxCapacity);
                maxRange = checked((bufferIndex + 1) * maxCapacity);
            }

            int offset = (int)(rowIndex - minRange);
            var val = span[offset];

            if (!seeded || val > running)
            {
                running = val;
                seeded = true;
            }

            span[offset] = running;
        }
    }

    /// <summary>
    /// Overwrites every element of <paramref name="column"/>, in buffer order, with the smallest
    /// value seen up to and including that element. An empty column is left untouched.
    /// </summary>
    /// <param name="column">The container whose buffers are rewritten in place.</param>
    public void CumulativeMin(PrimitiveColumnContainer<DateTime> column)
    {
        DateTime running = default;
        // Seed from the first element actually visited instead of indexing Buffers[0][0],
        // which throws when the column has no buffers or an empty first buffer.
        bool seeded = false;
        for (int b = 0; b < column.Buffers.Count; b++)
        {
            var buffer = column.Buffers[b];
            var mutableBuffer = DataFrameBuffer<DateTime>.GetMutableBuffer(buffer);
            var mutableSpan = mutableBuffer.Span;
            var readOnlySpan = buffer.ReadOnlySpan;
            for (int i = 0; i < readOnlySpan.Length; i++)
            {
                var val = readOnlySpan[i];

                if (!seeded || val < running)
                {
                    running = val;
                    seeded = true;
                }

                mutableSpan[i] = running;
            }
            // Store the mutable buffer back so the column observes the writes.
            column.Buffers[b] = mutableBuffer;
        }
    }

    /// <summary>
    /// Visits the given <paramref name="rows"/> in enumeration order and overwrites each visited
    /// element with the smallest value seen so far among the visited rows.
    /// </summary>
    /// <param name="column">The container whose buffers are rewritten in place.</param>
    /// <param name="rows">Row indices into <paramref name="column"/>; an empty sequence is a no-op.</param>
    public void CumulativeMin(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows)
    {
        DateTime running = default;
        bool seeded = false;
        Span<DateTime> span = default;
        long maxCapacity = ReadOnlyDataFrameBuffer<DateTime>.MaxCapacity;
        // Start with an empty [minRange, maxRange) window so the first row always loads its
        // buffer; no buffer is touched when rows is empty.
        long minRange = 0;
        long maxRange = 0;

        // foreach disposes the enumerator, which a bare GetEnumerator() call does not.
        foreach (long rowIndex in rows)
        {
            if (rowIndex < minRange || rowIndex >= maxRange)
            {
                int bufferIndex = (int)(rowIndex / maxCapacity);
                var mutableBuffer = DataFrameBuffer<DateTime>.GetMutableBuffer(column.Buffers[bufferIndex]);
                // Store the mutable buffer back, as the parameterless overload does.
                // NOTE(review): presumably GetMutableBuffer can return a copy when the stored
                // buffer is read-only, in which case writes would otherwise be lost - confirm.
                column.Buffers[bufferIndex] = mutableBuffer;
                span = mutableBuffer.Span;
                minRange = checked(bufferIndex * maxCapacity);
                maxRange = checked((bufferIndex + 1) * maxCapacity);
            }

            int offset = (int)(rowIndex - minRange);
            var val = span[offset];

            if (!seeded || val < running)
            {
                running = val;
                seeded = true;
            }

            span[offset] = running;
        }
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void CumulativeProduct(PrimitiveColumnContainer<DateTime> column)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void CumulativeProduct(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void CumulativeSum(PrimitiveColumnContainer<DateTime> column)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void CumulativeSum(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Finds the largest value across all buffers of <paramref name="column"/>.
    /// </summary>
    /// <param name="column">The container to scan.</param>
    /// <param name="ret">The largest value found, or <c>default(DateTime)</c> when the column holds no elements.</param>
    public void Max(PrimitiveColumnContainer<DateTime> column, out DateTime ret)
    {
        ret = default;
        // Seed from the first element visited so an empty column yields default
        // instead of throwing on Buffers[0][0].
        bool seeded = false;
        for (int b = 0; b < column.Buffers.Count; b++)
        {
            var readOnlySpan = column.Buffers[b].ReadOnlySpan;
            for (int i = 0; i < readOnlySpan.Length; i++)
            {
                var val = readOnlySpan[i];

                if (!seeded || val > ret)
                {
                    ret = val;
                    seeded = true;
                }
            }
        }
    }

    /// <summary>
    /// Finds the largest value among the given <paramref name="rows"/> of <paramref name="column"/>.
    /// </summary>
    /// <param name="column">The container to read from.</param>
    /// <param name="rows">Row indices into <paramref name="column"/>.</param>
    /// <param name="ret">The largest selected value, or <c>default(DateTime)</c> when <paramref name="rows"/> is empty.</param>
    public void Max(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows, out DateTime ret)
    {
        ret = default;
        bool seeded = false;
        ReadOnlySpan<DateTime> readOnlySpan = default;
        long maxCapacity = ReadOnlyDataFrameBuffer<DateTime>.MaxCapacity;
        // Empty [minRange, maxRange) window: the first row always loads its buffer.
        long minRange = 0;
        long maxRange = 0;

        // foreach disposes the enumerator, which a bare GetEnumerator() call does not.
        foreach (long rowIndex in rows)
        {
            if (rowIndex < minRange || rowIndex >= maxRange)
            {
                int bufferIndex = (int)(rowIndex / maxCapacity);
                readOnlySpan = column.Buffers[bufferIndex].ReadOnlySpan;
                minRange = checked(bufferIndex * maxCapacity);
                maxRange = checked((bufferIndex + 1) * maxCapacity);
            }

            var val = readOnlySpan[(int)(rowIndex - minRange)];

            if (!seeded || val > ret)
            {
                ret = val;
                seeded = true;
            }
        }
    }

    /// <summary>
    /// Finds the smallest value across all buffers of <paramref name="column"/>.
    /// </summary>
    /// <param name="column">The container to scan.</param>
    /// <param name="ret">The smallest value found, or <c>default(DateTime)</c> when the column holds no elements.</param>
    public void Min(PrimitiveColumnContainer<DateTime> column, out DateTime ret)
    {
        ret = default;
        // Seed from the first element visited so an empty column yields default
        // instead of throwing on Buffers[0][0].
        bool seeded = false;
        for (int b = 0; b < column.Buffers.Count; b++)
        {
            var readOnlySpan = column.Buffers[b].ReadOnlySpan;
            for (int i = 0; i < readOnlySpan.Length; i++)
            {
                var val = readOnlySpan[i];

                if (!seeded || val < ret)
                {
                    ret = val;
                    seeded = true;
                }
            }
        }
    }

    /// <summary>
    /// Finds the smallest value among the given <paramref name="rows"/> of <paramref name="column"/>.
    /// </summary>
    /// <param name="column">The container to read from.</param>
    /// <param name="rows">Row indices into <paramref name="column"/>.</param>
    /// <param name="ret">The smallest selected value, or <c>default(DateTime)</c> when <paramref name="rows"/> is empty.</param>
    public void Min(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows, out DateTime ret)
    {
        ret = default;
        // The running minimum must be seeded from the first selected row: default(DateTime)
        // is DateTime.MinValue, and no value compares less than that, so comparing against
        // it would always yield DateTime.MinValue.
        bool seeded = false;
        ReadOnlySpan<DateTime> readOnlySpan = default;
        long maxCapacity = ReadOnlyDataFrameBuffer<DateTime>.MaxCapacity;
        // Empty [minRange, maxRange) window: the first row always loads its buffer.
        long minRange = 0;
        long maxRange = 0;

        // foreach disposes the enumerator, which a bare GetEnumerator() call does not.
        foreach (long rowIndex in rows)
        {
            if (rowIndex < minRange || rowIndex >= maxRange)
            {
                int bufferIndex = (int)(rowIndex / maxCapacity);
                readOnlySpan = column.Buffers[bufferIndex].ReadOnlySpan;
                minRange = checked(bufferIndex * maxCapacity);
                maxRange = checked((bufferIndex + 1) * maxCapacity);
            }

            var val = readOnlySpan[(int)(rowIndex - minRange)];

            if (!seeded || val < ret)
            {
                ret = val;
                seeded = true;
            }
        }
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Product(PrimitiveColumnContainer<DateTime> column, out DateTime ret)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Product(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows, out DateTime ret)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Sum(PrimitiveColumnContainer<DateTime> column, out DateTime ret)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Sum(PrimitiveColumnContainer<DateTime> column, IEnumerable<long> rows, out DateTime ret)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported for <see cref="DateTime"/>.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Round(PrimitiveColumnContainer<DateTime> column)
    {
        throw new NotSupportedException();
    }
}
}
Expand Up @@ -99,6 +99,11 @@ public static IPrimitiveColumnComputation<T> GetComputation<T>()
{
return (IPrimitiveColumnComputation<T>)new UShortComputation();
}
else if (typeof(T) == typeof(DateTime))
{
return (IPrimitiveColumnComputation<T>)new DateTimeComputation();
}

throw new NotSupportedException();
}
}
Expand Down
Expand Up @@ -55,6 +55,11 @@ namespace Microsoft.Data.Analysis
return (IPrimitiveColumnComputation<T>)new <#=type.ClassPrefix#>Computation();
}
<# } #>
else if (typeof(T) == typeof(DateTime))
{
return (IPrimitiveColumnComputation<T>)new DateTimeComputation();
}

throw new NotSupportedException();
}
}
Expand Down

0 comments on commit 8801e40

Please sign in to comment.