Switch branches/tags
Nothing to show
Find file History
Permalink
..
Failed to load latest commit information.
img Add files via upload Nov 15, 2018
ELK:elastalert报警系统.md Update ELK:elastalert报警系统.md Nov 8, 2018
ELK:使用进阶指南.md Update ELK:使用进阶指南.md Nov 8, 2018
ELK:在企业中的落地.md Create ELK:在企业中的落地.md Nov 8, 2018
ELK:安全项目Search Guard.md Create ELK:安全项目Search Guard.md Nov 13, 2018
ELK:安装与基础使用.md Update ELK:安装与基础使用.md Nov 7, 2018
ELK:插件记录.md Create ELK:插件记录.md Nov 7, 2018
ELK:踩坑记.md Create ELK:踩坑记.md Nov 7, 2018
Graylog:使用进阶指南.md Update Graylog:使用进阶指南.md Nov 12, 2018
Graylog:安装与基础使用.md Update Graylog:安装与基础使用.md Nov 12, 2018
Kafka:分布式流处理系统.md Update Kafka:分布式流处理系统.md Nov 14, 2018
Linux日志分析工具:logwatch.md Rename logwatch:Linux日志分析工具.md to Linux日志分析工具:logwatch.md Nov 8, 2018
README.md Update README.md Nov 8, 2018
Splunk:安装与基础使用.md Create Splunk:安装与基础使用.md Nov 9, 2018
Zookeeper:分布式协调服务.md Update Zookeeper:分布式协调服务.md Nov 15, 2018
日志分析:可视化展示.md Create 日志分析:可视化展示.md Nov 9, 2018
日志分析:日志处理.md Rename 日志收集:Flume.md to 日志分析:日志处理.md Nov 9, 2018
日志分析:日志存储.md Update 日志分析:日志存储.md Nov 13, 2018
日志分析:日志收集.md Update 日志分析:日志收集.md Nov 14, 2018

README.md

概述

日志分析的几个板块

  • 数据源
各类型的日志
  • 数据采集
Rsyslog
Flume
Beats
Logstash
  • 数据处理
Logstash
Spark
Storm
  • 数据存储
ElasticSearch
Hive
HBase
  • 可视化展示
Kibana
Graylog

企业自建日志分析平台

日志分析平台架构

这里我说一下我自己用到的架构:

log-1

配置文件如下:

  • Ryslog Client
# 导入模块
module(load="imfile" mode="inotify")

# 设置工作目录
$WorkDirectory /var/lib/rsyslog

$ActionQueueWorkerThreads 1
$ActionQueueDequeueBatchSize 30
$ActionQueueDequeueSlowdown 300
$ActionQueueDiscardSeverity 8
$ActionQueueFileName action_queue_buffer_bitun_nginx_access
$ActionQueueMaxDiskSpace 10m
$ActionQueueSaveOnShutdown on

$MaxMessageSize 64k

input(type="imfile"
    File="log-path"
    Tag="tag_name"
    Severity="info"
    PersistStateInterval="20000"
    reopenOnTruncate="on"
    )

if $programname == 'tag_name' then @@Rsyslog Server IP:Port
if $programname == 'tag_name' then stop
  • Ryslog Server
module(load="omkafka")
$SystemLogSocketFlowControl on #如果队列满了,会暂停一会等待队列消耗
$SystemLogRateLimitInterval 0 #default 0 速率限制,改为0不限制
$ActionQueueType LinkedList   # use asynchronous processing
$ActionQueueFileName backup_local # set file name, also enables disk mode
$ActionResumeRetryCount -1    # infinite retries on insert failure
$ActionQueueSaveOnShutdown on
$ActionQueueMaxFileSize 500m
$ActionQueueMaxDiskSpace 10g

$template l7_msg,"%msg:1:$%"

:rawmsg, contains, "tag_name" action(
    type="omkafka"
    broker="Kafka Server IP:Port(默认9092)"
    topic="topic_name"
    partitions.number="10"
    confParam=[
        "socket.keepalive.enable=true"
    ]
    template="l7_msg"
    action.resumeretrycount="1"
)
  • kafka:创建好topic即可

  • Logstash

input {
    kafka {
        bootstrap_servers => ["Kakfa Server IP:Port(默认9092)"]
        topics => ["topic_name"]
        type => "httpd"
    }
}
output {
    elasticsearch {
        hosts => ["ElasticSearch Server IP:Port(默认9200)"]
        index => "httpd-%{+YYYY.MM}"
    }
}

参考资料

几十条业务线日志系统如何收集处理?