From f045a714c0b0cc04095cdb593c4e6621cf8b2ab0 Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Thu, 7 Apr 2016 23:06:49 +0200 Subject: [PATCH 1/6] New versions for the stack elements --- libbeat/docs/index.asciidoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/docs/index.asciidoc b/libbeat/docs/index.asciidoc index 5f2f1be4a87..bfa0b92d29e 100644 --- a/libbeat/docs/index.asciidoc +++ b/libbeat/docs/index.asciidoc @@ -4,10 +4,10 @@ :topbeat: http://www.elastic.co/guide/en/beats/topbeat/1.2 :filebeat: http://www.elastic.co/guide/en/beats/filebeat/1.2 :winlogbeat: http://www.elastic.co/guide/en/beats/winlogbeat/1.2 -:ES-version: 2.3.0 -:LS-version: 2.3.0 +:ES-version: 2.3.1 +:LS-version: 2.3.1 :Kibana-version: 4.5.0 -:Dashboards-version: 1.2.0 +:Dashboards-version: 1.2.1 include::./overview.asciidoc[] From c8e4de60ddbe55d58fcbd26fc6934021e9b901d7 Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Thu, 7 Apr 2016 15:08:04 -0700 Subject: [PATCH 2/6] Remove link to github repo for dashboards --- packetbeat/docs/gettingstarted.asciidoc | 15 ++++++--------- topbeat/docs/gettingstarted.asciidoc | 14 ++++++-------- winlogbeat/docs/getting-started.asciidoc | 14 ++++++-------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/packetbeat/docs/gettingstarted.asciidoc b/packetbeat/docs/gettingstarted.asciidoc index 8d62d8f99dc..6fddc6a3342 100644 --- a/packetbeat/docs/gettingstarted.asciidoc +++ b/packetbeat/docs/gettingstarted.asciidoc @@ -250,16 +250,13 @@ instance. The command should return data about the HTTP transaction you just cre === Step 5: Loading Sample Kibana Dashboards To make it easier for you to get application performance insights -from packet data, we have created a few sample dashboards. The -dashboards are maintained in this -https://github.com/elastic/beats-dashboards[GitHub repository], which also -includes instructions for loading the dashboards. - -For more information about loading and viewing the dashboards, see {libbeat}/visualizing-data.html[Visualizing Your Data in Kibana]. +from packet data, we have created a sample Packetbeat dashboard. This dashboard is provided as +an example. We recommend that you +http://www.elastic.co/guide/en/kibana/current/dashboard.html[customize] it +to meet your needs. +For more information about loading and viewing the Beats dashboards, see +{libbeat}/visualizing-data.html[Visualizing Your Data in Kibana]. image:./images/packetbeat-statistics.png[Packetbeat statistics] -These dashboards are provided as examples. We recommend that you -http://www.elastic.co/guide/en/kibana/current/dashboard.html[customize] them -to meet your needs. diff --git a/topbeat/docs/gettingstarted.asciidoc b/topbeat/docs/gettingstarted.asciidoc index 18c4c000662..ad20a7567e3 100644 --- a/topbeat/docs/gettingstarted.asciidoc +++ b/topbeat/docs/gettingstarted.asciidoc @@ -191,18 +191,16 @@ On Windows, if you don't have cURL installed, simply point your browser to the U === Step 5: Loading Sample Kibana Dashboards To make it easier for you to start monitoring your servers in Kibana, -we have created a few sample dashboards. The dashboards are maintained in this -https://github.com/elastic/beats-dashboards[GitHub repository], which also -includes instructions for loading the dashboards. +we have created a sample Topbeat dashboard. This dashboard is provided as +an example. We recommend that you +http://www.elastic.co/guide/en/kibana/current/dashboard.html[customize] it +to meet your needs. -For more information about loading and viewing the dashboards, see {libbeat}/visualizing-data.html[Visualizing Your Data in Kibana]. +For more information about loading and viewing the Beats dashboards, see +{libbeat}/visualizing-data.html[Visualizing Your Data in Kibana]. image:./images/topbeat-dashboard.png[Topbeat statistics] -These dashboards are provided as examples. We recommend that you -http://www.elastic.co/guide/en/kibana/current/dashboard.html[customize] them -to meet your needs. - ==== Example of a System-Wide Overview You can configure the `Dashboard` page to show the statistics for all servers or for a diff --git a/winlogbeat/docs/getting-started.asciidoc b/winlogbeat/docs/getting-started.asciidoc index b8ccd4f72c4..7fe64da11b3 100644 --- a/winlogbeat/docs/getting-started.asciidoc +++ b/winlogbeat/docs/getting-started.asciidoc @@ -155,14 +155,12 @@ PS C:\Program Files\Winlogbeat> Stop-Service winlogbeat === Step 6: Loading Sample Kibana Dashboards To make it easier for you to start monitoring your servers in Kibana, -we have created a few sample dashboards. The dashboards are maintained in this -https://github.com/elastic/beats-dashboards[GitHub repository], which also -includes instructions for loading the dashboards. +we have created a sample Winlogbeat dashboard. This dashboard is provided as +an example. We recommend that you +http://www.elastic.co/guide/en/kibana/current/dashboard.html[customize] it +to meet your needs. -For more information about loading and viewing the dashboards, see {libbeat}/visualizing-data.html[Visualizing Your Data in Kibana]. +For more information about loading and viewing the Beats dashboards, see +{libbeat}/visualizing-data.html[Visualizing Your Data in Kibana]. image:./images/winlogbeat-dashboard.png[Winlogbeat statistics] - -These dashboards are provided as examples. We recommend that you -http://www.elastic.co/guide/en/kibana/current/dashboard.html[customize] them -to meet your needs. From 508dbf25e0de683e9565316e62ac0379f892d138 Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Fri, 8 Apr 2016 12:27:44 -0700 Subject: [PATCH 3/6] Change config example that shows how to turn off process monitoring --- topbeat/docs/gettingstarted.asciidoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/topbeat/docs/gettingstarted.asciidoc b/topbeat/docs/gettingstarted.asciidoc index ad20a7567e3..2ca46e5d44d 100644 --- a/topbeat/docs/gettingstarted.asciidoc +++ b/topbeat/docs/gettingstarted.asciidoc @@ -107,13 +107,13 @@ By default, it's set to 10 seconds. * The `procs` option defines a list of regular expressions to match all the processes that need to be monitored. By default, all the running processes are monitored. + -If you are not interested in monitoring processes, you can use: +If you are not interested in collecting per-process statistics, you can use: + [source, shell] ------------------------------------- input: - period: 10 - procs: ["^$"] + stats: + process: false ------------------------------------- . If you are sending output to Elasticsearch, set the IP address and port where Topbeat can find the Elasticsearch installation: From b8c6167f7bd0654173a3b04a61d6e35c4b4033fa Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Thu, 14 Apr 2016 10:32:29 -0700 Subject: [PATCH 4/6] Clarify meaning of spool_size and harvester_buffer_size (#1388) --- .../docs/reference/configuration/filebeat-options.asciidoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 2aed75c7d37..d0ca15588bd 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -159,7 +159,7 @@ document. The default value is `log`. ===== harvester_buffer_size -The buffer size every harvester uses when fetching the file. The default is 16384. +The size in bytes of the buffer that each harvester uses when fetching a file. The default is 16384. ===== max_bytes @@ -291,8 +291,8 @@ rotated files in case not all lines were read from the rotated file. ===== spool_size -The event count spool threshold. This setting forces a network flush if the specified -value is exceeded. +The event count spool threshold. This setting forces a network flush if the number of events in the spooler exceeds +the specified value. [source,yaml] ------------------------------------------------------------------------------------- From 4bca562359d6e4f6529cb3e857a44f0616fe056a Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Mon, 18 Apr 2016 17:55:34 -0700 Subject: [PATCH 5/6] Add more info to doc about configuring TLS (#1414) --- filebeat/docs/index.asciidoc | 6 +- filebeat/docs/securing-filebeat.asciidoc | 7 + libbeat/docs/https.asciidoc | 13 +- libbeat/docs/shared-faq.asciidoc | 38 +++++ .../docs/shared-tls-logstash-config.asciidoc | 142 ++++++++++++++++++ packetbeat/docs/index.asciidoc | 8 +- packetbeat/docs/securing-packetbeat.asciidoc | 7 + topbeat/docs/index.asciidoc | 4 + topbeat/docs/securing-topbeat.asciidoc | 7 + winlogbeat/docs/index.asciidoc | 4 + winlogbeat/docs/securing-winlogbeat.asciidoc | 7 + 11 files changed, 237 insertions(+), 6 deletions(-) create mode 100644 filebeat/docs/securing-filebeat.asciidoc create mode 100644 libbeat/docs/shared-tls-logstash-config.asciidoc create mode 100644 packetbeat/docs/securing-packetbeat.asciidoc create mode 100644 topbeat/docs/securing-topbeat.asciidoc create mode 100644 winlogbeat/docs/securing-winlogbeat.asciidoc diff --git a/filebeat/docs/index.asciidoc b/filebeat/docs/index.asciidoc index d8ace884d88..e8a9ca6f8be 100644 --- a/filebeat/docs/index.asciidoc +++ b/filebeat/docs/index.asciidoc @@ -14,9 +14,13 @@ include::./configuring-howto.asciidoc[] include::../../libbeat/docs/shared-env-vars.asciidoc[] +include::./multiple-prospectors.asciidoc[] + +include::./securing-filebeat.asciidoc[] + include::../../libbeat/docs/https.asciidoc[] -include::./multiple-prospectors.asciidoc[] +include::../../libbeat/docs/shared-tls-logstash-config.asciidoc[] include::./troubleshooting.asciidoc[] diff --git a/filebeat/docs/securing-filebeat.asciidoc b/filebeat/docs/securing-filebeat.asciidoc new file mode 100644 index 00000000000..2eaba2fda18 --- /dev/null +++ b/filebeat/docs/securing-filebeat.asciidoc @@ -0,0 +1,7 @@ +[[securing-filebeat]] +== Securing Filebeat + +The following topics describe how to secure communication between Filebeat and other products in the Elastic stack: + +* <> +* <> \ No newline at end of file diff --git a/libbeat/docs/https.asciidoc b/libbeat/docs/https.asciidoc index 038cf4a0e85..6b7c603386a 100644 --- a/libbeat/docs/https.asciidoc +++ b/libbeat/docs/https.asciidoc @@ -10,7 +10,8 @@ //// This content is structured to be included as a whole file. ////////////////////////////////////////////////////////////////////////// -=== Securing Communication +[[securing-communication-elasticsearch]] +=== Securing Communication With Elasticsearch To secure the communication between {beatname_uc} and Elasticsearch, you can use HTTPS and basic authentication. Here is a sample configuration: @@ -39,8 +40,10 @@ https://www.elastic.co/guide/en/shield/current/certificate-authority.html[Settin appendix from the Shield guide. By default {beatname_uc} uses the list of trusted certificate authorities from the -operating system where {beatname_uc} is running. You can configure a Beat to use a specific list of -CA certificates instead of the list from the OS. Here is an example: +operating system where {beatname_uc} is running. You can configure {beatname_uc} to use a specific list of +CA certificates instead of the list from the OS. You can also configure it to use client authentication +by specifying the certificate and key to use when the server requires the Beat to authenticate. Here is an example +configuration: ["source","yaml",subs="attributes,callouts"] ---------------------------------------------------------------------- @@ -53,8 +56,12 @@ elasticsearch: certificate_authorities: <1> - /etc/pki/my_root_ca.pem - /etc/pki/my_other_ca.pem + certificate: "/etc/pki/client.pem" <2> + certificate_key: "/etc/pki/key.pem" <3> ---------------------------------------------------------------------- <1> The list of CA certificates to trust +<2> The path to the certificate for TLS client authentication +<3> The client certificate key NOTE: For any given connection, the SSL/TLS certificates must have a subject that matches the value specified for `hosts`, or the TLS handshake fails. diff --git a/libbeat/docs/shared-faq.asciidoc b/libbeat/docs/shared-faq.asciidoc index 6174ff6a9c4..1515a14f109 100644 --- a/libbeat/docs/shared-faq.asciidoc +++ b/libbeat/docs/shared-faq.asciidoc @@ -68,3 +68,41 @@ telnet 5044 TIP: For testing purposes only, you can set `insecure: true` to disable hostname checking. * Make sure that you have enabled SSL (set `ssl => true`) when configuring the https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html[Beats input plugin for Logstash]. + +==== Common Errors and Resolutions + +Here are some common errors and ways to fix them: + +* <> +* <> +* <> +* <> + +[[cannot-validate-certificate]] +===== x509: cannot validate certificate for because it doesn't contain any IP SANs + +This happens because your certificate is only valid for the hostname present in the Subject field. + +To resolve this problem, try one of these solutions: + +* Create a DNS entry for the hostname mapping it to the server's IP. +* Create an entry in `/etc/hosts` for the hostname. Or on Windows add an entry to +`C:\Windows\System32\drivers\etc\hosts`. +* Re-create the server certificate and add a SubjectAltName (SAN) for the IP address of the server. This make the +server's certificate valid for both the hostname and the IP address. + +[[getsockopt-no-route-to-host]] +===== getsockopt: no route to host + +This is not a TLS problem. It's a networking problem. Make sure the two hosts can communicate. + +[[getsockopt-connection-refused]] +===== getsockopt: connection refused + +This is not a TLS problem. Make sure that Logstash is running and that there is no firewall blocking the traffic. + +[[target-machine-refused-connection]] +===== No connection could be made because the target machine actively refused it + +A firewall is refusing the connection. Check if a firewall is blocking the traffic on the client, the network, or the +destination host. diff --git a/libbeat/docs/shared-tls-logstash-config.asciidoc b/libbeat/docs/shared-tls-logstash-config.asciidoc new file mode 100644 index 00000000000..459eb247b5e --- /dev/null +++ b/libbeat/docs/shared-tls-logstash-config.asciidoc @@ -0,0 +1,142 @@ +////////////////////////////////////////////////////////////////////////// +//// This content is shared by all Elastic Beats. Make sure you keep the +//// descriptions here generic enough to work for all Beats that include +//// this file. When using cross references, make sure that the cross +//// references resolve correctly for any files that include this one. +//// Use the appropriate variables defined in the index.asciidoc file to +//// resolve Beat names: beatname_uc and beatname_lc. +//// Use the following include to pull this content into a doc file: +//// include::../../libbeat/docs/shared-tls-logstash-config.asciidoc[] +////////////////////////////////////////////////////////////////////////// + +[[configuring-tls-logstash]] +=== Securing Communication With Logstash by Using TLS + +You can use TLS mutual authentication to secure connections between {beatname_uc} and Logstash. This ensures that +{beatname_uc} sends encrypted data to trusted Logstash servers only, and that the Logstash server receives data from +trusted {beatname_uc} clients only. + +To use TLS mutual authentication: + +. Create a certificate authority (CA) and use it to sign the certificates that you plan to use for +{beatname_uc} and Logstash. Creating a correct SSL/TLS infrastructure is outside the scope of this +document. There are many online resources available that describe how to create certificates. ++ +NOTE: Certificates must be signed by your root CA. Intermediate CAs are currently not supported. + +. Configure {beatname_uc} to use TLS. In the +{beatname_lc}.yml+ config file, specify the following settings under +`tls`: ++ +* `certificate_authorities`: Configures {beatname_uc} to trust any certificates signed by the specified CA. If +`certificate_authorities` is empty or not set, the trusted certificate authorities of the host system are used. +* `certificate` and `certificate_key`: Specifies the certificate and key that {beatname_uc} uses to authenticate with +Logstash. ++ +For example: ++ +[source,yaml] +------------------------------------------------------------------------------ +output: + logstash: + hosts: ["logs.mycompany.com:5044"] + tls: + certificate_authorities: ["/etc/ca.crt"] + certificate: "/etc/client.crt" + certificate_key: "/etc/client.key" +------------------------------------------------------------------------------ ++ +For more information about these configuration options, see <>. + +. Configure Logstash to use TLS. In the Logstash config file, specify the following settings for the https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html[Beats input plugin for Logstash]: ++ +* `ssl`: When set to true, enables Logstash to use SSL/TLS. +* `ssl_certificate_authorities`: Configures Logstash to trust any certificates signed by the specified CA. +* `ssl_certificate` and `ssl_key`: Specify the certificate and key that Logstash uses to authenticate with the client. +* `ssl_verify_mode`: Specifies whether the Logstash server verifies the client certificate against the CA. You +need to specify either `peer` or `force_peer` to make the server ask for the certificate and validate it. If you +specify `force_peer`, and {beatname_uc} doesn't provide a certificate, the Logstash connection will be closed. ++ +For example: ++ +[source,json] +------------------------------------------------------------------------------ +input { + beats { + port => 5044 + ssl => true + ssl_certificate_authorities => ["/etc/ca.crt"] + ssl_certificate => "/etc/server.crt" + ssl_key => "/etc/server.key" + ssl_verify_mode => "force_peer" + } +} +------------------------------------------------------------------------------ ++ +For more information about these options, see the +https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html[documentation] for the Beats input plugin. + +[[testing-tls-logstash]] +==== Validating the Logstash Server's Certificate + +Before running {beatname_uc}, you should validate the Logstash server's certificate. You can use `curl` to validate the certificate even though the protocol used to communicate with Logstash is not based on HTTP. For example: + +[source,shell] +------------------------------------------------------------------------------ +curl -v --cacert ca.crt https://logs.mycompany.com:5044 +------------------------------------------------------------------------------ + +If the test is successful, you'll receive an empty response error: + +[source,shell] +------------------------------------------------------------------------------ +* Rebuilt URL to: https://logs.mycompany.com:5044/ +* Trying 192.168.99.100... +* Connected to logs.mycompany.com (192.168.99.100) port 5044 (#0) +* TLS 1.2 connection using TLS_DHE_RSA_WITH_AES_256_CBC_SHA +* Server certificate: logs.mycompany.com +* Server certificate: mycompany.com +> GET / HTTP/1.1 +> Host: logs.mycompany.com:5044 +> User-Agent: curl/7.43.0 +> Accept: */* +> +* Empty reply from server +* Connection #0 to host logs.mycompany.com left intact +curl: (52) Empty reply from server +------------------------------------------------------------------------------ + +The following example uses the IP address rather than the hostname to validate the certificate: + +[source,shell] +------------------------------------------------------------------------------ +curl -v --cacert ca.crt https://192.168.99.100:5044 +------------------------------------------------------------------------------ + +Validation for this test fails because the certificate is not valid for the specified IP address. It's only valid for the `logs.mycompany.com`, the hostname that appears in the Subject field of the certificate. + +[source,shell] +------------------------------------------------------------------------------ +* Rebuilt URL to: https://192.168.99.100:5044/ +* Trying 192.168.99.100... +* Connected to 192.168.99.100 (192.168.99.100) port 5044 (#0) +* WARNING: using IP address, SNI is being disabled by the OS. +* SSL: certificate verification failed (result: 5) +* Closing connection 0 +curl: (51) SSL: certificate verification failed (result: 5) +------------------------------------------------------------------------------ + +See the <> for info about resolving this issue. + +==== Testing the Beats to Logstash Connection + +If you have {beatname_uc} running as a service, first stop the service. Then test your setup by running {beatname_uc} in +the foreground so you can quickly see any errors that occur: + +["source","sh",subs="attributes,callouts"] +------------------------------------------------------------------------------ +{beatname_lc} -c {beatname_lc}.yml -e -v +------------------------------------------------------------------------------ + +Any errors will be printed to the console. See the <> for info about +resolving common errors. + diff --git a/packetbeat/docs/index.asciidoc b/packetbeat/docs/index.asciidoc index 9916ed02b32..35ff108e8a6 100644 --- a/packetbeat/docs/index.asciidoc +++ b/packetbeat/docs/index.asciidoc @@ -16,14 +16,18 @@ include::./configuring-logstash.asciidoc[] include::../../libbeat/docs/shared-env-vars.asciidoc[] -include::../../libbeat/docs/https.asciidoc[] - include::./capturing.asciidoc[] include::./thrift.asciidoc[] include::./maintaining-topology.asciidoc[] +include::./securing-packetbeat.asciidoc[] + +include::../../libbeat/docs/https.asciidoc[] + +include::../../libbeat/docs/shared-tls-logstash-config.asciidoc[] + include::./visualizing-data-packetbeat.asciidoc[] include::./filtering.asciidoc[] diff --git a/packetbeat/docs/securing-packetbeat.asciidoc b/packetbeat/docs/securing-packetbeat.asciidoc new file mode 100644 index 00000000000..7be9dc49113 --- /dev/null +++ b/packetbeat/docs/securing-packetbeat.asciidoc @@ -0,0 +1,7 @@ +[[securing-packetbeat]] +== Securing Packetbeat + +The following topics describe how to secure communication between Packetbeat and other products in the Elastic stack: + +* <> +* <> diff --git a/topbeat/docs/index.asciidoc b/topbeat/docs/index.asciidoc index 54351f444a9..c6b12fd355a 100644 --- a/topbeat/docs/index.asciidoc +++ b/topbeat/docs/index.asciidoc @@ -16,8 +16,12 @@ include::./configuring-logstash.asciidoc[] include::../../libbeat/docs/shared-env-vars.asciidoc[] +include::./securing-topbeat.asciidoc[] + include::../../libbeat/docs/https.asciidoc[] +include::../../libbeat/docs/shared-tls-logstash-config.asciidoc[] + include::./troubleshooting.asciidoc[] include::./faq.asciidoc[] diff --git a/topbeat/docs/securing-topbeat.asciidoc b/topbeat/docs/securing-topbeat.asciidoc new file mode 100644 index 00000000000..b82faccd72a --- /dev/null +++ b/topbeat/docs/securing-topbeat.asciidoc @@ -0,0 +1,7 @@ +[[securing-topbeat]] +== Securing Topbeat + +The following topics describe how to secure communication between Topbeat and other products in the Elastic stack: + +* <> +* <> \ No newline at end of file diff --git a/winlogbeat/docs/index.asciidoc b/winlogbeat/docs/index.asciidoc index b83eb579535..1b52d341f7d 100644 --- a/winlogbeat/docs/index.asciidoc +++ b/winlogbeat/docs/index.asciidoc @@ -14,8 +14,12 @@ include::./configuring-howto.asciidoc[] include::../../libbeat/docs/shared-env-vars.asciidoc[] +include::./securing-winlogbeat.asciidoc[] + include::../../libbeat/docs/https.asciidoc[] +include::../../libbeat/docs/shared-tls-logstash-config.asciidoc[] + include::./troubleshooting.asciidoc[] include::./faq.asciidoc[] diff --git a/winlogbeat/docs/securing-winlogbeat.asciidoc b/winlogbeat/docs/securing-winlogbeat.asciidoc new file mode 100644 index 00000000000..6b290e9c5bc --- /dev/null +++ b/winlogbeat/docs/securing-winlogbeat.asciidoc @@ -0,0 +1,7 @@ +[[securing-winlogbeat]] +== Securing Winlogbeat + +The following topics describe how to secure communication between Winlogbeat and other products in the Elastic stack: + +* <> +* <> \ No newline at end of file From 301aaca60f83497157ba9b4529170c28eef60693 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Tue, 19 Apr 2016 14:48:04 +0200 Subject: [PATCH 6/6] Backport of #1375 (#1418) A direct commit backport was not possible as too many things changed in the prospector and harvester --- CHANGELOG.asciidoc | 3 + filebeat/beat/publish_test.go | 2 +- filebeat/crawler/crawler.go | 8 +- filebeat/crawler/prospector.go | 16 +- filebeat/crawler/registrar.go | 61 +++-- filebeat/harvester/harvester.go | 4 +- filebeat/harvester/harvester_test.go | 2 +- filebeat/harvester/log.go | 49 +++- filebeat/input/file.go | 6 +- filebeat/tests/system/filebeat.py | 22 ++ filebeat/tests/system/test_prospector.py | 4 +- filebeat/tests/system/test_registrar.py | 285 +++++++++++++++++------ 12 files changed, 356 insertions(+), 106 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 09f7dc98db6..449a3a569a2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -82,6 +82,9 @@ https://github.com/elastic/beats/compare/v1.2.0...v1.2.1[View commits] - Fixed name of the setting `stats.proc` to `stats.process` in the default configuration file. {pull}1343[1343] - Fix issue with cpu.system_p being greater than 1 on Windows {pull}1128[1128] +*Filebeat* +- Improvements in registrar dealing with file rotation. {pull}1281[1281] + ==== Added *Topbeat* diff --git a/filebeat/beat/publish_test.go b/filebeat/beat/publish_test.go index 29eb865afa4..ee201ca3ef0 100644 --- a/filebeat/beat/publish_test.go +++ b/filebeat/beat/publish_test.go @@ -20,7 +20,7 @@ func makeEvents(name string, n int) []*input.FileEvent { DocumentType: "log", Bytes: 100, Offset: int64(i), - Source: &name, + Source: name, } events = append(events, event) } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 16121469ff9..f2d9eb3eaf5 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -58,7 +58,7 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i logp.Debug("prospector", "Waiting for %d prospectors to initialise", pendingProspectorCnt) for event := range crawler.Registrar.Persist { - if event.Source == nil { + if event.Source == "" { pendingProspectorCnt-- if pendingProspectorCnt == 0 { @@ -67,15 +67,15 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i } continue } - crawler.Registrar.State[*event.Source] = event - logp.Debug("prospector", "Registrar will re-save state for %s", *event.Source) + crawler.Registrar.state[event.Source] = event + logp.Debug("prospector", "Registrar will re-save state for %s", event.Source) if !crawler.running { break } } - logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State)) + logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.getStateCopy())) } func (crawler *Crawler) Stop() { diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index d605ce55648..dad5b8f3feb 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -166,7 +166,7 @@ func (p *Prospector) logRun(spoolChan chan *input.FileEvent) { // This signals we finished considering the previous state event := &input.FileState{ - Source: nil, + Source: "", } p.registrar.Persist <- event @@ -214,7 +214,7 @@ func (p *Prospector) stdinRun(spoolChan chan *input.FileEvent) { // This signals we finished considering the previous state event := &input.FileState{ - Source: nil, + Source: "", } p.registrar.Persist <- event @@ -344,7 +344,7 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp if resuming { logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file) - h.Offset = offset + h.SetOffset(offset) h.Start() } else { // Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up @@ -354,11 +354,13 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp file) newinfo.Skip(newinfo.Fileinfo.Size()) } + p.registrar.Persist <- h.GetState() } else if previousFile, err := p.getPreviousFile(file, newinfo.Fileinfo); err == nil { // This file was simply renamed (known inode+dev) - link the same harvester channel as the old file logp.Debug("prospector", "File rename was detected: %s -> %s", previousFile, file) lastinfo := p.prospectorList[previousFile] newinfo.Continue(&lastinfo) + p.registrar.Persist <- h.GetState() } else { // Are we resuming a file or is this a completely new file? @@ -369,8 +371,9 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp } // Launch the harvester - h.Offset = offset + h.SetOffset(offset) h.Start() + p.registrar.Persist <- h.GetState() } } @@ -402,6 +405,7 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp lastinfo := p.prospectorList[previousFile] newinfo.Continue(&lastinfo) + p.registrar.Persist <- h.GetState() } else { // File is not the same file we saw previously, it must have rotated and is a new file logp.Debug("prospector", "Launching harvester on rotated file: %s", file) @@ -411,6 +415,7 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp // Start a new harvester on the path h.Start() + p.registrar.Persist <- h.GetState() } // Keep the old file in missingFiles so we don't rescan it if it was renamed and we've not yet reached the new filename @@ -423,8 +428,9 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp // Start a harvester on the path; an old file was just modified and it doesn't have a harvester // The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel - h.Offset = <-newinfo.Return + h.SetOffset(<-newinfo.Return) h.Start() + p.registrar.Persist <- h.GetState() } else { logp.Debug("prospector", "Not harvesting, file didn't change: %s", file) } diff --git a/filebeat/crawler/registrar.go b/filebeat/crawler/registrar.go index eb87dec6c98..bda6d69b09b 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/crawler/registrar.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" @@ -16,7 +17,8 @@ type Registrar struct { // Path to the Registry File registryFile string // Map with all file paths inside and the corresponding state - State map[string]*FileState + state map[string]*FileState + stateMutex sync.Mutex // Channel used by the prospector and crawler to send FileStates to be persisted Persist chan *input.FileState running bool @@ -39,7 +41,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) { func (r *Registrar) Init() error { // Init state r.Persist = make(chan *FileState) - r.State = make(map[string]*FileState) + r.state = map[string]*FileState{} r.Channel = make(chan []*FileEvent, 1) // Set to default in case it is not set @@ -71,11 +73,13 @@ func (r *Registrar) Init() error { // loadState fetches the previous reading state from the configure RegistryFile file // The default file is .filebeat file which is stored in the same path as the binary is running func (r *Registrar) LoadState() { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() if existing, e := os.Open(r.registryFile); e == nil { defer existing.Close() logp.Info("Loading registrar data from %s", r.registryFile) decoder := json.NewDecoder(existing) - decoder.Decode(&r.State) + decoder.Decode(&r.state) } } @@ -94,8 +98,9 @@ func (r *Registrar) Run() { return // Treats new log files to persist with higher priority then new events case state := <-r.Persist: - r.State[*state.Source] = state - logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source) + source := state.Source + r.setState(source, state) + logp.Debug("prospector", "Registrar will re-save state for %s", source) case events := <-r.Channel: r.processEvents(events) } @@ -121,7 +126,7 @@ func (r *Registrar) processEvents(events []*FileEvent) { continue } - r.State[*event.Source] = event.GetState() + r.setState(event.Source, event.GetState()) } } @@ -133,7 +138,7 @@ func (r *Registrar) Stop() { } func (r *Registrar) GetFileState(path string) (*FileState, bool) { - state, exist := r.State[path] + state, exist := r.getState(path) return state, exist } @@ -149,12 +154,13 @@ func (r *Registrar) writeRegistry() error { } encoder := json.NewEncoder(file) - encoder.Encode(r.State) + state := r.getStateCopy() + encoder.Encode(state) // Directly close file because of windows file.Close() - logp.Info("Registry file updated. %d states written.", len(r.State)) + logp.Info("Registry file updated. %d states written.", len(state)) return SafeFileRotate(r.registryFile, tempfile) } @@ -168,7 +174,6 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo logp.Debug("registar", "Same file as before found. Fetch the state and persist it.") // We're resuming - throw the last state back downstream so we resave it // And return the offset - also force harvest in case the file is old and we're about to skip it - r.Persist <- lastState return lastState.Offset, true } @@ -179,8 +184,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath) lastState, _ := r.GetFileState(previous) - lastState.Source = &filePath - r.Persist <- lastState + r.updateStateSource(lastState, filePath) return lastState.Offset, true } @@ -198,7 +202,7 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) newState := input.GetOSFileState(&newFileInfo) - for oldFilePath, oldState := range r.State { + for oldFilePath, oldState := range r.getStateCopy() { // Skipping when path the same if oldFilePath == newFilePath { @@ -214,3 +218,34 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) return "", fmt.Errorf("No previous file found") } + +func (r *Registrar) setState(path string, state *FileState) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + r.state[path] = state +} + +func (r *Registrar) getState(path string) (*FileState, bool) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + state, exist := r.state[path] + return state, exist +} + +func (r *Registrar) updateStateSource(state *FileState, path string) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + state.Source = path +} + +func (r *Registrar) getStateCopy() map[string]FileState { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + + copy := make(map[string]FileState) + for k, v := range r.state { + copy[k] = *v + } + + return copy +} diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 842de098128..2c97b524979 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -17,6 +17,7 @@ import ( "io" "os" "regexp" + "sync" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" @@ -27,7 +28,8 @@ type Harvester struct { Path string /* the file path to harvest */ ProspectorConfig config.ProspectorConfig Config *config.HarvesterConfig - Offset int64 + offset int64 + offsetLock sync.Mutex Stat *FileStat SpoolerChan chan *input.FileEvent encoding encoding.EncodingFactory diff --git a/filebeat/harvester/harvester_test.go b/filebeat/harvester/harvester_test.go index a1e5011af7e..ac1dcecc819 100644 --- a/filebeat/harvester/harvester_test.go +++ b/filebeat/harvester/harvester_test.go @@ -13,7 +13,7 @@ func TestExampleTest(t *testing.T) { h := Harvester{ Path: "/var/log/", - Offset: 0, + offset: 0, } assert.Equal(t, "/var/log/", h.Path) diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index 347f3fdb44d..bdcb871f786 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -88,7 +88,7 @@ func createLineReader( func (h *Harvester) Harvest() { defer func() { // On completion, push offset so we can continue where we left off if we relaunch on the same file - h.Stat.Return <- h.Offset + h.Stat.Return <- h.GetOffset() // Make sure file is closed as soon as harvester exits // If file was never properly opened, it can't be closed @@ -149,8 +149,8 @@ func (h *Harvester) Harvest() { logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path) - h.Offset = 0 - seeker.Seek(h.Offset, os.SEEK_SET) + h.SetOffset(0) + seeker.Seek(h.GetOffset(), os.SEEK_SET) continue } @@ -162,10 +162,10 @@ func (h *Harvester) Harvest() { // Sends text to spooler event := &input.FileEvent{ ReadTime: ts, - Source: &h.Path, + Source: h.Path, InputType: h.Config.InputType, DocumentType: h.Config.DocumentType, - Offset: h.Offset, + Offset: h.GetOffset(), Bytes: bytesRead, Text: &text, Fields: &h.Config.Fields, @@ -177,7 +177,7 @@ func (h *Harvester) Harvest() { } // Set Offset - h.Offset += int64(bytesRead) // Update offset if complete line has been processed + h.SetOffset(h.GetOffset() + int64(bytesRead)) // Update offset if complete line has been processed } } @@ -263,24 +263,25 @@ func (h *Harvester) openFile() (encoding.Encoding, error) { func (h *Harvester) initFileOffset(file *os.File) error { offset, err := file.Seek(0, os.SEEK_CUR) - if h.Offset > 0 { + if h.GetOffset() > 0 { // continue from last known offset logp.Debug("harvester", - "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset) - _, err = file.Seek(h.Offset, os.SEEK_SET) + "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.GetOffset(), offset) + _, err = file.Seek(h.GetOffset(), os.SEEK_SET) } else if h.Config.TailFiles { // tail file if file is new and tail_files config is set logp.Debug("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset) - h.Offset, err = file.Seek(0, os.SEEK_END) + offset, err = file.Seek(0, os.SEEK_END) + h.SetOffset(offset) } else { // get offset from file in case of encoding factory was // required to read some data. logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset) - h.Offset = offset + h.SetOffset(offset) } return err @@ -290,3 +291,29 @@ func (h *Harvester) Stop() { } const maxConsecutiveEmptyReads = 100 + +// GetState returns current state of harvester +func (h *Harvester) GetState() *input.FileState { + + state := input.FileState{ + Source: h.Path, + Offset: h.GetOffset(), + FileStateOS: input.GetOSFileState(&h.Stat.Fileinfo), + } + + return &state +} + +func (h *Harvester) SetOffset(offset int64) { + h.offsetLock.Lock() + defer h.offsetLock.Unlock() + + h.offset = offset +} + +func (h *Harvester) GetOffset() int64 { + h.offsetLock.Lock() + defer h.offsetLock.Unlock() + + return h.offset +} diff --git a/filebeat/input/file.go b/filebeat/input/file.go index 559dc9f162a..655983c8441 100644 --- a/filebeat/input/file.go +++ b/filebeat/input/file.go @@ -18,7 +18,7 @@ type File struct { // FileEvent is sent to the output and must contain all relevant information type FileEvent struct { ReadTime time.Time - Source *string + Source string InputType string DocumentType string Offset int64 @@ -31,8 +31,8 @@ type FileEvent struct { } type FileState struct { - Source *string `json:"source,omitempty"` - Offset int64 `json:"offset,omitempty"` + Source string `json:"source,omitempty"` + Offset int64 `json:"offset,omitempty"` FileStateOS *FileStateOS } diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index 2387771678a..2535cdb8a0a 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -272,3 +272,25 @@ def get_dot_filebeat(self): with open(dotFilebeat) as file: return json.load(file) + + + def log_contains_count(self, msg, logfile=None): + """ + Returns the number of appearances of the given string in the log file + """ + + counter = 0 + + # Init defaults + if logfile is None: + logfile = "filebeat.log" + + try: + with open(os.path.join(self.working_dir, logfile), "r") as f: + for line in f: + if line.find(msg) >= 0: + counter = counter + 1 + except IOError: + counter = -1 + + return counter diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index 0094a8515c9..0d38c56ad56 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -110,10 +110,10 @@ def test_stdin(self): objs = self.read_output() assert len(objs) == iterations1+iterations2 - def test_rotating_ignore_older_larger_write_rate(self): + def test_rotating_close_older_larger_write_rate(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - ignoreOlder="1s", + ignoreOlder="10s", closeOlder="1s", scan_frequency="0.1s", ) diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 6381149001f..c8aade9a0ed 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -1,6 +1,11 @@ from filebeat import TestCase import os +import platform +import time +import shutil +from nose.plugins.skip import Skip, SkipTest + # Additional tests: to be implemented # * Check if registrar file can be configured, set config param @@ -16,69 +21,71 @@ def test_registrar_file_content(self): """ self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*" + path=os.path.abspath(self.working_dir) + "/log/*" ) os.mkdir(self.working_dir + "/log/") + # Use \n as line terminator on all platforms per docs. + line = "hello world\n" + line_len = len(line) - 1 + len(os.linesep) + iterations = 5 testfile = self.working_dir + "/log/test.log" file = open(testfile, 'w') - - iterations = 5 - file.write(iterations * "hello world\n") - + file.write(iterations * line) file.close() filebeat = self.start_filebeat() + c = self.log_contains_count("states written") + + self.wait_until( + lambda: self.log_contains( + "Processing 5 events"), + max_timeout=15) + # Make sure states written appears one more time self.wait_until( - lambda: self.log_contains( - "Processing 5 events"), - max_timeout=15) + lambda: self.log_contains("states written") > c, + max_timeout=10) # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - ".filebeat")), - max_timeout=1) + lambda: os.path.isfile(os.path.join(self.working_dir, + ".filebeat")), + max_timeout=1) filebeat.kill_and_wait() - # Check that file exist + # Check that a single file exists in the registry. data = self.get_dot_filebeat() - - # Check that offset is set correctly - logFileAbs = os.path.abspath(testfile) - # Hello world text plus newline, multiplied by the number - # of lines and the windows correction applied - assert data[logFileAbs]['offset'] == \ - iterations * (11 + len(os.linesep)) - - # Check that right source field is inside - assert data[logFileAbs]['source'] == logFileAbs - - # Check that no additional info is in the file assert len(data) == 1 - # Windows checks - if os.name == "nt": - - # TODO: Check for IdxHi, IdxLo, Vol + logFileAbsPath = os.path.abspath(testfile) + record = data[logFileAbsPath] - assert len(data[logFileAbs]) == 3 - assert len(data[logFileAbs]['FileStateOS']) == 3 + self.assertDictContainsSubset({ + "source": logFileAbsPath, + "offset": iterations * line_len, + }, record) + self.assertTrue("FileStateOS" in record) + file_state_os = record["FileStateOS"] + if os.name == "nt": + # Windows checks + # TODO: Check for IdxHi, IdxLo, Vol in FileStateOS on Windows. + self.assertEqual(len(file_state_os), 3) + elif platform.system() == "SunOS": + stat = os.stat(logFileAbsPath) + self.assertEqual(file_state_os["inode"], stat.st_ino) + + # Python does not return the same st_dev value as Golang or the + # command line stat tool so just check that it's present. + self.assertTrue("device" in file_state_os) else: - # Check that inode is set correctly - inode = os.stat(logFileAbs).st_ino - assert data[logFileAbs]['FileStateOS']['inode'] == inode - - # Check that device is set correctly - device = os.stat(logFileAbs).st_dev - assert os.name == "nt" or \ - data[logFileAbs]['FileStateOS']['device'] == device - - assert len(data[logFileAbs]) == 3 - assert len(data[logFileAbs]['FileStateOS']) == 2 + stat = os.stat(logFileAbsPath) + self.assertDictContainsSubset({ + "inode": stat.st_ino, + "device": stat.st_dev, + }, file_state_os) def test_registrar_files(self): """ @@ -86,7 +93,7 @@ def test_registrar_files(self): """ self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*" + path=os.path.abspath(self.working_dir) + "/log/*" ) os.mkdir(self.working_dir + "/log/") @@ -108,15 +115,15 @@ def test_registrar_files(self): filebeat = self.start_filebeat() self.wait_until( - lambda: self.log_contains( - "Processing 10 events"), - max_timeout=15) + lambda: self.log_contains( + "Processing 10 events"), + max_timeout=15) # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - ".filebeat")), - max_timeout=1) + lambda: os.path.isfile(os.path.join(self.working_dir, + ".filebeat")), + max_timeout=1) filebeat.kill_and_wait() # Check that file exist @@ -131,8 +138,8 @@ def test_custom_registry_file_location(self): is created automatically. """ self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="a/b/c/registry", + path=os.path.abspath(self.working_dir) + "/log/*", + registryFile="a/b/c/registry", ) os.mkdir(self.working_dir + "/log/") testfile = self.working_dir + "/log/test.log" @@ -140,21 +147,20 @@ def test_custom_registry_file_location(self): f.write("hello world\n") filebeat = self.start_filebeat() self.wait_until( - lambda: self.log_contains( - "Processing 1 events"), - max_timeout=15) + lambda: self.log_contains( + "Processing 1 events"), + max_timeout=15) # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "a/b/c/registry")), + lambda: os.path.isfile(os.path.join(self.working_dir, + "a/b/c/registry")), - max_timeout=1) + max_timeout=1) filebeat.kill_and_wait() assert os.path.isfile(os.path.join(self.working_dir, "a/b/c/registry")) - def test_rotating_file(self): """ Checks that the registry is properly updated after a file is rotated @@ -171,10 +177,8 @@ def test_rotating_file(self): with open(testfile, 'w') as f: f.write("offset 9\n") - self.wait_until( - lambda: self.output_has(lines=1), - max_timeout=10) - + self.wait_until(lambda: self.output_has(lines=1), + max_timeout=10) testfilerenamed = self.working_dir + "/log/test.1.log" os.rename(testfile, testfilerenamed) @@ -182,10 +186,8 @@ def test_rotating_file(self): with open(testfile, 'w') as f: f.write("offset 10\n") - - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), + max_timeout=10) filebeat.kill_and_wait() @@ -198,3 +200,156 @@ def test_rotating_file(self): # Check that 2 files are port of the registrar file assert len(data) == 2 + + def test_rotating_file_inode(self): + """ + Check that inodes are properly written during file rotation + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="1s" + ) + + if os.name == "nt": + raise SkipTest + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/input" + + filebeat = self.start_filebeat() + + with open(testfile, 'w') as f: + f.write("entry1\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + data = self.get_dot_filebeat() + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + + testfilerenamed1 = self.working_dir + "/log/input.1" + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("entry2\n") + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_dot_filebeat() + + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Rotate log file, create a new empty one and remove it afterwards + testfilerenamed2 = self.working_dir + "/log/input.2" + os.rename(testfilerenamed1, testfilerenamed2) + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("") + + os.remove(testfilerenamed2) + + with open(testfile, 'w') as f: + f.write("entry3\n") + + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + + filebeat.kill_and_wait() + + data = self.get_dot_filebeat() + + # Compare file inodes and the one in the registry + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Check that 2 files are part of the registrar file. The deleted file should never have been detected + assert len(data) == 2 + + + def test_rotating_file_with_shutdown(self): + """ + Check that inodes are properly written during file rotation and shutdown + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="1s" + ) + + if os.name == "nt": + raise SkipTest + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/input" + + filebeat = self.start_filebeat() + + with open(testfile, 'w') as f: + f.write("entry1\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + data = self.get_dot_filebeat() + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + + testfilerenamed1 = self.working_dir + "/log/input.1" + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("entry2\n") + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_dot_filebeat() + + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + filebeat.kill_and_wait() + + # Store first registry file + shutil.copyfile(self.working_dir + "/.filebeat", self.working_dir + "/.filebeat.first") + + # Rotate log file, create a new empty one and remove it afterwards + testfilerenamed2 = self.working_dir + "/log/input.2" + os.rename(testfilerenamed1, testfilerenamed2) + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("") + + os.remove(testfilerenamed2) + + with open(testfile, 'w') as f: + f.write("entry3\n") + + filebeat = self.start_filebeat(output="filebeat2.log") + + # Output file was rotated + self.wait_until( + lambda: self.output_has(lines=2, output_file="output/filebeat.1"), + max_timeout=10) + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.kill_and_wait() + + data = self.get_dot_filebeat() + + # Compare file inodes and the one in the registry + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Check that 2 files are part of the registrar file. The deleted file should never have been detected + assert len(data) == 2