Skip to content

Cloud Pipeline Log Storage

Pavel Silin edited this page Apr 10, 2020 · 15 revisions

GitHub issue: https://github.com/epam/cloud-pipeline/issues/989

Manual setup of Log storage infrastructure

This tutorial goes through the process of manually setting up the infrastructure for the log storage feature (for cloud-pipeline version c6bdd60).

Main changes:

Custom docker for cp-search-elk pod

# Custom ELK image for the cp-search-elk pod: Elasticsearch 6.8.3 plus the
# S3 snapshot plugin, elasticsearch-curator, and cron for scheduled backups.
FROM elasticsearch:6.8.3

# FIX: the plugin is named 'repository-s3' — the original 'repositiry-s3'
# would fail the image build.
RUN bin/elasticsearch-plugin install repository-s3

RUN wget https://packages.elastic.co/curator/5/centos/7/Packages/elasticsearch-curator-5.8.1-1.x86_64.rpm && \
    rpm -vi elasticsearch-curator-5.8.1-1.x86_64.rpm && \
    rm elasticsearch-curator-5.8.1-1.x86_64.rpm && mkdir /root/.curator
RUN yum update -y && yum install cronie gettext -q -y

# FIX: Docker does not expand '~' in ADD destinations (it would create a
# literal './~' directory); use the explicit /root path instead.
# The actions file is a template that init.sh renders with envsubst, so it
# is stored as curator-actions-template.yml (matching init.sh).
ADD curator.yml /root/.curator/curator.yml
ADD curator-actions-template.yml /root/.curator/curator-actions-template.yml

ADD get-aws-profile.sh /usr/local/bin/get-aws-profile
RUN chmod +x /usr/local/bin/get-aws-profile

ADD init.sh /init.sh
RUN chmod +x /init.sh

# NOTE(review): no ENTRYPOINT/CMD is defined — /init.sh is presumably wired
# in by the pod spec; confirm before relying on this image standalone.

curator.yml

# Curator client settings: connect to the co-located Elasticsearch node
# over plain HTTP on the default port (no TLS, no authentication).
client:
  hosts:
    - 127.0.0.1
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  ssl_no_validate: False
  http_auth:
  timeout: 30
  master_only: False

# Verbose logging to stdout; suppress low-level elasticsearch/urllib3 chatter.
logging:
  loglevel: DEBUG
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']

curator-actions-template.yml

# Curator action template; init.sh renders it with envsubst, which fills in
# ${CP_SECURITY_LOGS_ELASTIC_PREFIX} and ${CP_SECURITY_LOGS_ELASTIC_BACKUP_DAYS}.
actions:
  1:
    action: snapshot
    # FIX: the age filter below selects indices older than 1 *day*
    # (unit: days, unit_count: 1); the original description said "1 hour".
    description: >-
      Snapshot security_log- prefixed indices older than 1 day (based on index
      creation_date).
    options:
      allow_ilm_indices: True
      repository: log_backup_repo
      name: snapshot-%Y%m%d%H
      ignore_unavailable: False
      include_global_state: True
      partial: False
      wait_for_completion: True
      skip_repo_fs_check: False
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: ${CP_SECURITY_LOGS_ELASTIC_PREFIX}
    - filtertype: age
      source: creation_date
      direction: older
      unit: days
      unit_count: 1
  2:
    action: delete_snapshots
    # FIX: retention is governed by CP_SECURITY_LOGS_ELASTIC_BACKUP_DAYS
    # (365 in install-config), not the "30 days" the original text claimed.
    description: >-
      Delete 'snapshot-' prefixed snapshots from the selected repository older
      than ${CP_SECURITY_LOGS_ELASTIC_BACKUP_DAYS} days (based on creation_date).
    options:
      repository: log_backup_repo
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: snapshot-
      exclude:
    - filtertype: age
      source: creation_date
      direction: older
      unit: days
      unit_count: ${CP_SECURITY_LOGS_ELASTIC_BACKUP_DAYS}

get-aws-profile.sh

#!/bin/bash

