Unable to load the data from ELK using Databricks #2102

Closed

Sasik11941103 opened this issue Jun 19, 2023 · 8 comments

Sasik11941103 commented Jun 19, 2023

from pyspark.sql.functions import from_json, col

df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "server")
    .option("es.port", "port")
    .option("es.net.http.auth.user", user)
    .option("es.net.http.auth.pass", password)
    .option("es.nodes.wan.only", "true")
    .option("es.net.ssl", "true")
    .option("es.read.field.include", "field name")
    .load("INDEX NAME")
)

display(df)

The index is a remote index: :logs-

I'm getting this error:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot find mapping for :logs- - one is required before using Spark SQL

Py4JJavaError Traceback (most recent call last)
in <cell line: 3>()
1 from pyspark.sql.functions import from_json,col
2
----> 3 df = (spark.read
4 .format( "org.elasticsearch.spark.sql" )
5 .option( "es.nodes", "digitalelk.az.3pc.att.com")

/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
175 self.options(**options)
176 if isinstance(path, str):
--> 177 return self._df(self._jreader.load(path))
178 elif path is not None:
179 if type(path) != list:

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
194 def deco(*a: Any, **kw: Any) -> Any:
195 try:
--> 196 return f(*a, **kw)
197 except Py4JJavaError as e:
198 converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o589.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot find mapping for :logs- - one is required before using Spark SQL
at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAndGeoFields(SchemaUtils.scala:107)
at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:93)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:238)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:238)
at org.elasticsearch.spark.sql.ElasticsearchRelation.$anonfun$schema$1(DefaultSource.scala:242)
at scala.Option.getOrElse(Option.scala:189)
at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:242)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:498)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:375)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:331)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:331)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:237)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)

Please look into this.

jbaiera (Member) commented Jun 21, 2023

Hi there, thanks for your interest in ES-Hadoop!

The exception message here essentially provides the answer you're looking for:

Cannot find mapping for :logs- - one is required before using Spark SQL

You are trying to load a dataset into Spark from an index that the client cannot confirm exists. Please create the index before trying to read it, or, if it already exists, make sure that the user account used for authentication is allowed to access the data.
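
One way to verify this outside of Spark is to request the index mapping directly, since that is what ES-Hadoop needs before it can build a schema. A minimal sketch, assuming the requests library and placeholder host and credentials:

import requests

# Placeholder endpoint and credentials; substitute the values passed to
# es.nodes and es.net.http.auth.* in the Spark reader above.
host = "https://your-es-host:9243"
auth = ("user", "password")

# ES-Hadoop needs this mapping to exist before Spark SQL can read the index.
resp = requests.get(f"{host}/:logs-/_mapping", auth=auth)
print(resp.status_code)  # a 404 or a security error here explains the Spark failure
print(resp.json())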

jbaiera closed this as completed Jun 21, 2023
Sasik11941103 (Author) commented:

Hi jbaiera, thanks for the response. The index exists, and the user was granted all of the required privileges. When we load a local index we can read the data, but when we try to load data from the remote index, :logs-, we cannot. The remote index spans two local indices, one for the east region and one for the west region, and the logs live under those local indices.
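
For context, a cross-cluster search reference normally takes the form cluster_alias:index_pattern, so a hypothetical version of the load call would look like the sketch below. The "east" and "west" aliases are invented, and, as the maintainer explains further down, ES-Hadoop still cannot read these:

# Hypothetical cross-cluster references; "east" and "west" stand in for the
# remote cluster aliases configured on the coordinating cluster.
df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "server")  # same connection options as above
    .load("east:logs-*,west:logs-*")
)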

jbaiera (Member) commented Jun 22, 2023

I'm a bit confused here: are you talking about using the index via cross-cluster search?

Sasik11941103 (Author) commented:

Yes

Sasik11941103 (Author) commented:

I'm trying to load data from ELK via cross-cluster search with an index pattern, but I'm unable to load the data and get the error above. Please help me with the changes I need to make to pull the logs.

jbaiera (Member) commented Jun 28, 2023

Unfortunately, after taking a look through ES-Hadoop, we do not currently support reading from remote indices via cross-cluster search. This is because ES-Hadoop needs access to each index's mapping data in order to correctly deserialize it in Hadoop/Spark. Remote indices do not support retrieval of their mappings; only basic search operations are provided.

One way we could handle this in ES-Hadoop would be to investigate using the field capabilities API to obtain the mapping data, but this would require changes to the library itself; it's not really a workaround for the problem. Unfortunately, there's no workaround that I'm aware of other than connecting to the remote cluster via ES-Hadoop directly.
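
To make the distinction concrete, here is a minimal sketch (using the requests library, with an invented cluster alias and placeholder host and credentials) of the two APIs: the field capabilities endpoint resolves across cluster boundaries, while the mapping endpoint that ES-Hadoop relies on only covers local indices.

import requests

host = "https://your-es-host:9243"  # placeholder coordinating-cluster endpoint
auth = ("user", "password")         # placeholder credentials

# The field capabilities API does resolve remote index patterns ("east" is
# an invented cluster alias), returning per-field type information.
caps = requests.get(f"{host}/east:logs-*/_field_caps",
                    params={"fields": "*"}, auth=auth)
print(caps.json())

# The mapping API that ES-Hadoop calls does not resolve remote patterns,
# which is why Spark SQL cannot discover a schema for them.
mapping = requests.get(f"{host}/east:logs-*/_mapping", auth=auth)
print(mapping.status_code, mapping.json())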

I'll file a new issue for CCS support and link it here, sorry for the inconvenience!
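
A concrete sketch of the workaround mentioned above, with invented hostnames and port: connect to each remote cluster directly, read its indices as local ones (where the mapping is available), and combine the results in Spark.

def read_logs(nodes):
    # Read logs-* as a local index on the given cluster, so ES-Hadoop can
    # fetch the mapping it needs. Hostnames and port are placeholders.
    return (
        spark.read.format("org.elasticsearch.spark.sql")
        .option("es.nodes", nodes)
        .option("es.port", "9243")
        .option("es.net.http.auth.user", user)
        .option("es.net.http.auth.pass", password)
        .option("es.nodes.wan.only", "true")
        .option("es.net.ssl", "true")
        .load("logs-*")
    )

df = read_logs("es-east.example.com").unionByName(
    read_logs("es-west.example.com"), allowMissingColumns=True
)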

Sasik11941103 (Author) commented:

Thanks for your response. Could you please create the ticket and link it here?

jbaiera (Member) commented Jun 29, 2023

I have opened #2107 for this.
