VectorUdf with DataFrame + Arrow #277

Merged on Mar 6, 2020 (39 commits)

pgovind (Contributor) commented Oct 1, 2019

Minor changes to the UDF API to pass in and return corefxlab DataFrames
Accompanying unit test changes

Putting it up here to get initial thoughts. FxDataFrame's Arrow support means true zero-copy exchange of data.

@eerhardt @rapoth @imback82. Not able to add reviewers for some reason :/

Can be reviewed now!

@@ -601,10 +594,10 @@ Int64Array ConvertInt64s(Int64Array int64s)
Assert.Equal($"udf: {i}", stringArray.GetString(i));
}

- var longArray = (Int64Array)outputBatch.Column(1);
+ var doubleArray = (DoubleArray)outputBatch.Column(1);
Contributor Author

The long -> double change comes from line 523 above: dataFrame.Column(1) + 100 returns a double column.

eerhardt (Member) commented Oct 2, 2019

That seems wrong. PrimitiveColumn<long> + int should return a PrimitiveColumn<long>. Just like how:

long x = 0;
var y = x + 5;

y.GetType()

returns System.Int64.
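
For reference, the C# behavior being cited can be checked with a few lines. This is a minimal sketch using only built-in types; the class and method names (`PromotionDemo`, `ResultTypes`) are made up for illustration and are not part of the PR.

```csharp
using System;

public static class PromotionDemo
{
    // Returns the runtime type names produced by a few mixed-type additions.
    public static string[] ResultTypes()
    {
        long x = 0;
        var longPlusInt = x + 5;       // the int operand is widened to long
        var longPlusFloat = x + 1.5f;  // the long operand is converted to float
        var longPlusDouble = x + 1.5;  // the long operand is converted to double

        return new[]
        {
            longPlusInt.GetType().FullName,    // "System.Int64"
            longPlusFloat.GetType().FullName,  // "System.Single"
            longPlusDouble.GetType().FullName, // "System.Double"
        };
    }
}
```

Only the mixed integer/floating-point cases change the result to a floating-point type; long + int stays long.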



Contributor Author

Would you consider this something we HAVE to change? In C#, long + float results in a float because the compiler knows how to insert the conversions. The DataFrame doesn't attempt to convert because each column's Memory has to be cloned in the conversion, which seems wasteful for a chain of operations. Instead, it converts once to a PrimitiveColumn<double> (or PrimitiveColumn<decimal>) when there is a type mismatch, so all subsequent operations can work without cloning.

The other alternative I considered was converting only when the underlying Memory has to change; PrimitiveColumn<long> + int, for example, would involve no conversion. However, I didn't think we should duplicate all of the compiler's conversion logic, so I defaulted to what we have now. Do you know of an easier or better way to do this?
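
To make the "convert only when the underlying Memory has to change" alternative concrete, here is a rough sketch of a result-type lookup. It is an illustration only, not the DataFrame implementation: `ResultTypeSketch`, `Promote`, and `NeedsConversion` are hypothetical names, and only int, long, float, and double are covered (decimal and the unsigned types have extra rules in C# and are left out on purpose).

```csharp
using System;

public static class ResultTypeSketch
{
    // Widening order for the four types covered by this sketch.
    private static readonly Type[] s_rank =
    {
        typeof(int), typeof(long), typeof(float), typeof(double),
    };

    // Returns the wider of the two element types, mirroring what C# does
    // for these four types in a binary arithmetic expression.
    public static Type Promote(Type left, Type right)
    {
        int l = Array.IndexOf(s_rank, left);
        int r = Array.IndexOf(s_rank, right);
        if (l < 0 || r < 0)
        {
            throw new NotSupportedException($"Unsupported pair: {left} and {right}");
        }

        return s_rank[Math.Max(l, r)];
    }

    // A column only needs converting (and therefore cloning) when its
    // element type differs from the promoted result type.
    public static bool NeedsConversion(Type columnType, Type otherType) =>
        Promote(columnType, otherType) != columnType;
}
```

Under these assumptions, `NeedsConversion(typeof(long), typeof(int))` is false, which is the "PrimitiveColumn<long> + int would involve no conversion" case, while `NeedsConversion(typeof(long), typeof(double))` is true.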

Member

I find it very surprising that adding, subtracting, or multiplying two integers would result in a floating-point number. I think that should be changed.

But I'm not every customer, so getting more thoughts and opinions here would be a good idea. Still, matching normal C# behavior seems like a reasonable default.

eerhardt (Member) commented Oct 2, 2019

Looks like a test is failing:

[xUnit.net 00:00:35.73]     Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf [FAIL]
  X Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf [1s 347ms]
  Error Message:
   Assert.Equal() Failure
Expected: 3
Actual:   5

@@ -8,6 +8,9 @@
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using StructType = Microsoft.Spark.Sql.Types.StructType;
using FxDataFrame = Microsoft.Data.DataFrame;
Member

@imback82 @rapoth - do either of you have any thoughts/opinions on how to make this better? Using 2 types named DataFrame in Spark seems unfortunate. Is there a better term we can call the corefx DataFrame class?

pgovind changed the title from "VectorUdf with DataFrame + Arrow" to "WIP: VectorUdf with DataFrame + Arrow" on Oct 8, 2019
pgovind changed the title from "WIP: VectorUdf with DataFrame + Arrow" to "VectorUdf with DataFrame + Arrow" on Oct 9, 2019
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using StructType = Microsoft.Spark.Sql.Types.StructType;
using FxDataFrame = Microsoft.Data.Analysis.DataFrame;
pgovind (Contributor Author) commented Nov 11, 2019

@eerhardt question from before:

@imback82 @rapoth - do either of you have any thoughts/opinions on how to make this better? Using 2 types named DataFrame in Spark seems unfortunate. Is there a better term we can call the corefx DataFrame class?

pgovind (Contributor Author) commented Nov 11, 2019

We should think about this sooner rather than later. For example, at the last .NET Town Hall, Spark Notebooks was demoed, and Dan asked whether the DataFrame he saw in the sample was a Spark DataFrame or the corefx DataFrame.

pgovind (Contributor Author) commented Nov 11, 2019

Updated to address comments and use the Microsoft.Data.Analysis package on NuGet.

}

- return builder.Build();
+ return (PrimitiveDataFrameColumn<double>)(price * (1 - discount) * (1 + tax));
Member

Do we not have operator overloads for PrimitiveDataFrameColumn<T> that return the same type? I think that would make this code simpler so the user doesn't have to cast.

Contributor Author

No, unfortunately. I'll file a bug.
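
The difference being discussed can be illustrated with toy types. This is only a sketch of the language mechanics: `ToyColumn` and `ToyDoubleColumn` are hypothetical stand-ins, not the Microsoft.Data.Analysis classes, and the sketch says nothing about how that library declares its operators.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for an untyped base column.
public abstract class ToyColumn
{
    // Base-class operator: the result is statically a ToyColumn,
    // so callers must cast to get back to a typed column.
    public static ToyColumn operator *(ToyColumn left, ToyColumn right) =>
        left.Multiply(right);

    protected abstract ToyColumn Multiply(ToyColumn other);
}

// Hypothetical stand-in for a typed column of doubles.
public sealed class ToyDoubleColumn : ToyColumn
{
    public double[] Values { get; }

    public ToyDoubleColumn(params double[] values) => Values = values;

    // Typed overload: the result is statically a ToyDoubleColumn,
    // so no cast is needed at the call site.
    public static ToyDoubleColumn operator *(ToyDoubleColumn left, ToyDoubleColumn right) =>
        new ToyDoubleColumn(left.Values.Zip(right.Values, (a, b) => a * b).ToArray());

    // Assumes the other column is also a ToyDoubleColumn; the cast throws otherwise.
    protected override ToyColumn Multiply(ToyColumn other) =>
        this * (ToyDoubleColumn)other;
}
```

With the typed overload in place, `ToyDoubleColumn result = price * factor;` compiles without a cast when both operands are declared as `ToyDoubleColumn`. If either operand is only known as `ToyColumn`, overload resolution falls back to the base operator and the caller has to cast the result, which is the situation the diff above runs into.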

Func<Column, Column> udf2 = ExperimentalDataFrameFunctions.VectorUdf<ArrowStringDataFrameColumn, ArrowStringDataFrameColumn>(
(strings) =>
{
StringArray stringArray = (StringArray)ToArrowArray(
Contributor

So do we need to convert to an Arrow array first and then back to a DataFrame column? Is there no way to go directly from ArrowStringDataFrameColumn to ArrowStringDataFrameColumn? cc: @eerhardt

Member

I thought we had an Apply function that could be used here.
One of the biggest issues is going from the UTF-8 Arrow string format to the UTF-16 .NET string format, and then back to the UTF-8 Arrow string format.
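
The round trip mentioned above looks roughly like this for a single value. This is a sketch using only `System.Text.Encoding`, with a hypothetical helper name (`Utf8RoundTrip.Transform`); it does not use the Arrow or DataFrame APIs.

```csharp
using System;
using System.Text;

public static class Utf8RoundTrip
{
    // Applies a .NET string transform to UTF-8 bytes by decoding,
    // transforming, and re-encoding. Each step allocates a new object.
    public static byte[] Transform(byte[] utf8Input, Func<string, string> transform)
    {
        string decoded = Encoding.UTF8.GetString(utf8Input); // UTF-8 -> UTF-16 string
        string result = transform(decoded);                  // user logic on a .NET string
        return Encoding.UTF8.GetBytes(result);               // UTF-16 -> UTF-8 bytes
    }
}
```

For a column of strings, this decode and re-encode happens per value, which is why staying in the UTF-8 representation end to end would be preferable where the transform allows it.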



imback82 (Contributor) commented Mar 4, 2020

Yeah, it would be really nice if the user didn't have to go through the Arrow APIs and could stay entirely within the FxDataFrame APIs.

Contributor Author

We don't have it in 0.2.0 yet. However, we should release a 0.3.0 soon which will have this API. How about we accept this change for now and I put up a new PR once 0.3.0 is out?

Member

That sounds like a good plan to me.

Contributor

Sounds good to me as well.

@@ -100,6 +108,16 @@ public static IArrowType GetArrowType<T>()
throw new NotSupportedException($"Unknown type: {typeof(T)}");
}

public static ArrowStringDataFrameColumn ToArrowStringDataFrameColumn(StringArray array)
{
return new ArrowStringDataFrameColumn("String",
Contributor

Can we set the name to null? It's a bit confusing to assign a name to a column created from an unnamed StringArray.

Contributor Author

Actually, we can't. Arrow doesn't allow empty or null values for the column name. We run into this when we convert back from a DataFrame to Arrow record batches.

Contributor

I see. So when we get ArrowStringDataFrameColumn in the UDF, is it already named? If so, the name is from the Arrow sent from JVM, right?

pgovind (Contributor Author) commented Mar 4, 2020

Addressed comments and did a search/replace for styling changes (I think I got all of them).

imback82 (Contributor) left a comment

I have some minor comments, but otherwise, LGTM. Thanks @pgovind!

DataFrameColumn ret;
if (typeof(T) == typeof(PrimitiveDataFrameColumn<bool>))
{
ret = new PrimitiveDataFrameColumn<bool>("Empty");
Contributor

Right, it doesn't change the behavior. I prefer returning here because then I don't have to scroll all the way down to see whether we do anything more with ret (and we can get rid of ret as well).

imback82 (Contributor) commented Mar 5, 2020

Addressed comments and did a search/replace for styling changes (I think I got all of them).

I was probably reviewing while you were making changes. If you already fixed them, just resolve my comments. Thanks!

pgovind (Contributor Author) commented Mar 6, 2020

I see. So when we get ArrowStringDataFrameColumn in the UDF, is it already named? If so, the name is from the Arrow sent from JVM, right?

Yup, it is already named with the Arrow data from the JVM. The Arrow schema defines a Field for a StringColumn that provides a Name.

Prashanth Govindarajan added 2 commits March 5, 2020 22:24
imback82 (Contributor) left a comment

one nit comment

imback82 (Contributor) left a comment

LGTM. Thanks @pgovind!

pgovind (Contributor Author) commented Mar 6, 2020

Thanks for the review @imback82. I know it was a time-consuming PR.

Labels: enhancement (New feature or request)
5 participants