# Parse an INI-style file (such as an AWS credentials file) by rewriting each
# [section] header into a shell function named cfg.section.<name> whose body
# assigns every `key=value` pair as a shell array variable, then eval-ing the
# whole rewritten text. Calling cfg.section.<profile> afterwards populates
# variables such as aws_access_key_id in the current shell.
# NOTE(review): this eval-s file content — only run it on trusted credentials
# files; a crafted file could execute arbitrary shell. Also note IFS is left
# set to newline for the rest of the script.
cfg_parser ()
{
  IFS=$'\n' && ini=( $(<$1) ) # read the file into an array, one element per line
  ini=( ${ini[*]//;*/} )      # strip ';' comments
  ini=( ${ini[*]//\#*/} )     # strip '#' comments
  ini=( ${ini[*]/\  =/=} )  # drop whitespace before '='
  ini=( ${ini[*]/=\ /=} )   # drop whitespace after '='
  ini=( ${ini[*]/\ /} )
  ini=( ${ini[*]/\ *=\ /=} )   # collapse entries with spaces around '='
  ini=( ${ini[*]/#[/\}$'\n'cfg.section.} ) # '[' starts a section: close previous function, open cfg.section.<name>
  ini=( ${ini[*]/%]/ \(} )    # ']' -> '(' : turn the header into a function definition (part 1)
  ini=( ${ini[*]/=/=\( } )    # 'k=v' -> 'k=( v' : each value becomes an array
  ini=( ${ini[*]/%/ \)} )     # append ')' to close each array
  ini=( ${ini[*]/%\\ \)/ \\} ) # keep trailing-backslash (multiline) entries open
  ini=( ${ini[*]/%\( \)/\(\) \{} ) # '( )' -> '() {' : finish the function header (part 2)
  ini=( ${ini[*]/%\} \)/\}} ) # drop the spurious ')' appended after a closing '}'
  ini[0]="" # remove first element
  ini[${#ini[*]} + 1]='}'    # add the last brace
  eval "$(echo "${ini[*]}")" # eval the result
}

# Print a message on standard error, so diagnostics (usage text, warnings,
# errors) never pollute stdout, which callers may capture via $(...).
echo_stderr() {
  >&2 echo "$@"
}

#
# Parse options
#

# Print the usage text on stderr (stdout must stay clean so callers can
# safely do `source $(get-aws-profile ...)`).
# FIX: the advertised default credentials path now matches the actual
# default used below (/root/.cloud/credentials, not ~/.aws/credentials);
# also fixes the "are generate" and "-or" typos.
display_usage ()
{
  cat >&2 <<USAGE
Usage: $0 [--credentials=<path>] [--profile=<name>] [--key|--secret|--session-token]
  Default --credentials is '/root/.cloud/credentials'
  Default --profile is 'default'
  By default environment variables are generated, e.g.
    source \$($0 --profile=myprofile)
  You can specify one of --key, --secret, or --session-token to get just that value, with no line break,
    FOO_KEY=\$($0 --profile=myprofile --key)
    FOO_SECRET=\$($0 --profile=myprofile --secret)
    FOO_SESSION_TOKEN=\$($0 --profile=myprofile --session-token)
USAGE
}

# Parse command-line options (--opt=value pairs and boolean flags).
# The original `shift` calls were dead code: `for i in "$@"` iterates over a
# list fixed at loop entry, and nothing later reads the positional params.
for i in "$@"
do
case $i in
    --credentials=*)
    CREDENTIALS="${i#*=}"        # path to the ini-style credentials file
    ;;
    --profile=*)
    PROFILE="${i#*=}"            # profile ([section]) to load
    ;;
    --key)
    SHOW_KEY=true
    ;;
    --secret)
    SHOW_SECRET=true
    ;;
    --session-token)
    SHOW_SESSION_TOKEN=true
    ;;
    --help)
    display_usage
    exit 0
    ;;
    *)
    # FIX: report the offending argument ($i, not $1 — $1 is unrelated to
    # the option being inspected) and send the diagnostic to stderr.
    echo "Unknown option $i" >&2
    display_usage
    exit 1
    ;;
esac
done

#
# Apply defaults and validate option combinations
#

# `:=` assigns the default when the variable is unset or empty — identical
# to the VAR=${VAR:-default} form.
: "${CREDENTIALS:=/root/.cloud/credentials}"
: "${PROFILE:=default}"
: "${SHOW_KEY:=false}"
: "${SHOW_SECRET:=false}"
: "${SHOW_SESSION_TOKEN:=false}"

# --key and --secret are mutually exclusive: each prints a bare value.
if [[ "${SHOW_KEY}" == true ]] && [[ "${SHOW_SECRET}" == true ]]; then
  echo_stderr "Can only specify one of --key or --secret"
  display_usage
  exit 2
fi

#
# Parse the credentials file and load the selected profile
#

# The file must exist and be readable before we attempt to parse it.
if [[ ! -r "${CREDENTIALS}" ]]; then
  echo_stderr "File not found: '${CREDENTIALS}'"
  exit 3
