Broken hadoop_snappy compression in some cases #4470

Open
alheio opened this issue Apr 17, 2024 · 1 comment

alheio commented Apr 17, 2024

Describe the bug

Hello!

I have logs from two apps.

app1 - in_tail input on a single file, webhdfs output with a file buffer and hadoop_snappy compression
app2 - in_tail input on files matched by a glob pattern, webhdfs output with a file buffer and hadoop_snappy compression

app1 watches 1 file; its buffer accumulates roughly 20 MB of chunks every 2 minutes and flushes them to HDFS by time.
app2 watches about 50 files; its buffer accumulates roughly 50 MB every 5 minutes and flushes the data to HDFS by time.

app1 works fine, but with app2 about 5% of the resulting files (chunks), from different hosts, turn out to be broken: reading them back from HDFS fails with an "invalid compression" error.

Exception in thread "main" java.lang.InternalError: Could not decompress data. Input is invalid.
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:239)
        at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:93)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:121)
        at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:106)
        at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:101)
        at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:317)
        at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:289)
        at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:271)
        at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:255)
        at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:118)
        at org.apache.hadoop.fs.shell.Command.run(Command.java:165)
        at org.apache.hadoop.fs.FsShell.run(FsShell.java:315)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at org.apache.hadoop.fs.FsShell.main(FsShell.java:372)

How can I tune the configuration? Why does hadoop_snappy generate invalid blocks?
If I set compression to "text" instead of "hadoop_snappy", everything works fine and there are no invalid records when the data is parsed in HDFS.
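
For reference, a file that Hadoop's decoder accepts for this codec is a sequence of block-snappy frames: a 4-byte big-endian uncompressed length, then a 4-byte big-endian compressed length, then the raw snappy bytes, repeated per block. The following is only a minimal sketch of that layout using the snappy gem; hadoop_snappy_encode, the 256 KB block size, and the file names are illustrative, not the plugin's actual code:

# Sketch of the hadoop_snappy (Hadoop BlockCompressorStream) frame layout.
# Assumes one compressed chunk per raw block; the block size is an arbitrary choice here.
require "snappy"

BLOCK_SIZE = 256 * 1024

def hadoop_snappy_encode(data, block_size: BLOCK_SIZE)
  out = String.new(encoding: Encoding::BINARY)
  offset = 0
  while offset < data.bytesize
    raw = data.byteslice(offset, block_size)
    compressed = Snappy.deflate(raw)
    # 4-byte big-endian raw length, 4-byte big-endian compressed length, snappy payload
    out << [raw.bytesize].pack("N") << [compressed.bytesize].pack("N") << compressed
    offset += raw.bytesize
  end
  out
end

File.binwrite("sample.log.snappy", hadoop_snappy_encode(File.binread("sample.log")))

A chunk framed this way decodes cleanly on the Hadoop side; the InternalError above suggests the decompressor hit a block whose framing or payload does not match this layout.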

To Reproduce

To reproduce the issue, set up the two pipelines described in the "Describe the bug" section using the following Dockerfile:

FROM fluent/fluentd:v1.16.5-debian-amd64-1.0

USER root

# Timezone
ENV TZ="Europe/Moscow"
RUN ln -snf "/usr/share/zoneinfo/$TZ" "/etc/localtime" \
    && echo "$TZ" > "/etc/timezone"

# for snappy gem native libs building
RUN apt update \
    && apt -y install build-essential autoconf automake libtool libsnappy-dev \
    && apt clean

# plugins
RUN fluent-gem install \
    fluent-plugin-webhdfs \
    fluent-plugin-prometheus \
    snappy

USER fluent

Expected behavior

Valid hadoop_snappy compression on the resulting files in HDFS.

Your Environment

- Official fluentd docker image fluent/fluentd:v1.16.5-debian-amd64-1.0
- Fluentd version: 1.16.5
- gem 'fluent-plugin-prometheus' version '2.1.0'
- gem 'fluent-plugin-webhdfs' version '1.6.0'

Your Configuration

  <system>
    workers 2
  </system>

  <source>
    @type monitor_agent
    bind "0.0.0.0"
    port 24220
  </source>

  <source>
    @type prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>

  <filter mytags.*>
    @type prometheus
    <metric>
      name fluentd_input_status_num_records_total
      type counter
      desc The total number of incoming records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </filter>

  <source>
    @type prometheus_monitor
  </source>

  <source>
    @type prometheus_output_monitor
    interval 10
    <labels>
      hostname ${hostname}
    </labels>
  </source>

  <worker 0>
    <source>
      tag "mytags.app-tag-1"
      @type tail
      path "/path/to/logs/app1.log"
      pos_file "/home/fluentd/pipeline_workdir/app1.log.pos"
      read_from_head false
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <match mytags.app-tag-1>
      @type webhdfs
      namenode namenode1:50070
      standby_namenode namenode2:50070
      ignore_start_check_error true
      retry_known_errors yes
      retry_times 10000
      retry_interval 30
      open_timeout 180
      read_timeout 180
      append no
      username hdfsuser
      path "/path/in/hdfs/app1/%Y/%m/%d/%H/#{Socket.gethostname}.${chunk_id}.log"
      compress hadoop_snappy
      <format>
        @type "single_value"
      </format>
      <buffer time>
        @type "file"
        path "/home/fluentd/pipeline_workdir/app1.log.buf"
        chunk_limit_size 512MB
        flush_mode interval
        flush_interval 2m
        flush_thread_count 5
        retry_type periodic
        retry_wait 30s
        timekey_zone "+0300"
        timekey 3600
      </buffer>
    </match>
  </worker>

  <worker 1>
    <source>
      tag "mytags.app2"
      @type tail
      path "/path2/to/logs/app2*.log"
      pos_file "/home/fluentd/pipeline_workdir/app2.log.pos"
      read_from_head false
      follow_inodes true
      rotate_wait 60
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <match mytags.app2>
      @type webhdfs
      namenode namenode1:50070
      standby_namenode namenode2:50070
      ignore_start_check_error true
      retry_known_errors yes
      retry_times 10000
      retry_interval 30
      open_timeout 180
      read_timeout 180
      append no
      username hdfsuser
      path "/path/in/hdfs/app2/%Y/%m/%d/%H/#{Socket.gethostname}.${chunk_id}.log"
      compress hadoop_snappy
      <format>
        @type "single_value"
      </format>
      <buffer time>
        @type "file"
        path "/home/fluentd/pipeline_workdir/app2.log.buf"
        chunk_limit_size 512MB
        flush_mode interval
        flush_interval 2m
        flush_thread_count 5
        retry_type periodic
        retry_wait 30s
        timekey_zone "+0300"
        timekey 3600
      </buffer>
    </match>
  </worker>

Your Error Log

No errors on the fluentd side.

Additional context

The only difference between the two pipelines is that in_tail in app2 watches about 50 files matched by a glob pattern.
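
To narrow down where a bad chunk breaks, one option (a hypothetical diagnostic, not something shipped with the plugin) is to download an affected file from HDFS and walk its block framing locally with the same snappy gem, printing the first block that fails:

# Hypothetical diagnostic: walk the hadoop_snappy block framing of a file
# fetched from HDFS and report the first block that does not decompress.
# Assumes one compressed chunk per raw-length header, the common layout.
require "snappy"

data = File.binread(ARGV.fetch(0))
pos = 0
block = 0
while pos < data.bytesize
  header = data.byteslice(pos, 8)
  if header.nil? || header.bytesize < 8
    puts "truncated header at byte offset #{pos}"
    exit 1
  end
  raw_len, comp_len = header.unpack("NN")
  payload = data.byteslice(pos + 8, comp_len).to_s
  begin
    inflated = Snappy.inflate(payload)
    raise "decompressed size #{inflated.bytesize} != declared #{raw_len}" unless inflated.bytesize == raw_len
  rescue => e
    puts "block #{block} at byte offset #{pos} is invalid: #{e.message}"
    exit 1
  end
  pos += 8 + comp_len
  block += 1
end
puts "all #{block} blocks decompressed cleanly"

Running this over a few of the affected chunks would show whether the corruption is a truncated trailing block, a bad length header, or a garbled snappy payload in the middle of the file.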


alheio commented Apr 18, 2024

If I place both pipelines on one worker, I get "invalid snappy compression" errors on both pipelines (or only on the first pipeline, if the second pipeline uses the "text" codec).
