Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve strategy coverage #380

Merged
merged 5 commits into from Dec 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 30 additions & 0 deletions backend/breach/migrations/0027_auto_20171210_1343.py
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.2 on 2017-12-10 13:43
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('breach', '0026_auto_20171208_1153'),
]

operations = [
migrations.AddField(
model_name='round',
name='block_align',
field=models.BooleanField(default=True, help_text='Whether to use block alignment or not, in case maxreflectionlength does not allow it'),
),
migrations.AddField(
model_name='round',
name='huffman_pool',
field=models.BooleanField(default=True, help_text='Whether to use Huffman pool or not, in case maxreflectionlength does not allow it'),
),
migrations.AddField(
model_name='round',
name='method',
field=models.IntegerField(choices=[(1, 'serial'), (2, 'divide&conquer'), (3, 'backtracking')], default=1, help_text='Method of building candidate samplesets.'),
),
]
39 changes: 20 additions & 19 deletions backend/breach/models/round.py
Expand Up @@ -2,33 +2,16 @@
from django.db import models
from django.core.exceptions import ValidationError
from breach.analyzer import decide_next_world_state
from breach.models import Target
from itertools import groupby


class Round(models.Model):
class Meta:
unique_together = (('victim', 'index'),)

def check_block_align(self):
try:
return self.block_align
except AttributeError:
self.block_align = self.victim.target.block_align
return self.block_align

def check_huffman_pool(self):
try:
return self.huffman_pool
except AttributeError:
self.huffman_pool = self.victim.target.huffman_pool
return self.huffman_pool

def get_method(self):
try:
return self.method
except AttributeError:
self.method = self.victim.target.method
return self.method
return self.method

def clean(self):
if not self.knownsecret.startswith(self.victim.target.prefix):
Expand Down Expand Up @@ -129,3 +112,21 @@ def fetch_per_batch_info(self):
default=1.0,
help_text='Accumulated probability of current round\'s given knownsecret. '
)

method = models.IntegerField(
default=Target.SERIAL,
choices=Target.METHOD_CHOICES,
help_text='Method of building candidate samplesets.'
)

block_align = models.BooleanField(
default=True,
help_text=('Whether to use block alignment or not, in case '
'maxreflectionlength does not allow it')
)

huffman_pool = models.BooleanField(
default=True,
help_text=('Whether to use Huffman pool or not, in case '
'maxreflectionlength does not allow it')
)
21 changes: 10 additions & 11 deletions backend/breach/strategy.py
Expand Up @@ -145,7 +145,7 @@ def _reflection(self, alphabet):
''
]

if self._round.check_huffman_pool():
if self._round.huffman_pool:
# Huffman complement indicates the knownalphabet symbols that are not currently being tested
huffman_complement = set(self._round.knownalphabet) - set(alphabet)

Expand Down Expand Up @@ -349,18 +349,14 @@ def _get_first_reflection():
return

while len(_get_first_reflection()) > self._round.victim.target.maxreflectionlength:
if self._round.get_method() == Target.DIVIDE_CONQUER:
self._round.victim.target.method = Target.SERIAL
self._round.victim.target.save()
if self._round.method == Target.DIVIDE_CONQUER:
self._round.method = Target.SERIAL
self._round.save()
logger.info('Divide & conquer method cannot be used, falling back to serial.')
elif self._round.check_huffman_pool():
elif self._round.huffman_pool:
self._round.huffman_pool = False
self._round.save()
logger.info('Huffman pool cannot be used, removing it.')
elif self._round.check_block_align():
self._round.block_align = False
self._round.save()
logger.info('Block alignment cannot be used, removing it.')
else:
raise MaxReflectionLengthError('Cannot attack, specified maxreflectionlength is too short')

Expand Down Expand Up @@ -391,7 +387,10 @@ def _create_round(self, state):
amount=self._victim.target.samplesize,
knownalphabet=state['knownalphabet'],
knownsecret=state['knownsecret'],
accumulated_probability=prob
accumulated_probability=prob,
huffman_pool=self._victim.target.huffman_pool,
block_align=self._victim.target.block_align,
method=self._victim.target.method
)
next_round.save()
self._round = next_round
Expand Down Expand Up @@ -423,7 +422,7 @@ def _create_round_samplesets(self):
candidate_alphabets = self._build_candidates(state)