fi

# cfg_parser turns each [section] into a cfg.section.<name> function.
if ! cfg_parser "${CREDENTIALS}"; then
  echo_stderr "Parsing credentials file '${CREDENTIALS}' failed"
  exit 4
fi

# Invoking the generated function assigns aws_access_key_id and friends
# into the current shell; it fails if the profile section did not exist.
if ! cfg.section.${PROFILE}; then
  echo_stderr "Profile '${PROFILE}' not found"
  exit 5
fi

# Emit the requested values. With no selector flag, print `export` lines for
# `source $(get-aws-profile ...)`; with a selector, print the single bare
# value without a trailing newline.
if [[ "${SHOW_KEY}" == false && "${SHOW_SECRET}" == false && "${SHOW_SESSION_TOKEN}" == false ]]; then
  printf 'export AWS_ACCESS_KEY_ID=%s\n' "${aws_access_key_id}"
  printf 'export AWS_SECRET_ACCESS_KEY=%s\n' "${aws_secret_access_key}"
  printf 'export AWS_SESSION_TOKEN=%s\n' "${aws_session_token}"
elif [[ "${SHOW_KEY}" == true ]]; then
  printf '%s' "${aws_access_key_id}"
elif [[ "${SHOW_SECRET}" == true ]]; then
  printf '%s' "${aws_secret_access_key}"
elif [[ "${SHOW_SESSION_TOKEN}" == true ]]; then
  printf '%s' "${aws_session_token}"
else
  # Only reachable if a SHOW_* variable holds something besides true/false.
  echo_stderr "Unknown error"
  exit 9
fi

exit 0

init.sh

#!/bin/bash
# init.sh — bootstrap the cp-search-elk container: load S3 credentials into
# the Elasticsearch keystore, start Elasticsearch, then provision ILM,
# templates, ingest pipelines, the snapshot repository and the curator cron.

# FIX: the original `echo eval(get-aws-profile --key) | ...` is invalid shell
# syntax; pipe the credential value straight into `elasticsearch-keystore
# add --stdin` instead (get-aws-profile --key/--secret prints the bare value).
get-aws-profile --key    | bin/elasticsearch-keystore add --stdin s3.client.default.access_key
get-aws-profile --secret | bin/elasticsearch-keystore add --stdin s3.client.default.secret_key

# Raise the fd and mmap limits Elasticsearch requires, then start the stock
# entrypoint in the background so the provisioning below can run against it.
ulimit -n 65536 && sysctl -w vm.max_map_count=262144 && /usr/local/bin/docker-entrypoint.sh &

# ILM policy for the security log indices: roll the hot index over daily,
# delete indices CP_SECURITY_LOGS_ROLLOVER_DAYS (default 20) days after
# rollover. Built with a heredoc so the JSON needs no escaping.
# NOTE(review): Elasticsearch was launched in the background just above and
# may not be accepting connections yet — consider polling localhost:9200
# before issuing the PUT requests.
ILM_POLICY=$(cat <<EOF
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "1d"
          }
        }
      },
      "delete": {
        "min_age": "${CP_SECURITY_LOGS_ROLLOVER_DAYS:-20}d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
EOF
)

curl -H 'Content-Type: application/json' -XPUT localhost:9200/_ilm/policy/security_log_policy -d "$ILM_POLICY"

