
Array/Map return types for UDF do not work correctly. #112

Closed
guruvonline opened this issue May 17, 2019 · 8 comments · Fixed by #114
Assignees
Labels
bug Something isn't working

Comments


guruvonline commented May 17, 2019

I have a scenario where, for structured streaming input, I have to write a custom function for each event/row that can return multiple rows.

It looks like UDFs only support basic types as return types, not list/array.

Any workaround for this?

For example, my UDF is something like below, so that I can explode the result to create multiple rows.
Func<Column, Column> ToUpperList = Udf<string, string[]>((arg) =>
{
    var ret = new string[] { arg, arg.ToUpper() };
    return ret;
});

var query = inStream
    .Select(Explode(ToUpperList(Col("InputEventColumn"))))
    .WriteStream()
    .OutputMode("append")
    .Format("console")
    .Start();
@imback82
Contributor

Looks like the array return type has a bug. I will fix it asap. I have a local fix working as follows:

var udf = Udf<string, string[]>((str) => new[] { str, str + str });
df.Select(Explode(udf(df["name"]))).Show();

The original table:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

After exploding:

+--------------+
|           col|
+--------------+
|       Michael|
|MichaelMichael|
|          Andy|
|      AndyAndy|
|        Justin|
|  JustinJustin|
+--------------+

@imback82 imback82 self-assigned this May 17, 2019
@imback82 imback82 added the bug Something isn't working label May 17, 2019
@imback82 imback82 changed the title UDF to return multiple rows Array/Map return types for UDF do not work correctly. May 17, 2019
@guruvonline
Author

A few more related questions:

  • Can I have custom classes as the return type?

  • Can the return type be IEnumerable?

@rapoth
Contributor

rapoth commented May 18, 2019

  • Can I have custom classes as the return type?

Not at the moment. However, we'd like to understand the use case. Can you explain the scenario where you want this? (A sample scenario with some snippets would be best.)

  • Can the return type be IEnumerable?

From what I understand, you want to iterate over the result set? If so, have you considered using ToLocalIterator, which returns an IEnumerable?

@imback82
Contributor

  • Can the return type be IEnumerable?

From what I understand, you want to iterate over the result set? If so, have you considered using ToLocalIterator, which returns an IEnumerable?

I think @guruvonline meant to have IEnumerable as a return type of UDF. Yes, this will be supported:

var udf = Udf<string, IEnumerable<string>>((str) => new[] { str, str + str });

@guruvonline
Author

  • Can I have custom classes as the return type?

Not at the moment. However, we'd like to understand the use case. Can you explain the scenario where you want this? (A sample scenario with some snippets would be best.)

I have added a new feature request with an example scenario.

@danny8002
Contributor

I also get this error, and the workaround doesn't work.

SparkSession spark = SparkSession
    .Builder()
    .AppName("RunExe")
    .GetOrCreate();

spark.Udf().Register<string, string[]>("udf1", s => new string[] { s, s + s });
spark.Udf().Register<string[], string>("udf2", g => g[0]);

DataFrame dt = xxxx;

dt.Select(CallUDF("udf1", dt.Col("value")));               // doesn't work
// or
dt.Select(Explode(CallUDF("udf1", dt.Col("value"))));      // doesn't work

I always get the following error stack:

[JvmBridge] java.lang.IllegalArgumentException: Failed to convert the JSON string 'array<string>' to a data type.
        at org.apache.spark.sql.types.DataType$$anonfun$nameToType$1.apply(DataType.scala:129)
        at org.apache.spark.sql.types.DataType$$anonfun$nameToType$1.apply(DataType.scala:129)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at org.apache.spark.sql.types.DataType$.nameToType(DataType.scala:127)
        at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:144)
        at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:113)
        at org.apache.spark.sql.types.DataType.fromJson(DataType.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.api.dotnet.DotnetBackendHandler.handleMethodCall(DotnetBackendHandler.scala:162)
        at org.apache.spark.api.dotnet.DotnetBackendHandler.handleBackendRequest(DotnetBackendHandler.scala:102

Just looking at the source code of DataType.scala (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala): I am totally new to Scala, and I can't understand why "array<string>" goes down the wrong match case.


def fromJson(json: String): DataType = parseDataType(parse(json))  // json = "array<string>"

  private[sql] def parseDataType(json: JValue): DataType = json match {
    // it falls into this case
    case JString(name) =>
      nameToType(name)

    // supposed to go here?
    case JSortedObject(
    ("containsNull", JBool(n)),
    ("elementType", t: JValue),
    ("type", JString("array"))) =>
      ArrayType(parseDataType(t), n)

Any workaround for me? If possible, I can modify Microsoft.Spark locally to make it work.
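To make the mismatch concrete, here is a small illustrative sketch (in Python, not Microsoft.Spark or Spark code) of the two JSON shapes involved. Based on the match cases quoted above, Spark's DataType.fromJson resolves a bare JSON string via nameToType, which only knows atomic type names such as "string"; a complex type like an array has to arrive as a JSON object to hit the JSortedObject case. The error suggests the catalog-style string "array<string>" was sent where the object form was expected.

```python
import json

# Bare JSON string: parses into the JString case, then nameToType("array<string>")
# fails because only atomic type names are registered there.
sent = json.dumps("array<string>")

# Object form: matches the JSortedObject case and yields ArrayType(StringType, true).
expected = json.dumps({
    "type": "array",
    "elementType": "string",
    "containsNull": True,
})

print(sent)      # "array<string>"
print(expected)  # {"type": "array", "elementType": "string", "containsNull": true}
```

This is only a sketch of the payload shapes; the actual fix lands on the C# side, where the serialized return-type string is produced.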

@danny8002
Contributor

Finally, I understand the Scala code, and I have fixed it locally in Microsoft.Spark. Will send a PR later.

@imback82
Contributor

@danny8002 there is already a PR for this: #114.
