Getting an error running "Create a Spark RDD on top of Elasticsearch (logs-endpoint-winevent-sysmon-* as source)" #39

Closed
jgarrettvml opened this issue Mar 28, 2018 · 5 comments

@jgarrettvml

I've looked everywhere else trying to figure this out, but so much of what I find applies to a standard ELK stack, so I'm wondering if I need to do something specific with HELK to fix this. Please help... thanks in advance.

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource" : "logs-endpoint-winevent-sysmon-*/doc",
        "es.nodes" : "10.0.1.190"
    })
es_rdd.first()

gives the following error:

---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in ()
5 conf={
6 "es.resource" : "logs-endpoint-winevent-sysmon-*/doc",
----> 7 "es.nodes" : "10.0.1.190"
8 })
9 es_rdd.first()

/opt/helk/spark/spark-2.3.0-bin-hadoop2.7/python/pyspark/context.py in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
703 jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
704 valueClass, keyConverter, valueConverter,
--> 705 jconf, batchSize)
706 return RDD(jrdd, self)
707

/opt/helk/spark/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:

/opt/helk/spark/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/opt/helk/spark/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[10.0.1.190:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:380)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:388)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:484)
at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:479)
at org.elasticsearch.hadoop.rest.InitializationUtils.checkIndexStatus(InitializationUtils.java:73)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettingsForReading(InitializationUtils.java:271)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:218)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:405)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:386)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:127)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1337)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.take(RDD.scala:1331)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:282)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

@jgarrettvml jgarrettvml changed the title Getting an issue running "Create a Spark RDD on top of Elasticsearch (logs-endpoint-winevent-sysmon-* as source)" Getting an error running "Create a Spark RDD on top of Elasticsearch (logs-endpoint-winevent-sysmon-* as source)" Mar 28, 2018
@Cyb3rWard0g
Owner

Cyb3rWard0g commented Mar 28, 2018

Hey @jgarrettvml! The error message I see is the following:

: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[10.0.1.190:9200]]

I see it is trying to talk to 10.0.1.190. However, HELK has an internal Docker network where all the services (ELK, Kafka, and Analytics) communicate with each other:

networks:
  helk:
    driver: bridge
    ipam:
      config:
        - subnet: 172.18.0.0/16

and the IP of the ELK stack is 172.18.0.2. That IP is static and is not related to your network; it is hardcoded as the Docker internal IP of your ELK. This is why, in the Jupyter notebooks available in the build, I always use 172.18.0.2 when reading from Elasticsearch. In the notebook I assume you were following based on the issue title, you need to create an RDD by running the following without changing anything; it should work as is:

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ 
        "es.resource" : "logs-endpoint-winevent-sysmon-*/doc",
        "es.nodes" : "172.18.0.2"
    })
es_rdd.first()

Yours is pointing to 10.0.1.190, which I assume is the IP you assigned to your HELK host to make it available on your network. The two notebooks available in the build should work without any changes. Let me know if this helps.
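
As a quick sanity check (a minimal sketch, assuming the requests package is available in the Jupyter/analytics container), you can also confirm the notebook can reach Elasticsearch on the internal address before building the RDD:

import requests

# Hit the Elasticsearch root endpoint on the HELK internal Docker network.
# 172.18.0.2 is the static internal IP mentioned above; adjust only if your
# Docker network uses a different subnet.
resp = requests.get("http://172.18.0.2:9200", timeout=5)
print(resp.status_code)   # expect 200
print(resp.json())        # cluster name / version info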

@jgarrettvml
Author

Wow, thank you for the reply - this fixed the error. Yeah, I was using the local static IP - I should have known there was a bridged network; I assume that's part of what the Docker proxy is doing.

I'm now getting an "Index [logs-endpoint-winevent-sysmon-*/doc] missing and settings [es.index.read.missing.as.empty] is set to false" error,

which I guess is related to Kibana stating that no indices match the pattern "logs-endpoint-winevent-sysmon-*".
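
For reference, I'm assuming the RDD call could be told to treat the missing index as empty instead of raising, by flipping the setting named in the error, something like:

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource" : "logs-endpoint-winevent-sysmon-*/doc",
        "es.nodes" : "172.18.0.2",
        # Assumption: return an empty RDD while the index does not exist yet,
        # useful until Sysmon data actually starts flowing in.
        "es.index.read.missing.as.empty" : "true"
    })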

I plan to use several log shipping methods, including Filebeat, Packetbeat, etc. I'm assuming this can be configured internally using the 172.x address and externally using my mapped domain:8082?

Thanks again, that was a great help...

@Cyb3rWard0g
Owner

Nicee!!! Yeah, if you don't have anything sent to the HELK-Kafka container, then you won't have any data in your Sysmon index. At the moment you can send data with Winlogbeat from a Windows computer to the winlogbeat topic. If you are going to send data from Filebeat or Packetbeat, you will have to create an extra topic and index in the Logstash configs. I am slowly trying to cover more data sources and shippers. Let me know how I can help with your use case; I can help create the topic and make sure the data makes it all the way to your Elasticsearch.

If you are doing internal collection, you still have to point to the internal Kafka instance. If you are sending data to the HELK from your network, you still have to point to the Kafka instances; those use your HELK IP and the ports :9092, :9093, :9094. One example is in the winlogbeat config file available in the repo. Check the Kafka section: https://github.com/Cyb3rWard0g/HELK/blob/master/winlogbeat/winlogbeat.yml
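
If it helps while wiring up a new shipper, here is a minimal sketch (assuming the externally reachable HELK IP is the 10.0.1.190 from your original snippet) to confirm the Kafka broker ports accept connections before you point a beat at them:

import socket

# Check that the Kafka listener ports on the HELK host accept TCP connections.
# 10.0.1.190 is assumed to be the externally reachable HELK IP from this issue.
for port in (9092, 9093, 9094):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(3)
    try:
        s.connect(("10.0.1.190", port))
        print("port %d open" % port)
    except socket.error as e:
        print("port %d unreachable: %s" % (port, e))
    finally:
        s.close()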

@Cyb3rWard0g
Owner

The HELK:8082 port is for Cerebro to use and access your ES server. That's different. Besides, initial ingestion of data does not happen directly at the Elasticsearch level; it happens at the Kafka level. Kafka brokers receive data from producers (Winlogbeat), Logstash transforms the data, and then it is pushed to Elasticsearch.
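
Once a shipper is sending data through Kafka and Logstash, a quick way to confirm documents actually landed in Elasticsearch (again a sketch, assuming the internal 172.18.0.2 address and the requests package) is to list the matching indices:

import requests

# List indices matching the Sysmon pattern; an empty response means Logstash
# has not written anything to Elasticsearch yet.
resp = requests.get(
    "http://172.18.0.2:9200/_cat/indices/logs-endpoint-winevent-sysmon-*",
    params={"v": "true"}, timeout=5)
print(resp.text)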

@Cyb3rWard0g
Owner

Closing this issue since there is nothing to fix in relation to the title. If you have any other questions feel free to reach out by opening another issue. Happy to help! 👍 Thank you!