# Index template applied to every ${CP_SECURITY_LOGS_ELASTIC_PREFIX}-* index:
# single shard, no replicas, bound to the security_log_policy ILM policy with
# the prefix as the rollover alias. The mapping declares text+keyword
# multi-fields for the log attributes shipped by filebeat.
INDEX_TEMPLATE="{
  \"index_patterns\": [\"${CP_SECURITY_LOGS_ELASTIC_PREFIX:-security_log}-*\"],
  \"settings\": {
    \"number_of_shards\": 1,
    \"number_of_replicas\": 0,
    \"index.lifecycle.name\": \"security_log_policy\",
    \"index.lifecycle.rollover_alias\": \"${CP_SECURITY_LOGS_ELASTIC_PREFIX:-security_log}\"
  },
  \"mappings\": {
    \"doc\" : {
      \"properties\": {
        \"@timestamp\": {
          \"type\": \"date\"
        },
        \"event_id\": {
          \"type\": \"long\"
        },
        \"hostname\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"application\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"level\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"loggerName\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"message\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"message_timestamp\": {
          \"type\": \"date\"
        },
        \"service_account\": {
          \"type\": \"boolean\"
        },
        \"service_name\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"source\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"thread\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"thrown\": {
          \"properties\": {
            \"commonElementCount\": {
              \"type\": \"long\"
            },
            \"extendedStackTrace\": {
              \"type\": \"text\",
              \"fields\": {
                \"keyword\": {
                  \"type\": \"keyword\"
                }
              }
            },
            \"localizedMessage\": {
              \"type\": \"text\",
              \"fields\": {
                \"keyword\": {
                  \"type\": \"keyword\"
                }
              }
            },
            \"message\": {
              \"type\": \"text\",
              \"fields\": {
                \"keyword\": {
                  \"type\": \"keyword\"
                }
              }
            },
            \"name\": {
              \"type\": \"text\",
              \"fields\": {
                \"keyword\": {
                  \"type\": \"keyword\"
                }
              }
            }
          }
        },
        \"type\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        },
        \"user\": {
          \"type\": \"text\",
          \"fields\": {
            \"keyword\": {
              \"type\": \"keyword\"
            }
          }
        }
      }
    }
  }
}"

# Register the template so new rollover indices pick up the mapping.
curl -H 'Content-Type: application/json' -XPUT localhost:9200/_template/security_log_template -d "$INDEX_TEMPLATE"

# Bootstrap the first rollover index (date-math name, suffix -000001 style)
# and attach the write alias that the ILM policy rolls over.
INDEX="{
  \"aliases\": {
    \"${CP_SECURITY_LOGS_ELASTIC_PREFIX:-security_log}\": {}
  }
}"

# FIX: the original posted \$INDEX_TEMPLATE here instead of the \$INDEX body
# built just above, so the alias was never created.
curl -H 'Content-Type: application/json' -XPUT localhost:9200/%3C${CP_SECURITY_LOGS_ELASTIC_PREFIX:-security_log}-%7Bnow%2Fm%7Byyyy.MM.dd%7D%7D-0000001%3E -d "$INDEX"

# Ingest pipeline for EDGE (nginx) security lines: grok the timestamp,
# application and user out of the raw message, flatten filebeat's `fields.*`
# into top-level attributes, tag service accounts (rows logged by
# $CP_DEFAULT_ADMIN_NAME), stamp a unique event_id, and drop bookkeeping
# fields before indexing.
EDGE_PIPELINE="{

    \"description\" : \"Log data extraction pipeline from EDGE\",
    \"processors\": [
      {
        \"grok\": {
          \"field\": \"message\",
          \"patterns\": [\"%{DATESTAMP:log_timestamp} %{GREEDYDATA} Application: %{GREEDYDATA:application}; User: %{DATA:user}; %{GREEDYDATA}\"]
        }
      },
       {
         \"rename\": {
           \"field\": \"fields.type\",
           \"target_field\": \"type\"
         }
       },
       {
         \"set\": {
           \"field\": \"service_account\",
           \"value\": false,
           \"ignore_failure\": true
          }
       },
        {
         \"script\": {
           \"ignore_failure\": false,
           \"lang\": \"painless\",
           \"source\": \"ctx.event_id=System.nanoTime()\"
         }
       },
       {
         \"set\": {
           \"if\": \"ctx.user.equalsIgnoreCase('$CP_DEFAULT_ADMIN_NAME')\",
           \"field\": \"service_account\",
           \"value\": true,
           \"ignore_failure\": true
          }
       },
       {
         \"rename\": {
           \"field\": \"fields.service\",
           \"target_field\": \"service_name\"
         }
       },
       {
         \"rename\": {
           \"field\": \"host.name\",
           \"target_field\": \"hostname\"
         }
       },
       {
         \"date\": {
            \"field\" : \"log_timestamp\",
            \"target_field\" : \"message_timestamp\",
            \"formats\" : [\"yy/MM/dd HH:mm:ss\"]
         }
       },
       {
         \"remove\": {
           \"field\": \"log_timestamp\",
           \"ignore_missing\": true,
           \"ignore_failure\": true
          }
       },
       {
         \"remove\": {
           \"field\": \"fields\",
           \"ignore_missing\": true,
           \"ignore_failure\": true
          }
       },
       {
         \"remove\": {
           \"field\": \"host\",
           \"ignore_missing\": true,
           \"ignore_failure\": true
          }
       }
    ]
}"

# Register the pipeline; filebeat on the edge host sends through it.
curl -H 'Content-Type: application/json' -XPUT localhost:9200/_ingest/pipeline/edge -d "$EDGE_PIPELINE"

