[Bug] spark doris connector read table error: Doris FE's response cannot map to schema. #39

Closed
2 of 3 tasks
myfjdthink opened this issue Jun 23, 2022 · 1 comment

myfjdthink commented Jun 23, 2022

Search before asking

  • I had searched in the issues and found no similar issues.

Version

  • connector : org.apache.doris:spark-doris-connector-3.1_2.12:1.0.1
  • doris: 1.1 preview2
  • spark: 3.1.2

What's Wrong?

Reading a table with the following code fails:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('Spark Doris Demo Nick') \
 .config('spark.jars.packages', 'org.apache.doris:spark-doris-connector-3.1_2.12:1.0.1') \
 .getOrCreate()
spark

dorisSparkDF = spark.read.format("doris")\
    .option("doris.table.identifier", "db.token_info")\
    .option("doris.fenodes", "xxx:8031")\
    .option("user", "xxx")\
    .option("password", "xxx").load()
dorisSparkDF.show(5)

Then I get an error:

22/06/23 07:47:03 ERROR SchemaUtils: Doris FE's response cannot map to schema. res: {"keysType":"UNIQUE_KEYS","properties":[{"name":"chain","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_slug","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_address","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_symbol","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"decimals","aggregation_type":"REPLACE","comment":"","type":"INT"},{"name":"type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"protocol_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"manual_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"erc20_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"coin_gecko_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"logo","aggregation_type":"REPLACE","comment":"","type":"STRING"}],"status":200}
org.codehaus.jackson.map.exc.UnrecognizedPropertyException: Unrecognized field "keysType" (Class org.apache.doris.spark.rest.models.Schema), not marked as ignorable
 at [Source: java.io.StringReader@74af102e; line: 1, column: 14] (through reference chain: org.apache.doris.spark.rest.models.Schema["keysType"])
	at org.codehaus.jackson.map.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:53)
	at org.codehaus.jackson.map.deser.StdDeserializationContext.unknownFieldException(StdDeserializationContext.java:267)
	at org.codehaus.jackson.map.deser.std.StdDeserializer.reportUnknownProperty(StdDeserializer.java:673)
	at org.codehaus.jackson.map.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:659)
	at org.codehaus.jackson.map.deser.BeanDeserializer.handleUnknownProperty(BeanDeserializer.java:1365)
	at org.codehaus.jackson.map.deser.BeanDeserializer._handleUnknown(BeanDeserializer.java:725)
	at org.codehaus.jackson.map.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:703)
	at org.codehaus.jackson.map.deser.BeanDeserializer.deserialize(BeanDeserializer.java:580)
	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732)
	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1863)
	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:295)
	at org.apache.doris.spark.rest.RestService.getSchema(RestService.java:279)
	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchemaFromFe(SchemaUtils.scala:51)
	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchema(SchemaUtils.scala:41)
	at org.apache.doris.spark.sql.DorisRelation.lazySchema$lzycompute(DorisRelation.scala:48)
	at org.apache.doris.spark.sql.DorisRelation.lazySchema(DorisRelation.scala:48)
	at org.apache.doris.spark.sql.DorisRelation.schema(DorisRelation.scala:52)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:449)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 dorisSparkDF = spark.read.format("doris")\
      2     .option("doris.table.identifier", "xxx.token_info")\
      3     .option("doris.fenodes", "xxxx:8031")\
      4     .option("user", "xxxx")\
      5     .option("password", "xxxxx").load()
      6 dorisSparkDF.show(5)

File /usr/lib/spark/python/pyspark/sql/readwriter.py:210, in DataFrameReader.load(self, path, format, schema, **options)
    208     return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    209 else:
--> 210     return self._df(self._jreader.load())

File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o72.load.
: org.apache.doris.spark.exception.DorisException: Doris FE's response cannot map to schema. res: {"keysType":"UNIQUE_KEYS","properties":[{"name":"chain","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_slug","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_address","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_symbol","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"decimals","aggregation_type":"REPLACE","comment":"","type":"INT"},{"name":"type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"protocol_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"manual_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"erc20_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"coin_gecko_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"logo","aggregation_type":"REPLACE","comment":"","type":"STRING"}],"status":200}
	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:303)
	at org.apache.doris.spark.rest.RestService.getSchema(RestService.java:279)
	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchemaFromFe(SchemaUtils.scala:51)
	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchema(SchemaUtils.scala:41)
	at org.apache.doris.spark.sql.DorisRelation.lazySchema$lzycompute(DorisRelation.scala:48)
	at org.apache.doris.spark.sql.DorisRelation.lazySchema(DorisRelation.scala:48)
	at org.apache.doris.spark.sql.DorisRelation.schema(DorisRelation.scala:52)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:449)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.codehaus.jackson.map.exc.UnrecognizedPropertyException: Unrecognized field "keysType" (Class org.apache.doris.spark.rest.models.Schema), not marked as ignorable
 at [Source: java.io.StringReader@74af102e; line: 1, column: 14] (through reference chain: org.apache.doris.spark.rest.models.Schema["keysType"])
	at org.codehaus.jackson.map.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:53)
	at org.codehaus.jackson.map.deser.StdDeserializationContext.unknownFieldException(StdDeserializationContext.java:267)
	at org.codehaus.jackson.map.deser.std.StdDeserializer.reportUnknownProperty(StdDeserializer.java:673)
	at org.codehaus.jackson.map.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:659)
	at org.codehaus.jackson.map.deser.BeanDeserializer.handleUnknownProperty(BeanDeserializer.java:1365)
	at org.codehaus.jackson.map.deser.BeanDeserializer._handleUnknown(BeanDeserializer.java:725)
	at org.codehaus.jackson.map.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:703)
	at org.codehaus.jackson.map.deser.BeanDeserializer.deserialize(BeanDeserializer.java:580)
	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732)
	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1863)
	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:295)
	... 23 more
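
The trace suggests the FE response carries a top-level "keysType" field that the connector's Schema model in 1.0.1 does not declare, so strict deserialization rejects the whole payload. A minimal Python sketch of that failure mode follows. The `Schema` dataclass, `parse_strict`, and `parse_lenient` are hypothetical stand-ins written for illustration; they are not the connector's actual code, and the lenient variant is only one possible way to tolerate the extra field, not necessarily how later versions handle it.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class Schema:
    # Hypothetical stand-in for the connector-side model: it only knows
    # "status" and "properties", not "keysType".
    status: int
    properties: list


def parse_strict(raw: str) -> Schema:
    """Reject any top-level field the model does not declare."""
    data = json.loads(raw)
    known = {f.name for f in fields(Schema)}
    unknown = sorted(set(data) - known)
    if unknown:
        raise ValueError(f"Unrecognized field(s): {unknown}")
    return Schema(**data)


def parse_lenient(raw: str) -> Schema:
    """Silently drop top-level fields the model does not declare."""
    data = json.loads(raw)
    known = {f.name for f in fields(Schema)}
    return Schema(**{k: v for k, v in data.items() if k in known})


# Shortened version of the FE response from the log above (one column only).
response = (
    '{"keysType":"UNIQUE_KEYS",'
    '"properties":[{"name":"chain","aggregation_type":"","comment":"","type":"STRING"}],'
    '"status":200}'
)

try:
    parse_strict(response)
except ValueError as err:
    print("strict parse failed:", err)

schema = parse_lenient(response)
print("lenient parse ok:", schema.status, [c["name"] for c in schema.properties])
```

The strict parser fails on "keysType" in the same way the Jackson mapper does in the trace, while the lenient one reads the column list normally.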

What You Expected?

The table should be read without errors.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@DarvenDuan

Version 1.1.0 fixed this bug.
