Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-ganbold committed Nov 17, 2019
2 parents 3cbf690 + 18c9bc5 commit 06ffb3f
Show file tree
Hide file tree
Showing 13 changed files with 808 additions and 22 deletions.
12 changes: 5 additions & 7 deletions README.md
@@ -1,9 +1,9 @@
# Cloud Container Attack Tool (CCAT :cloud::cat2:)
[![Rhino](https://img.shields.io/badge/rhino-offensive%20%7C%20tool-red?logo=)](https://rhinosecuritylabs.com) [![PyPI](https://img.shields.io/badge/python-3.5%20%7C%203.6%20%7C%203.7-blue.svg)](https://github.com/RhinoSecurityLabs/container-hack) [![GitHub license](https://img.shields.io/badge/license-BSD-brightgreen.svg)](https://github.com/RhinoSecurityLabs/container-hack/blob/master/LICENSE) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/RhinoSecurityLabs/ccat/pulls)
[![Rhino](https://img.shields.io/badge/rhino-offensive%20%7C%20tool-red?logo=)](https://rhinosecuritylabs.com) [![PyPI](https://img.shields.io/badge/python-3.5%20%7C%203.6%20%7C%203.7-blue.svg)](https://github.com/RhinoSecurityLabs/ccat) [![GitHub license](https://img.shields.io/badge/license-BSD-brightgreen.svg)](https://github.com/RhinoSecurityLabs/ccat/blob/master/LICENSE) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/RhinoSecurityLabs/ccat/pulls)

_Cloud Container Attack Tool (CCAT) is a tool for testing security of container environments._

<img src="https://raw.githubusercontent.com/RhinoSecurityLabs/ccat/master/docs/images/ccat_main_menu.png" width="1000"/>
<img src="https://raw.githubusercontent.com/RhinoSecurityLabs/ccat/master/docs/images/ccat_new_main_menu.png" width="1000"/>

## Quick reference
- **Where to get help**:
Expand All @@ -18,13 +18,11 @@ _Cloud Container Attack Tool (CCAT) is a tool for testing security of container
## Requirements
* Python 3.5+ is required.
* Docker is required. Note: CCAT is tested with Docker Engine 19.03.1 version.
* AWS named profile is required.
* Named profile is required for using AWS functionality.
* A service account or access token is required for using GCP functionality.

## Installation
<!-- Install CCAT with pip
```
$ pip install ccat
``` -->
> We recommend using the provided Docker image to run CCAT, so that you will not face any difficulty with the required dependencies on your own system.
Install CCAT from source

Expand Down
288 changes: 282 additions & 6 deletions ccat.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import json
import sys
import pathlib


import boto3
Expand All @@ -9,20 +10,36 @@
from PyInquirer import (prompt, Separator, style_from_dict, Token)
from tabulate import tabulate


# AWS
import modules.ecr__enum_repos.main as ecr__enum_repos
import modules.ecr__pull_repos.main as ecr__pull_repos
import modules.docker__backdoor.main as docker__backdoor
import modules.ecr__push_repos.main as ecr__push_repos

# GCP
import modules.gcr__enum_repos.main as gcr__enum_repos
import modules.gcr__pull_repos.main as gcr__pull_repos
import modules.gcr__push_repos.main as gcr__push_repos

# Docker
import modules.docker__backdoor.main as docker__backdoor

# AWS words
ENUMERATE_ECR = 'Enumerate ECR'
PULL_ECR_REPOS = 'Pull Repos from ECR'
PUSH_ECR_REPOS = 'Push Repos to ECR'
DOCKER_BACKDOOR = 'Docker Backdoor'
LIST_ECR_REPOS = 'List Enumerated ECR Repos'
SWAP_AWS_PROFILE = 'Swap AWS Profile'

# GCP words
ENUMERATE_GCR = 'Enumerate GCR'
PULL_GCR_REPOS = 'Pull Repos from GCR'
PUSH_GCR_REPOS = 'Push Repos to GCR'
LIST_GCR_REPOS = 'List Enumerated GCR Repos'
SWAP_GCP_CREDS = 'Swap GCP Credentials'

# Docker words
DOCKER_BACKDOOR = 'Docker Backdoor'


custom_style = style_from_dict({
Token.Separator: '#6C6C6C',
Expand All @@ -35,13 +52,16 @@
Token.Question: '',
})


class CLI(object):
def __init__(self):
aws = AWS()
gcp = GCP()
docker = Docker()

self.extentions = {
'aws': aws,
'gcp': gcp,
'docker': docker
}

Expand Down Expand Up @@ -83,6 +103,7 @@ def print_module_summary(self, data, module):
print('MODULE SUMMARY:\n\n{}\n'.format(summary.strip('\n')))

def run_module(self, answers):
# AWS
if ENUMERATE_ECR in answers['main_menu']:
cli_answers = self.extentions['aws'].ask_ecr_enum_repos()
self.print_module_running(ecr__enum_repos.module_info['name'])
Expand All @@ -105,15 +126,42 @@ def run_module(self, answers):
data = ecr__push_repos.main(cli_answers)
self.print_module_summary(data, ecr__push_repos)

elif SWAP_AWS_PROFILE in answers['main_menu']:
self.extentions['aws'].swap_profile()

# GCP
elif ENUMERATE_GCR in answers['main_menu']:
cli_answers = self.extentions['gcp'].ask_gcr_enum_repos()
self.print_module_running(gcr__enum_repos.module_info['name'])
data = gcr__enum_repos.main(cli_answers)
self.extentions['gcp'].data.update({'gcr_repos': data})
self.print_module_summary(data, gcr__enum_repos)

elif LIST_GCR_REPOS in answers['main_menu']:
self.extentions['gcp'].print_gcr_repos()

elif PULL_GCR_REPOS in answers['main_menu']:
cli_answers = self.extentions['gcp'].ask_gcr_pull_repos()
self.print_module_running(gcr__pull_repos.module_info['name'])
data = gcr__pull_repos.main(cli_answers)
self.print_module_summary(data, gcr__pull_repos)

elif PUSH_GCR_REPOS in answers['main_menu']:
cli_answers = self.extentions['gcp'].ask_gcr_push_repos()
self.print_module_running(gcr__push_repos.module_info['name'])
data = gcr__push_repos.main(cli_answers)
self.print_module_summary(data, gcr__push_repos)

elif SWAP_GCP_CREDS in answers['main_menu']:
self.extentions['gcp'].swap_service_account()

# Docker
elif DOCKER_BACKDOOR in answers['main_menu']:
cli_answers = self.extentions['docker'].ask_docker_backdoor()
self.print_module_running(docker__backdoor.module_info['name'])
data = docker__backdoor.main(cli_answers)
self.print_module_summary(data, docker__backdoor)

elif SWAP_AWS_PROFILE in answers['main_menu']:
self.extentions['aws'].swap_profile()

else:
self.exit_cli()

Expand Down Expand Up @@ -354,6 +402,234 @@ def append_configuration(self, answers):
})


class GCP(object):
def __init__(self, service_account_json_file_path=None, access_token=None):
self.configuration = {
'service_account_json_file_path': service_account_json_file_path,
'access_token': access_token,
}

self.data = {}

def get_menu_choices_registries(self, gcp_registries=[]):
choices = []
for registry in gcp_registries:
choices.append({
'name': registry
})

return choices

def get_menu(self):
return [
Separator('= GCP ='),
ENUMERATE_GCR,
LIST_GCR_REPOS,
PULL_GCR_REPOS,
PUSH_GCR_REPOS,
SWAP_GCP_CREDS
]

def swap_service_account(self):
if self.configuration.get('service_account_json_file_path'):
print('Current Service Account: {}'.format(self.configuration.get('service_account_json_file_path')))
self.set_configuration()

def ask_configuration(self):
if not self.configuration.get('service_account_json_file_path') and not self.configuration.get('access_token'):
print('Did not find GCP configuration!')

questions = [
{
'type': 'list',
'name': 'creds_choice',
'message': 'What GCP credential would you like to setup?',
'choices': ['Service Account', 'Access Token']
}
]

answers = prompt(questions)

if answers['creds_choice'] == 'Service Account':
# service account
questions = [
{
'type': 'input',
'name': 'service_account_json_file_path',
'message': 'Enter GCP Service Account json file path',
'validate': lambda profile: len(profile) != 0 or 'GCP Service Account can not be empty!'
}
]

while True:
answers = prompt(questions)
path = pathlib.Path(answers.get('service_account_json_file_path'))
if path.exists():
break
else:
print("Service account json file does not exist")
else:
# access token
questions = [
{
'type': 'input',
'name': 'access_token',
'message': 'Enter GCP Access Token',
'validate': lambda profile: len(profile) != 0 or 'GCP Access Token can not be empty!'
}
]

answers = prompt(questions)

return answers


def is_configured(self):
return self.configuration.get('service_account_json_file_path') is not None or self.configuration.get('access_token') is not None

def set_configuration(self):
answers = self.ask_configuration()

print('Configuring GCP...')

self.configuration.update({
'service_account_json_file_path': answers.get('service_account_json_file_path'),
'access_token': answers.get('access_token')
})

print('Successfully configured GCP\n')

def ask_gcr_enum_repos(self):
if self.is_configured() is False:
self.set_configuration()

gcp_registries = ['gcr.io','us.gcr.io','eu.gcr.io','asia.gcr.io']

questions = [
{
'type': 'checkbox',
'name': 'gcp_registries',
'message': 'Select GCP registeries to enumerate',
'choices': self.get_menu_choices_registries(gcp_registries)
}
]

answers = prompt(questions)
self.append_configuration(answers)

return answers

def ask_gcr_pull_repos(self):
if self.is_configured() is False:
self.set_configuration()

# gcr_pull_all
questions = [
{
'type': 'list',
'name': 'gcr_pull_options',
'message': 'GCR Pull Options',
'choices': [
'Pull all enumerated repos',
'Pull single repo with multiple tags'
]
}
]

answers = prompt(questions)

if 'Pull all enumerated repos' == answers.get('gcr_pull_options'):
if self.data.get('gcr_repos') and self.data.get('gcr_repos').get('payload'):
gcr_repos = []
repositories_by_registry = self.data.get('gcr_repos').get('payload').get('repositories_by_registry')
for repos in repositories_by_registry.values():
for repo in repos:
gcr_repos.append(repo.get('repositoryUri'))

answers.update({
'repositories': gcr_repos
})
else:
questions = [
{
'type': 'input',
'name': 'repositories',
'message': 'Enter GCP GCR repository URI'
},
{
'type': 'input',
'name': 'repository_tags',
'message': 'Enter GCP GCR repository tags seperated by comma'
}
]

answers = prompt(questions)

# strip(',') remove leading or trailing (,)
# replace(' ', '') remove spaces
# split by comma to generate a list of tags
answers['repository_tags'] = answers['repository_tags'].strip(',').replace(' ', '').split(',')
answers['repositories'] = [answers['repositories']]

self.append_configuration(answers)

return answers

def ask_gcr_push_repos(self):
if self.is_configured() is False:
self.set_configuration()

questions = [
{
'type': 'input',
'name': 'repository_uri',
'message': 'Enter GCP GCR repository URI'
},
{
'type': 'input',
'name': 'repository_tag',
'message': 'Enter GCP GCR repository tag'
}
]

answers = prompt(questions)
self.append_configuration(answers)

return answers

def append_configuration(self, answers):
answers.update({
'service_account_json_file_path': self.configuration['service_account_json_file_path'],
'access_token': self.configuration['access_token']
})


def print_gcr_repos(self):
headers = ['Repo Name', 'Repo Uri', 'Latest Tag', 'Number of Tags', 'Registry']
rows = []

if self.data.get('gcr_repos') and self.data.get('gcr_repos').get('count') > 0:
for registry in self.data['gcr_repos']['payload']['gcp_registries']:
repos = self.data['gcr_repos']['payload']['repositories_by_registry'][registry]
for repo in repos:
row = []
row.append(repo['repositoryName'])
row.append(repo['repositoryUri'])

tag_latest = ''
if repo.get('tags'):
tag_latest = repo.get('tags')[0]
row.append(tag_latest)
row.append(len(repo.get('tags')))
else:
row.append(0)

row.append(registry)
rows.append(row)

print(tabulate(rows, headers=headers, tablefmt='orgtbl'), '\n')


class Docker(object):
def get_menu(self):
return [
Expand Down
Binary file removed docs/images/ccat_main_menu.png
Binary file not shown.
Binary file added docs/images/ccat_new_main_menu.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 06ffb3f

Please sign in to comment.