# Ingest pipeline for the API server's structured security log: flatten
# filebeat's `fields.*` into top-level attributes, tag service accounts
# (entries logged by $CP_DEFAULT_ADMIN_NAME), stamp a unique event_id,
# convert the timestamp, and drop bookkeeping fields before indexing.
API_SRV_PIPELINE="{

    \"description\" : \"Log data extraction pipeline from API server\",
    \"processors\": [
       {
         \"rename\": {
           \"field\": \"fields.type\",
           \"target_field\": \"type\"
         }
       },
       {
         \"set\": {
           \"field\": \"service_account\",
           \"value\": false,
           \"ignore_failure\": true
          }
       },
       {
         \"set\": {
           \"if\": \"ctx.user.equalsIgnoreCase('$CP_DEFAULT_ADMIN_NAME')\",
           \"field\": \"service_account\",
           \"value\": true
          }
       },
       {
         \"script\": {
           \"ignore_failure\": false,
           \"lang\": \"painless\",
           \"source\": \"ctx.event_id=System.nanoTime()\"
         }
       },
       {
         \"rename\": {
           \"field\": \"fields.service\",
           \"target_field\": \"service_name\"
         }
       },
       {
         \"rename\": {
           \"field\": \"host.name\",
           \"target_field\": \"hostname\"
         }
       },
       {
         \"date\": {
            \"field\" : \"timestamp\",
            \"target_field\" : \"message_timestamp\",
            \"formats\" : [\"yyyy-MM-dd'T'HH:mm:ss.SSSZ\"]
         }
       },
       {
         \"remove\": {
           \"field\": \"timestamp\",
           \"ignore_missing\": true,
           \"ignore_failure\": true
          }
       },
       {
         \"remove\": {
           \"field\": \"fields\",
           \"ignore_missing\": true,
           \"ignore_failure\": true
          }
       },
       {
         \"remove\": {
           \"field\": \"host\",
           \"ignore_missing\": true,
           \"ignore_failure\": true
          }
       }
    ]
}"

# Register the pipeline; filebeat on the API server sends through it.
curl -H 'Content-Type: application/json' -XPUT localhost:9200/_ingest/pipeline/api_server -d "$API_SRV_PIPELINE"

# Register the S3 snapshot repository used by the curator snapshot action.
# FIX: honor CP_LOGS_ELASTIC_BACKUP_REPO — declared in install-config as the
# dedicated backup bucket but previously unused — and fall back to the
# system storage bucket for backward compatibility (the config default is
# empty, so existing deployments behave exactly as before).
LOG_BACKUP_REPO="{
  \"type\": \"s3\",
  \"settings\": {
    \"bucket\": \"${CP_LOGS_ELASTIC_BACKUP_REPO:-${CP_PREF_STORAGE_SYSTEM_STORAGE_NAME}}\",
    \"base_path\": \"log_backup_repo\"
  }
}"

curl -H 'Content-Type: application/json' -XPUT localhost:9200/_snapshot/log_backup_repo -d "$LOG_BACKUP_REPO"

# Render the curator action file: envsubst substitutes the
# ${CP_SECURITY_LOGS_ELASTIC_PREFIX} / ${CP_SECURITY_LOGS_ELASTIC_BACKUP_DAYS}
# placeholders left in the template.
envsubst < /root/.curator/curator-actions-template.yml > /root/.curator/curator-actions.yml
# Schedule curator daily at midnight to snapshot and prune the log indices.
# NOTE(review): files in /etc/cron.d normally require a user field; this
# 5-field entry only takes effect because it is installed via `crontab`
# below — confirm the /etc/cron.d location is intentional.
cat > /etc/cron.d/curator-cron <<EOL
0 0 * * * curator --config /root/.curator/curator.yml /root/.curator/curator-actions.yml
EOL

chmod 0644 /etc/cron.d/curator-cron

# Install the schedule as the current user's crontab and start cron.
crontab /etc/cron.d/curator-cron

crond

# Block on the backgrounded Elasticsearch entrypoint to keep the container up.
wait

Kibana pod configs

cp-search-kibana-dpl.yaml

# Kibana 6.8.3 deployment (version-matched to the Elasticsearch node) for
# browsing the security log indices.
# NOTE(review): extensions/v1beta1 Deployments are removed in Kubernetes
# 1.16+ — migrate to apps/v1 (which requires spec.selector) when upgrading.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: cp-log-store-kibana
  namespace: default