alignmentalphabet = ''
if self._round.check_block_align():
if self._round.block_align:
alignmentalphabet = list(self._round.victim.target.alignmentalphabet)
random.shuffle(alignmentalphabet)
alignmentalphabet = ''.join(alignmentalphabet)
Expand Down
1 change: 1 addition & 0 deletions backend/breach/tests/test_backtracking.py
Expand Up @@ -39,6 +39,7 @@ def test_create_multiple_branches(self, Sniffer):
amount=1,
knownsecret='branchsecret',
knownalphabet='012',
method=BACKTRACKING
)

SampleSet.objects.create(
Expand Down
25 changes: 1 addition & 24 deletions backend/breach/tests/test_models.py
@@ -1,5 +1,5 @@
from django.test import TestCase
from breach.models import Target, Victim, Round
from breach.models import Target, Victim


class ModelTestCase(TestCase):
Expand Down Expand Up @@ -35,26 +35,3 @@ def test_victim(self):
self.assertEqual(self.victim.realtimeurl, 'http://localhost:3031')
self.assertEqual(self.victim.snifferendpoint, 'http://127.0.0.1:9000')
self.assertEqual(self.victim.calibration_wait, 0.0)

def test_round(self):
round1 = Round.objects.create(
victim=self.victim,
knownalphabet='01'
)

self.assertTrue(round1.check_block_align())
round1.block_align = False
self.assertTrue(round1.check_huffman_pool())
round1.huffman_pool = False
self.assertEqual(round1.get_method(), 1)
round1.method = 2

round2 = Round.objects.create(
victim=self.victim,
knownalphabet='01',
index=2
)

self.assertTrue(round2.check_block_align())
self.assertTrue(round2.check_huffman_pool())
self.assertEqual(round2.get_method(), 1)
69 changes: 69 additions & 0 deletions backend/breach/tests/test_strategy.py
Expand Up @@ -212,3 +212,72 @@ def test_divide_and_conquer(self, Sniffer):
'https://di.uoa.gr/?breach=^1^0^test3^test2^'
)
strategy1._mark_current_work_completed()

@patch('breach.strategy.Sniffer')
def test_downgrade_to_serial(self, Sniffer):
target = Target.objects.create(
name='maxreflection',
endpoint='https://test.com/?breach=%s',
prefix='test',
alphabet='0123',
maxreflectionlength=16,
method=2
)

victim = Victim.objects.create(
target=target,
sourceip='192.168.10.141',
snifferendpoint='http://localhost/'
)

strategy = Strategy(victim)
work = strategy.get_work()
self.assertEqual(work, {'url': u'https://test.com/?breach=^1^3^2^test0^', 'amount': 64, 'timeout': 0, 'alignmentalphabet': u''})

target.delete()

@patch('breach.strategy.Sniffer')
def test_downgrade_huffman(self, Sniffer):
target = Target.objects.create(
name='maxreflection',
endpoint='https://test.com/?breach=%s',
prefix='test',
alphabet='0123',
maxreflectionlength=12,
method=2
)

victim = Victim.objects.create(
target=target,
sourceip='192.168.10.141',
snifferendpoint='http://localhost/'
)

strategy = Strategy(victim)
work = strategy.get_work()
self.assertEqual(work, {'url': u'https://test.com/?breach=^test0^', 'amount': 64, 'timeout': 0, 'alignmentalphabet': u''})

target.delete()

@patch('breach.strategy.Sniffer')
def test_maxreflectionerror(self, Sniffer):
target = Target.objects.create(
name='maxreflection',
endpoint='https://test.com/?breach=%s',
prefix='test',
alphabet='0123',
maxreflectionlength=6,
method=2
)

victim = Victim.objects.create(
target=target,
sourceip='192.168.10.141',
snifferendpoint='http://localhost/'
)

strategy = Strategy(victim)
work = strategy.get_work()
self.assertEqual(work, {})

target.delete()