spec:
  replicas: 1
  template:
    metadata:
      namespace: default
      labels:
        cloud-pipeline/cp-log-store-kibana: "true"
    spec:
      # Pin the pod to nodes labelled for kibana; tolerate the master taint.
      nodeSelector:
        cloud-pipeline/cp-log-store-kibana: "true"
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
        - name: cp-log-store-kibana
          image: docker.elastic.co/kibana/kibana:6.8.3
          securityContext:
            privileged: true
          imagePullPolicy: "IfNotPresent"
          command: ["bash"]
          args: ["-c", "/usr/local/bin/kibana-docker"]
          env:
            # Elasticsearch endpoint; the ${...} placeholders are substituted
            # at deploy time, not by Kubernetes.
            - name: ELASTICSEARCH_HOSTS
              value: "http://${CP_SEARCH_ELK_INTERNAL_HOST}:${CP_SEARCH_ELK_ELASTIC_INTERNAL_PORT}"
          readinessProbe:
            httpGet:
              path: /
              port: 5601
              scheme: HTTP
            initialDelaySeconds: 5
            periodSeconds: 10

cp-search-kibana-svc.yaml

# Service exposing the Kibana container port 5601 on CP_SEARCH_KIBANA_PORT;
# the ${...} placeholder is substituted at deploy time, not by Kubernetes.
apiVersion: v1
kind: Service
metadata:
  labels:
    cloud-pipeline/cp-log-store-kibana: "true"
  name: cp-log-store-kibana
  namespace: default
spec:
  ports:
  - protocol: TCP
    port: ${CP_SEARCH_KIBANA_PORT}
    targetPort: 5601
    name: cp-log-store-kibana-port
  selector:
    cloud-pipeline/cp-log-store-kibana: "true"

Filebeat

Install Filebeat:

# Download and install the Filebeat 6.8.3 RPM (version-matched to the
# Elasticsearch 6.8.3 node it ships to).
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.3-x86_64.rpm
rpm -vi filebeat-6.8.3-x86_64.rpm

filebeat.yml for api-srv

# Filebeat configuration for the API server host: ship the structured
# security log (JSON lines) into the 'security_log' index through the
# 'api_server' ingest pipeline. The ${...} placeholders are substituted
# at deploy time.
setup.template.name: "security_log"
setup.template.pattern: "security_log*"
output.elasticsearch:
  hosts: ["http://${CP_SEARCH_ELK_INTERNAL_HOST}:${CP_SEARCH_ELK_ELASTIC_INTERNAL_PORT}"]
  pipeline: "api_server"
  index: "security_log"
filebeat:
  inputs:
    - paths:
        - /opt/api/logs/security.json
      # Lift the JSON log fields to the event's top level.
      json.keys_under_root: true
      json.add_error_key: true
      fields:
        service: api-srv
        type: security
# Drop filebeat/log4j bookkeeping fields the ingest pipeline does not need.
processors:
  - drop_fields:
     fields: ["instant", "beat", "log", "loggerFqcn", "threadId", "log_context", "offset", "threadPriority", "endOfBatch", "agent", "log.offset", "esc" ]

filebeat.yml for edge

# Filebeat configuration for the EDGE host: ship only the SECURITY-tagged
# lines of the nginx error log into the 'security_log' index through the
# 'edge' ingest pipeline. The ${...} placeholders are substituted at
# deploy time.
setup.template.name: "security_log"
setup.template.pattern: "security_log*"
output.elasticsearch:
  hosts: ["http://${CP_SEARCH_ELK_INTERNAL_HOST}:${CP_SEARCH_ELK_ELASTIC_INTERNAL_PORT}"]
  pipeline: "edge"
  index: "security_log"
filebeat:
  inputs:
    - paths:
        - /etc/nginx/logs/error.log
      fields:
        service: edge
        type: security
      # Only forward lines containing the SECURITY marker.
      include_lines: ['SECURITY']

# Drop filebeat bookkeeping fields the ingest pipeline does not need.
processors:
  - drop_fields:
      fields: ["log", "offset", "beat", "agent", "log.offset", "esc" ]

Expand install-config with new env vars

...
CP_SECURITY_LOGS_ELASTIC_PREFIX=security_log
# S3 bucket for the Elasticsearch snapshot repository
CP_LOGS_ELASTIC_BACKUP_REPO=
CP_SECURITY_LOGS_ELASTIC_BACKUP_DAYS=365
CP_SECURITY_LOGS_ROLLOVER_DAYS=20
...