diff --git a/examples/reschedule_analysis.py b/examples/reschedule_analysis.py index 66924e2..2f60bf4 100755 --- a/examples/reschedule_analysis.py +++ b/examples/reschedule_analysis.py @@ -2,8 +2,8 @@ from time import sleep -from app import app -from app.controllers.api import APIControl +from poli import app +from poli.controllers.api import APIControl def launch(): diff --git a/examples/schedule_samples_analysis.py b/examples/schedule_samples_analysis.py index 3a06d48..2144c51 100755 --- a/examples/schedule_samples_analysis.py +++ b/examples/schedule_samples_analysis.py @@ -2,9 +2,9 @@ import sys from time import sleep -from app.controllers.api import APIControl +from poli.controllers.api import APIControl api = APIControl() for i in sys.argv[1:]: - api.analysiscontrol.schedule_sample_analysis(int(i)) + api.analysiscontrol.schedule_sample_analysis(int(i), force=True) sleep(200) diff --git a/poli/controllers/analysis.py b/poli/controllers/analysis.py index 193ab2c..a098f97 100644 --- a/poli/controllers/analysis.py +++ b/poli/controllers/analysis.py @@ -144,8 +144,6 @@ def reschedule_all_analysis(self, force=False): for sample in Sample.query.all(): if force or sample.analysis_status == AnalysisStatus.TOSTART: self.schedule_sample_analysis(sample.id, force) - elif sample.analysis_status == AnalysisStatus.RUNNING: - self.schedule_sample_analysis(sample.id, force) class Analysis(object): diff --git a/poli/controllers/idaactions.py b/poli/controllers/idaactions.py index a3b996e..fc8a76c 100644 --- a/poli/controllers/idaactions.py +++ b/poli/controllers/idaactions.py @@ -15,6 +15,7 @@ from poli.models.idaactions import IDANameAction, IDACommentAction from poli.models.idaactions import IDAStruct, IDAStructSchema from poli.models.idaactions import IDAStructMember +from poli.models.idaactions import IDATypeAction from poli.models.sample import Sample @@ -207,3 +208,25 @@ def change_struct_member_size(struct_id, mid, new_size): db.session.commit() return True + + @staticmethod + def add_typedef(address, typedef): + """ + Creates a new type definition + """ + mtype = IDATypeAction() + mtype.address = address + mtype.data = typedef + mtype.timestamp = datetime.datetime.now() + db.session.add(mtype) + db.session.commit() + return mtype.id + + @classmethod + def get_typedefs(cls, sid, addr=None, timestamp=None): + """ + Return filtered IDA Pro type definitions + """ + data = cls.filter_actions('idatypes', sid, addr, timestamp) + schema = IDAActionSchema(many=True) + return schema.dump(data).data diff --git a/poli/controllers/sample.py b/poli/controllers/sample.py index 4278af2..127869c 100644 --- a/poli/controllers/sample.py +++ b/poli/controllers/sample.py @@ -318,8 +318,9 @@ def search_hash(cls, needle): c = Sample.query.filter(Sample.md5.like(needleq)).all() results = list(set(a + b + c)) function_results = None - if re.match("[0-9a-f]{8}", needle): - function_results = cls.search_machoc_single_hash(needle) + # XXX fix this + #if re.match("[0-9a-f]{8}", needle): + #function_results = cls.search_machoc_single_hash(needle) return results, function_results @classmethod @@ -432,23 +433,28 @@ def match_by_importhash(cls, sample): cls.add_sample_match(sample, sample_2, "iat_hash") # add the corresponding match to the other sample cls.add_sample_match(sample_2, sample, "iat_hash") - continue return True + @staticmethod + def query_machoc_matches(sample, sample_2): + query = SampleMatch.query.filter(SampleMatch.sid_1.in_([sample.id, sample_2.id]), SampleMatch.sid_2.in_( + [sample.id, sample_2.id]), SampleMatch.match_type == "machoc80") + if query.count() != 0: + return True + return False + @classmethod def match_by_machoc80(cls, sample): """ Match samples by machoc hash. """ - if len(sample.functions) == 0: + if sample.functions.count() == 0: return True for sample_2 in Sample.query.all(): - if len(sample_2.functions) == 0: - continue - if SampleMatch.query.filter(SampleMatch.sid_1.in_([sample.id, sample_2.id]), SampleMatch.sid_2.in_( - [sample.id, sample_2.id]), SampleMatch.match_type == "machoc80").count() != 0: + if cls.query_machoc_matches(sample, sample_2): continue - if cls.machoc_diff_samples(sample, sample_2) >= 0.8: + elif cls.machoc_diff_samples(sample, sample_2) >= 0.8: + app.logger.debug("Add machoc match %d %d", sample.id, sample_2.id) cls.add_sample_match(sample, sample_2, "machoc80") cls.add_sample_match(sample_2, sample, "machoc80") return True @@ -458,11 +464,11 @@ def machoc_diff_with_all_samples(cls, sample, level=0.8): """ Diff a sample with all other samples. Class method. """ - if len(sample.functions) == 0: + if sample.functions.count() == 0: return [] hits = [] for sample_2 in Sample.query.all(): - if len(sample_2.functions) == 0 or sample_2 == sample: + if sample_2.functions.count() == 0 or sample_2.id == sample.id: continue hit_rate = cls.machoc_diff_samples(sample, sample_2) if hit_rate >= level: @@ -474,8 +480,6 @@ def machoc_diff_samples(cls, sample1, sample2): """ Diff two samples using machoc. """ - if sample1 == sample2: - return 0 sample1_hashes = [] sample2_hashes = [] for f in sample1.functions: @@ -484,7 +488,8 @@ def machoc_diff_samples(cls, sample1, sample2): for f in sample2.functions: if f.machoc_hash is not None and f.machoc_hash != -1: sample2_hashes.append(f.machoc_hash) - return cls.machoc_diff_hashes(sample1_hashes, sample2_hashes) + rate = cls.machoc_diff_hashes(sample1_hashes, sample2_hashes) + return rate @staticmethod def machoc_diff_hashes(sample1_hashes, sample2_hashes): @@ -496,7 +501,8 @@ def machoc_diff_hashes(sample1_hashes, sample2_hashes): maxlen = max(len(sample1_hashes), len(sample2_hashes)) c1, c2 = map(Counter, (sample1_hashes, sample2_hashes)) ch = set(sample1_hashes).intersection(set(sample2_hashes)) - return float(sum(map(lambda h: max(c1[h], c2[h]), ch))) / maxlen + rate = float(sum(map(lambda h: max(c1[h], c2[h]), ch))) / maxlen + return rate def machoc_get_similar_functions(self, sample_dst, sample_src): """ @@ -686,7 +692,15 @@ def add_multiple_strings(self, sample, strings): return True @staticmethod - def add_function(sample, address, machoc_hash, + def query_function_info(sample, address): + obj = FunctionInfo.query.filter_by(sample_id=sample.id, address=address) + if obj.count() != 0: + return obj.first() + else: + return None + + @classmethod + def add_function(cls, sample, address, machoc_hash, name="", overwrite=False, do_commit=True): """ Add a function. Updates if exists. @@ -696,9 +710,8 @@ def add_function(sample, address, machoc_hash, if name == "": name = "sub_" + hex(address)[2:] functions_exists = False - obj = FunctionInfo.query.filter_by(sample=sample, address=address) - if obj.count() != 0: - function_info = obj.first() + function_info = cls.query_function_info(sample, address) + if function_info is not None: functions_exists = True if not overwrite: return True diff --git a/poli/controllers/tasks/task_analyzeitrb.py b/poli/controllers/tasks/task_analyzeitrb.py index e8e332c..e4d5812 100644 --- a/poli/controllers/tasks/task_analyzeitrb.py +++ b/poli/controllers/tasks/task_analyzeitrb.py @@ -52,7 +52,7 @@ def get_addr_data(line): addr, data = None, None if len(items) == 3: addr = int(items[1], 16) - data = items[2][:-1] + data = items[2].replace('\n','') return addr, data def parse_machoc_signatures(self): diff --git a/poli/models/family.py b/poli/models/family.py index 07c4895..aa99013 100644 --- a/poli/models/family.py +++ b/poli/models/family.py @@ -88,16 +88,16 @@ def fromstring(cls, val): # Yara signatures relationship (auto-classification). familytoyara = db.Table('familytoyara', db.Column('yara_id', db.Integer, - db.ForeignKey('yararule.id')), + db.ForeignKey('yararule.id'), index=True), db.Column('family_id', db.Integer, - db.ForeignKey('family.id')) + db.ForeignKey('family.id'), index=True) ) # Samples relationship. familytosample = db.Table('familytosample', db.Column('sample_id', db.Integer, - db.ForeignKey('sample.id')), + db.ForeignKey('sample.id'), index=True), db.Column('family_id', db.Integer, - db.ForeignKey('family.id')) + db.ForeignKey('family.id'), index=True) ) @@ -117,7 +117,7 @@ class Family(db.Model): secondary=familytoyara, backref=db.backref('families', lazy='dynamic')) # 1-N relationships - parent_id = db.Column(db.Integer, db.ForeignKey('family.id')) + parent_id = db.Column(db.Integer, db.ForeignKey('family.id'), index=True) subfamilies = db.relationship( 'Family', backref=db.backref( 'parents', remote_side=[id])) diff --git a/poli/models/idaactions.py b/poli/models/idaactions.py index b37d366..2009ae7 100644 --- a/poli/models/idaactions.py +++ b/poli/models/idaactions.py @@ -55,7 +55,9 @@ class IDACommentAction(IDAAction): class IDANameAction(IDAAction): - # Represent names in IDA + """ + This represents global names in IDA. + """ __tablename__ = 'idanames' id = db.Column(db.Integer(), db.ForeignKey('idaactions.id'), @@ -64,6 +66,19 @@ class IDANameAction(IDAAction): 'polymorphic_identity': 'idanames'} +class IDATypeAction(IDAAction): + """ + This represents the types as applied by + the shortcut 'Y' in IDA Pro + """ + __tablename__ = 'idatypes' + id = db.Column(db.Integer(), + db.ForeignKey('idaactions.id'), + primary_key=True) + __mapper_args__ = { + 'polymorphic_identity': 'idatypes'} + + class IDAApplyStructs(IDAAction): # This is the action of applying a structure to an address __tablename__ = 'idaapplystructs' @@ -116,6 +131,7 @@ class Meta: "type", ) + class IDAStructMemberSchema(ma.ModelSchema): class Meta: fields = ( diff --git a/poli/models/sample.py b/poli/models/sample.py index 151a1b8..0c57975 100644 --- a/poli/models/sample.py +++ b/poli/models/sample.py @@ -277,7 +277,7 @@ class Sample(db.Model): 'sample', remote_side=[id])) functions = db.relationship( "FunctionInfo", backref=db.backref( - 'sample', remote_side=[id])) + 'sample', remote_side=[id]), lazy="dynamic") filenames = db.relationship( "FileName", backref=db.backref( 'sample', remote_side=[id])) diff --git a/poli/views/apiview.py b/poli/views/apiview.py index eded70a..008500a 100644 --- a/poli/views/apiview.py +++ b/poli/views/apiview.py @@ -292,7 +292,8 @@ def api_get_sample_id_from_hash(shash): abort(400, "Invalid hash length") if sample is not None: return jsonify({'sample_id': sample.id}) - return jsonify({'sample_id': None}) + else: + abort(404) @apiview.route('/samples//download/') @@ -534,6 +535,29 @@ def api_post_sample_names(sid): return jsonify({'result': result}) +@apiview.route('/samples//types/', methods=['POST']) +def api_post_sample_types(sid): + """ + Manage the creation of type definitions at specific addresses + """ + data = request.json + addr = data['address'] + typedef = data['typedef'] + + action_id = api.idacontrol.add_typedef(addr, typedef) + result = api.samplecontrol.add_idaaction(sid, action_id) + return jsonify(dict(result=result)) + +@apiview.route('/samples//types/', methods=['GET']) +def api_get_sample_types(sid): + """ + Get the IDA types stored in DB + """ + current_timestamp, addr = get_filter_arguments(request) + data = api.idacontrol.get_typedefs(sid, addr, current_timestamp) + return jsonify({'typedefs': data}) + + @apiview.route('/samples//structs/', methods=['POST']) def api_create_struct(sid): """ diff --git a/poli/views/webui.py b/poli/views/webui.py index de4e3f5..b830cc3 100644 --- a/poli/views/webui.py +++ b/poli/views/webui.py @@ -65,7 +65,7 @@ def before_request(): Affects global variables for each request """ g.user = current_user - g.samples = Sample.query.all() + g.samples = Sample.query.order_by(Sample.id).limit(15).all() """ diff --git a/poliapi/mainapi.py b/poliapi/mainapi.py index 2435bd9..3fd9151 100644 --- a/poliapi/mainapi.py +++ b/poliapi/mainapi.py @@ -159,6 +159,19 @@ def set_abstract(self, sid, abstract): json_data = dict(abstract=abstract) return self.post(endpoint, json=json_data)["result"] + def get_abstract(self, sid): + endpoint = self.prepare_endpoint(root='samples') + endpoint += str(sid) + '/abstract/' + + return self.get(endpoint)["abstract"] + + + def get_sid_from_MD5(self, md5): + endpoint = self.prepare_endpoint(root='samples') + endpoint += md5 + '/' + + return self.get(endpoint) + class FamilyModule(MainModule): """ diff --git a/skelenox.py b/skelenox.py index 4de1202..65ab8b0 100644 --- a/skelenox.py +++ b/skelenox.py @@ -165,12 +165,6 @@ def __do_init(self): self.is_online = True self.init_sample_id() - def get_offline(self): - """ - Wrapper to close connection - """ - self.close_connection() - def close_connection(self): """ Cleanup the connection @@ -859,6 +853,9 @@ def update(): """ if not self.update_event.isSet(): self.update_event.set() + if self.kill_event.isSet(): + # Unregister the timer if we are killed + return -1 return self.delay def ts_setup_timer(): @@ -878,15 +875,19 @@ def kill(self): """ g_logger.debug("%s exiting", self.__class__.__name__) self.kill_event.set() + # we don't want to wait until the timeout on the update thread, + # so unlock the update event too + self.update_event.set() def run(self): self.setup_timer() while True: try: - if self.kill_event.wait(timeout=0.01): - break self.update_event.wait() self.update_event.clear() + if self.kill_event.wait(timeout=0.01): + return 0 + # if we are up, sync names self.sync_names() except Exception as mye: g_logger.exception(mye) @@ -981,6 +982,8 @@ def end_skelenox(self): self.skel_hooks.cleanup_hooks() self.skel_sync_agent.kill() + self.skel_sync_agent.skel_conn.close_connection() + self.skel_sync_agent.join() g_logger.info("Skelenox terminated") def launch_skelenox(): @@ -1026,4 +1029,4 @@ def term(self): if __name__ == '__main__': # RUN ! - launch_skelenox() + skel = launch_skelenox() diff --git a/tests/tests_api.py b/tests/tests_api.py index 79ff485..702b681 100755 --- a/tests/tests_api.py +++ b/tests/tests_api.py @@ -1,4 +1,7 @@ #!/usr/bin/env python +""" + This module implements all the tests for the API view. +""" import os import unittest import tempfile @@ -25,7 +28,7 @@ def setUp(self): with poli.app.app_context(): api = APIControl() api.usercontrol.create("john", "password") - self.create_sample() + self._create_sample() poli.db.session.commit() def tearDown(self): @@ -34,17 +37,17 @@ def tearDown(self): os.close(self.db_fd) os.unlink(self.fname) - def login(self, username, password): + def _login(self, username, password): return self.app.post("/login/", data=dict( username=username, password=password), follow_redirects=True) - def create_sample(self): + def _create_sample(self): with open("tests/example_pe.bin", "rb") as hfile: data = StringIO(hfile.read()) - self.login("john", "password") + self._login("john", "password") retval = self.app.post("/samples/", content_type='multipart/form-data', data=dict({'file': (data, "toto")}, @@ -53,101 +56,20 @@ def create_sample(self): sleep(1) return retval - def push_comment(self, sid=1, address=None, comment=None): - retval = self.app.post('/api/1.0/samples/'+str(sid)+'/comments/', - data=json.dumps(dict(address=address, comment=comment)), - content_type="application/json") - return retval - - - def create_struct(self, sid=1, name=None): - retval = self.app.post('/api/1.0/samples/'+str(sid)+'/structs/', - data=json.dumps(dict(name=name)), - content_type="application/json") - return retval - - def create_struct_member(self, sid=1, struct_id=None, mname=None, size=0, offset=0): - url = '/api/1.0/samples/' + str(sid) - url += '/structs/' + str(struct_id) - url += '/members/' - retval = self.app.post(url, - data=json.dumps(dict(name=mname, - size=size, - offset=offset)), - content_type="application/json") - return retval - - def update_struct_member_name(self, sid=1, struct_id=None, mid=None, newname=""): - url = '/api/1.0/samples/' + str(sid) - url += '/structs/' + str(struct_id) - url += '/members/' - retval = self.app.patch(url, - data=json.dumps(dict(mid=mid, newname=newname)), - content_type="application/json") - return retval - - def update_struct_member_size(self, sid=1, struct_id=None, mid=None, newsize=0): - url = '/api/1.0/samples/' + str(sid) - url += '/structs/' + str(struct_id) - url += '/members/' - retval = self.app.patch(url, - data=json.dumps(dict(mid=mid, newsize=newsize)), - content_type="application/json") - return retval - - - - def get_all_structs(self, sid=1, name=None): - retval = self.app.get('/api/1.0/samples/' + str(sid) + - '/structs/') - return retval - - def get_one_struct(self, sid=1, struct_id=1): - url = '/api/1.0/samples/' + str(sid) - url += '/structs/' - url += str(struct_id) + '/' - retval = self.app.get(url) - return retval - - - def get_comment(self, sid=1, address=None): - retval = self.app.get('/api/1.0/samples/' + str(sid) + - '/comments/', - data=json.dumps({'address':address}), - content_type="application/json") - return retval - - def push_name(self, sid=1, address=None, name=None): - retval = self.app.post('/api/1.0/samples/'+str(sid)+'/names/', - data=json.dumps(dict(address=address, name=name)), - content_type="application/json") - return retval - - def get_name(self, sid=1, address=None): - retval = self.app.get('/api/1.0/samples/' + str(sid) + - '/names/') - return retval - - def create_family(self, name, tlp_level=None, parent=None): + def _create_family(self, name, tlp_level=None, parent=None): data = dict(name=name, tlp_level=tlp_level) if parent is not None: data["parent"] = parent retval = self.app.post('/api/1.0/family/', - data=json.dumps(data), - content_type="application/json") + data=json.dumps(data), + content_type="application/json") return retval - def create_yara(self, name, rule, tlp_level=None): - retval = self.app.post('/api/1.0/yaras/', - data=json.dumps(dict(name=name, rule=rule, tlp_level=tlp_level)), - content_type="application/json") - return retval - def update_yara(self, name, rule, tlp_level=None): - retval = self.app.patch('/api/1.0/yaras/', - data=json.dumps(dict(name=name, rule=rule, tlp_level=tlp_level)), - content_type="application/json") - return retval +class ApiSampleTests(ApiTestCase): + """ + Tests cases relatives to sample analysis and metadata + """ def test_get_sample_info(self): """ @@ -156,11 +78,14 @@ def test_get_sample_info(self): retval = self.app.get('/api/1.0/samples/1/') self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) - self.assertEqual(len(data),1) + self.assertEqual(len(data), 1) data = data['samples'] self.assertEqual(data['id'], 1) def test_get_sample_id(self): + """ + Test access to the sample by using MD5, SHA1 and SHA256 + """ # test getting ID by MD5 retval = self.app.get('/api/1.0/samples/0f6f0c6b818f072a7a6f02441d00ac69/') self.assertEqual(retval.status_code, 200) @@ -179,15 +104,20 @@ def test_get_sample_id(self): data = json.loads(retval.data) self.assertEqual(data['sample_id'], 1) - # Bug when using incorrect value for hash + def test_wrong_sample_hash(self): + """ + This triggered a bug when using incorrect value for hash + """ url = "api/1.0/samples/abcdef/" retval = self.app.get(url) self.assertEqual(retval.status_code, 400) data = json.loads(retval.data) self.assertEqual(data['error'], 400) - def test_get_multiples_sample_info(self): + """ + Extract some expected informations from the API + """ retval = self.app.get('/api/1.0/samples/') self.assertEqual(retval.status_code, 200) @@ -243,11 +173,32 @@ def test_get_strings_data(self): data = json.loads(retval.data) self.assertEqual(len(data), 1) + def test_sample_abstract(self): + """ + Sets and gets the sample abstract + """ + data = json.dumps(dict(abstract="This is a test for abstract")) + retval = self.app.post('/api/1.0/samples/1/abstract/', data=data, + content_type="application/json") + self.assertEqual(retval.status_code, 200) + result = json.loads(retval.data) + self.assertTrue(result['result']) + + retval = self.app.get('/api/1.0/samples/1/abstract/') + self.assertEqual(retval.status_code, 200) + result = json.loads(retval.data) + self.assertIn(result['abstract'], 'This is a test for abstract') + + +class ApiFamilyTests(ApiTestCase): + """ + Tests the families creation, hierarchy and manipulation + """ def test_family_creation(self): """ This will test the families creation and access """ - retval = self.create_family("TESTFAMILY1") + retval = self._create_family("TESTFAMILY1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertEqual(data['family'], 1) @@ -270,7 +221,7 @@ def test_family_tlp(self): Test the TLP level affectation for a family """ - retval = self.create_family("TESTFAMILY2", tlp_level=5) + retval = self._create_family("TESTFAMILY2", tlp_level=5) self.assertEqual(retval.status_code, 200) retval = self.app.get('/api/1.0/family/1/') @@ -283,7 +234,7 @@ def test_family_abstract(self): """ Try to update the family abstract """ - self.create_family("TESTFAMILY1") + self._create_family("TESTFAMILY1") data = json.dumps(dict(abstract="Test abstract")) retval = self.app.post("/api/1.0/family/1/abstract/", data=data, content_type="application/json") @@ -295,8 +246,11 @@ def test_family_abstract(self): self.assertIn(data["abstract"], "Test abstract") def test_subfamilies(self): - self.create_family("MOTHER FAMILY") - self.create_family("CHILD FAMILY", parent="MOTHER FAMILY") + """ + Can we manage a hierarchical family organization? + """ + self._create_family("MOTHER FAMILY") + self._create_family("CHILD FAMILY", parent="MOTHER FAMILY") retval = self.app.get('/api/1.0/family/1/') @@ -309,11 +263,222 @@ def test_subfamilies(self): data = json.loads(retval.data)["family"] self.assertEqual(data["parent_id"], 1) + +class ApiYaraTests(ApiTestCase): + """ + Yara rules creation and management + """ + def _create_yara(self, name, rule, tlp_level=None): + retval = self.app.post('/api/1.0/yaras/', + data=json.dumps(dict(name=name, + rule=rule, + tlp_level=tlp_level)), + content_type="application/json") + return retval + + def _update_yara(self, name, rule, tlp_level=None): + retval = self.app.patch('/api/1.0/yaras/', + data=json.dumps(dict(name=name, + rule=rule, + tlp_level=tlp_level)), + content_type="application/json") + return retval + + def test_yara_creation(self): + """ + Creates an example yara rule, + and try to get it back + """ + rule_text = """rule toto{ + strings: + $1 = {4D 5A} + condition: + $1 at 0 + }""" + retval = self._create_yara("TESTYARA", rule_text) + self.assertEqual(retval.status_code, 200) + data = json.loads(retval.data) + self.assertEqual(data["id"], 1) + + retval = self.app.get("/api/1.0/yaras/") + self.assertEqual(retval.status_code, 200) + data = json.loads(retval.data) + self.assertEqual(len(data['yara_rules']), 1) + rule = data['yara_rules'][0] + self.assertIn(rule['name'], "TESTYARA") + self.assertEqual(rule['TLP_sensibility'], 3) + self.assertIn(rule['raw_rule'], rule_text) + + def test_yara_tlp(self): + """ + Is the TLP correctly managed for a yara rule? + """ + rule_text = """rule toto{ + strings: + $1 = {4D 5A} + condition: + $1 at 0 + }""" + retval = self._create_yara("TESTYARA", rule_text, tlp_level=4) + retval = self.app.get("/api/1.0/yaras/") + data = json.loads(retval.data) + rule = data['yara_rules'][0] + self.assertEqual(rule['TLP_sensibility'], 4) + + def test_yara_family(self): + """ + Test for correct affectation of a yara to a family + """ + rule_text = """rule toto{ + strings: + $1 = {4D 5A} + condition: + $1 at 0 + }""" + retval = self._create_yara("TESTYARA", rule_text) + self._create_family("TESTFAMILY") + retval = self.app.post('/api/1.0/family/1/yaras/', + data=json.dumps(dict(rule_name="TESTYARA")), + content_type="application/json") + self.assertEqual(retval.status_code, 200) + data = json.loads(retval.data) + self.assertTrue(data["result"]) + + retval = self.app.get('/api/1.0/family/1/export/1/detection/yara') + self.assertEqual(retval.status_code, 200) + self.assertIn("TESTYARA", retval.data) + self.assertIn("4D 5A", retval.data) + +# def test_yara_update(self): + # rule_text = """rule toto{ + # strings: + # $1 = {4D 5A} + # condition: + # $1 at 0 + # }""" + # retval = self._create_yara("TESTYARA", rule_text, tlp_level=4) + + # # Try to update the yara + # retval = self._update_yara("TESTYARA", rule_text.replace('$1', '$MZ')) + # self.assertEqual(retval.status_code, 200) + # data = json.loads(retval.data) + # self.assertTrue(data["result"]) + + # # Next check for the changes in the resulting data + # retval= self.app.get("/api/1.0/yaras/") + # data = json.loads(retval.data) + # rule = data['yara_rules'][0] + # self.assertIn(rule["raw_rule"], rule_text.replace('$1', '$MZ')) + + +class ApiIDAActionsTests(ApiTestCase): + """ + Tests storage and synchronization for implemented IDA Pro types. + """ + def _push_comment(self, sid=1, address=None, comment=None): + retval = self.app.post('/api/1.0/samples/'+str(sid)+'/comments/', + data=json.dumps(dict(address=address, comment=comment)), + content_type="application/json") + return retval + + def _create_struct(self, sid=1, name=None): + retval = self.app.post('/api/1.0/samples/'+str(sid)+'/structs/', + data=json.dumps(dict(name=name)), + content_type="application/json") + return retval + + def _create_struct_member(self, sid=1, struct_id=None, mname=None, size=0, offset=0): + url = '/api/1.0/samples/' + str(sid) + url += '/structs/' + str(struct_id) + url += '/members/' + retval = self.app.post(url, + data=json.dumps(dict(name=mname, + size=size, + offset=offset)), + content_type="application/json") + return retval + + def _update_struct_member_name(self, sid=1, struct_id=None, mid=None, newname=""): + url = '/api/1.0/samples/' + str(sid) + url += '/structs/' + str(struct_id) + url += '/members/' + retval = self.app.patch(url, + data=json.dumps(dict(mid=mid, newname=newname)), + content_type="application/json") + return retval + + def _update_struct_member_size(self, sid=1, struct_id=None, mid=None, newsize=0): + url = '/api/1.0/samples/' + str(sid) + url += '/structs/' + str(struct_id) + url += '/members/' + retval = self.app.patch(url, + data=json.dumps(dict(mid=mid, newsize=newsize)), + content_type="application/json") + return retval + + def _get_all_structs(self, sid=1): + retval = self.app.get('/api/1.0/samples/' + str(sid) + + '/structs/') + return retval + + def _get_one_struct(self, sid=1, struct_id=1): + url = '/api/1.0/samples/' + str(sid) + url += '/structs/' + url += str(struct_id) + '/' + retval = self.app.get(url) + return retval + + def _get_comment(self, sid=1, address=None): + retval = self.app.get('/api/1.0/samples/' + str(sid) + + '/comments/', + data=json.dumps({'address':address}), + content_type="application/json") + return retval + + def _push_name(self, sid=1, address=None, name=None): + retval = self.app.post('/api/1.0/samples/'+str(sid)+'/names/', + data=json.dumps(dict(address=address, name=name)), + content_type="application/json") + return retval + + def _get_name(self, sid=1, address=None): + url = '/api/1.0/samples/' + str(sid) + '/names/' + if address is not None: + url += '?addr=' + url += hex(address) + retval = self.app.get(url) + return retval + + def _create_type(self, sid=1, address=None, typedef=None): + retval = self.app.post('/api/1.0/samples/'+str(sid)+'/types/', + data=json.dumps(dict(address=address, + typedef=typedef)), + content_type="application/json") + return retval + + def _get_type(self, sid=1, address=None): + url = '/api/1.0/samples/' + str(sid) + url += '/types/' + if address is not None: + url += "?addr=" + url += hex(address) + + return self.app.get(url) + + @staticmethod + def _format_timedelta(): + """ + wrapper for strftime and 1 day offset + """ + offset = datetime.datetime.now() + datetime.timedelta(days=1) + offset = datetime.datetime.strftime(offset, '%Y-%m-%dT%H:%M:%S.%f') + return offset + def test_push_comments(self): """ Can we push comments for a sample? """ - retval = self.push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") + retval = self._push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) @@ -322,29 +487,29 @@ def test_get_comment(self): """ This endpoint is used to get comments for a specific address """ - retval = self.push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") + retval = self._push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) - retval = self.get_comment(address=0xDEADBEEF) + retval = self._get_comment(address=0xDEADBEEF) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertIn(data['comments'][0]["data"], "TESTCOMMENT1") self.assertEqual(data['comments'][0]["address"], 0xDEADBEEF) def test_get_multiple_comments(self): - retval = self.push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") + retval = self._push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) - retval = self.push_comment(address=0xBADF00D, comment="TESTCOMMENT2") + retval = self._push_comment(address=0xBADF00D, comment="TESTCOMMENT2") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) - retval = self.get_comment() + retval = self._get_comment() self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertEqual(len(data['comments']), 2) @@ -353,21 +518,12 @@ def test_get_multiple_comments(self): self.assertIn(data['comments'][1]["data"], "TESTCOMMENT2") self.assertEqual(data['comments'][1]["address"], 0xBADF00D) - @staticmethod - def format_timedelta(): - """ - wrapper for strftime and 1 day offset - """ - offset = datetime.datetime.now() + datetime.timedelta(days=1) - offset = datetime.datetime.strftime(offset, '%Y-%m-%dT%H:%M:%S.%f') - return offset - def test_action_timestamp(self): """ Test getting different results with different timestamps """ - self.push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") - offset = self.format_timedelta() + self._push_comment(address=0xDEADBEEF, comment="TESTCOMMENT1") + offset = self._format_timedelta() retval = self.app.get('/api/1.0/samples/1/comments/?timestamp='+offset) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) @@ -383,8 +539,8 @@ def test_action_timestamp(self): data = json.loads(retval.data) self.assertEqual(len(data["comments"]), 1) - self.push_name(address=0xDEADBEEF, name="TESTNAME") - offset = self.format_timedelta() + self._push_name(address=0xDEADBEEF, name="TESTNAME") + offset = self._format_timedelta() retval = self.app.get('/api/1.0/samples/1/names/?timestamp='+offset) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) @@ -395,8 +551,8 @@ def test_action_timestamp(self): data = json.loads(retval.data) self.assertEqual(len(data["names"]), 1) - self.create_struct(name="TESTSTRUCTURE") - offset = self.format_timedelta() + self._create_struct(name="TESTSTRUCTURE") + offset = self._format_timedelta() retval = self.app.get('/api/1.0/samples/1/structs/?timestamp='+offset) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) @@ -407,12 +563,11 @@ def test_action_timestamp(self): data = json.loads(retval.data) self.assertEqual(len(data["structs"]), 1) - def test_push_name(self): """ Simulate a renaming done from IDA """ - retval = self.push_name(address=0xDEADBEEF, name="TESTNAME1") + retval = self._push_name(address=0xDEADBEEF, name="TESTNAME1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) @@ -421,12 +576,12 @@ def test_get_names(self): """ This endpoint is used to get names for a specific address """ - retval = self.push_name(address=0xDEADBEEF, name="TESTNAME1") + retval = self._push_name(address=0xDEADBEEF, name="TESTNAME1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) - retval = self.get_name(address=0xDEADBEEF) + retval = self._get_name(address=0xDEADBEEF) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertIn(data['names'][0]["data"], "TESTNAME1") @@ -436,20 +591,20 @@ def test_create_struct(self): """ Simple structure creation and access """ - retval = self.create_struct(sid=1, name="StructName1") + retval = self._create_struct(sid=1, name="StructName1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data["result"]) # check if the structure is in the complete listing - retval = self.get_all_structs(sid=1) + retval = self._get_all_structs(sid=1) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertIn("StructName1", data["structs"][0]["name"]) self.assertEqual(0, data["structs"][0]["size"]) # check if we can access the structure alone - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._get_one_struct(sid=1, struct_id=1) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) struct = data["structs"] @@ -461,17 +616,17 @@ def test_create_multiple_structs(self): This will test if we can access multiple structs for one sample """ # create structs - retval = self.create_struct(sid=1, name="StructName1") + retval = self._create_struct(sid=1, name="StructName1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data["result"]) - retval = self.create_struct(sid=1, name="StructName2") + retval = self._create_struct(sid=1, name="StructName2") data = json.loads(retval.data) self.assertTrue(data["result"]) # get the structs - retval = self.get_all_structs(sid=1) + retval = self._get_all_structs(sid=1) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertEqual(2, len(data["structs"])) @@ -482,25 +637,24 @@ def test_create_multiple_structs(self): self.assertEqual(0, struct1["size"]) self.assertEqual(0, struct2["size"]) - def test_create_struct_member(self): """ Member creation """ # first create a structre - self.create_struct(sid=1, name="StructName1") + self._create_struct(sid=1, name="StructName1") # then add a member to it - retval = self.create_struct_member(struct_id=1, - mname="MemberName1", - size=4, - offset=0) + retval = self._create_struct_member(struct_id=1, + mname="MemberName1", + size=4, + offset=0) # Is the member OK? self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data["result"]) # can we get the member in the structure - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._get_one_struct(sid=1, struct_id=1) data = json.loads(retval.data) struct = data["structs"] @@ -511,32 +665,32 @@ def test_create_struct_member(self): self.assertEqual(4, member["size"]) self.assertEqual(0, member["offset"]) - def test_create_multiple_struct_members(self): + def test_create_struct_members(self): """ Test for multiples members """ - self.create_struct(sid=1, name="StructName1") - self.create_struct_member(struct_id=1, - mname="MemberName1", - size=4, - offset=0) + self._create_struct(sid=1, name="StructName1") + self._create_struct_member(struct_id=1, + mname="MemberName1", + size=4, + offset=0) - self.create_struct_member(struct_id=1, - mname="MemberName2", - size=2, - offset=4) + self._create_struct_member(struct_id=1, + mname="MemberName2", + size=2, + offset=4) - self.create_struct_member(struct_id=1, - mname="MemberName3", - size=2, - offset=6) + self._create_struct_member(struct_id=1, + mname="MemberName3", + size=2, + offset=6) - self.create_struct_member(struct_id=1, - mname="MemberName4", - size=4, - offset=8) + self._create_struct_member(struct_id=1, + mname="MemberName4", + size=4, + offset=8) - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._get_one_struct(sid=1, struct_id=1) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) @@ -566,45 +720,83 @@ def test_create_multiple_struct_members(self): self.assertEqual(4, member["size"]) self.assertEqual(8, member["offset"]) + def test_idatypes(self): + """ + IDA Types + """ + ret = self._create_type(sid=1, address=0xDEADBEEF, typedef='void *') + self.assertEqual(ret.status_code, 200) + res = json.loads(ret.data) + self.assertTrue(res["result"]) + + # test for getting this type in all types + ret = self._get_type(sid=1) + self.assertEqual(ret.status_code, 200) + types = json.loads(ret.data)['typedefs'] + self.assertEqual(len(types), 1) + self.assertIn(types[0]["data"], 'void *') + + # test for getting type filtered by address + ret = self._get_type(sid=1, address=0xDEADBEEF) + self.assertEqual(ret.status_code, 200) + types = json.loads(ret.data)['typedefs'] + self.assertEqual(len(types), 1) + self.assertIn(types[0]["data"], 'void *') + + # test if there is no comment at a specified address + ret = self._get_type(sid=1, address=0x1234) + self.assertEqual(ret.status_code, 200) + types = json.loads(ret.data)['typedefs'] + self.assertEqual(len(types), 0) + + # test adding a new type at different address and getting it too + self._create_type(sid=1, address=0xBADF00D, typedef='int testtype(int dwTest, char cType)') + ret = self._get_type(sid=1) + self.assertEqual(ret.status_code, 200) + types = json.loads(ret.data)['typedefs'] + self.assertEqual(len(types), 2) + self.assertIn(types[0]["data"], 'void *') + self.assertIn(types[1]["data"], 'int testtype(int dwTest, char cType)') + def test_struct_member_update(self): """ Update size, name or offset """ - self.create_struct(sid=1, name="StructName1") - self.create_struct_member(struct_id=1, - mname="MemberName1", - size=4, - offset=0) + self._create_struct(sid=1, name="StructName1") + self._create_struct_member(struct_id=1, + mname="MemberName1", + size=4, + offset=0) - self.create_struct_member(struct_id=1, - mname="MemberName2", - size=2, - offset=4) + self._create_struct_member(struct_id=1, + mname="MemberName2", + size=2, + offset=4) - retval = self.update_struct_member_name(struct_id=1, - mid=1, - newname="NewMemberName1") + retval = self._update_struct_member_name(struct_id=1, + mid=1, + newname="NewMemberName1") self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data['result']) - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._get_one_struct(sid=1, struct_id=1) data = json.loads(retval.data) mstruct = data['structs'] member = mstruct['members'][0] self.assertIn('NewMemberName1', member['name']) # test when downgrading the size of first member - retval = self.update_struct_member_size(struct_id=1, - mid=1, - newsize=2) + retval = self._update_struct_member_size(struct_id=1, + mid=1, + newsize=2) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data["result"]) - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._get_one_struct(sid=1, struct_id=1) data = json.loads(retval.data) mstruct = data['structs'] member = mstruct['members'][0] @@ -612,10 +804,10 @@ def test_struct_member_update(self): self.assertEqual(mstruct['size'], 6) # test when downgrading the last member size - retval = self.update_struct_member_size(struct_id=1, - mid=2, - newsize=1) - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._update_struct_member_size(struct_id=1, + mid=2, + newsize=1) + retval = self._get_one_struct(sid=1, struct_id=1) data = json.loads(retval.data) mstruct = data['structs'] member = mstruct['members'][1] @@ -624,119 +816,26 @@ def test_struct_member_update(self): # test when upgrading the last member size - retval = self.update_struct_member_size(struct_id=1, - mid=2, - newsize=4) + retval = self._update_struct_member_size(struct_id=1, + mid=2, + newsize=4) self.assertEqual(retval.status_code, 200) data = json.loads(retval.data) self.assertTrue(data["result"]) - retval = self.get_one_struct(sid=1, struct_id=1) + retval = self._get_one_struct(sid=1, struct_id=1) data = json.loads(retval.data) mstruct = data['structs'] member = mstruct['members'][1] self.assertEqual(member['size'], 4) self.assertEqual(mstruct['size'], 8) - # if upgrading the size and overlapping the next member, # adopt the same behavior as IDA and remove the second member # TODO!!! # self.assertTrue(False) - def test_sample_abstract(self): - data = json.dumps(dict(abstract="This is a test for abstract")) - retval = self.app.post('/api/1.0/samples/1/abstract/', data=data, - content_type="application/json") - self.assertEqual(retval.status_code, 200) - result = json.loads(retval.data) - self.assertTrue(result['result']) - - retval = self.app.get('/api/1.0/samples/1/abstract/') - self.assertEqual(retval.status_code, 200) - result = json.loads(retval.data) - self.assertIn(result['abstract'], 'This is a test for abstract') - - def test_yara_creation(self): - rule_text = """rule toto{ - strings: - $1 = {4D 5A} - condition: - $1 at 0 - }""" - retval = self.create_yara("TESTYARA", rule_text) - self.assertEqual(retval.status_code, 200) - data = json.loads(retval.data) - self.assertEqual(data["id"], 1) - - retval= self.app.get("/api/1.0/yaras/") - self.assertEqual(retval.status_code, 200) - data = json.loads(retval.data) - self.assertEqual(len(data['yara_rules']), 1) - rule = data['yara_rules'][0] - self.assertIn(rule['name'], "TESTYARA") - self.assertEqual(rule['TLP_sensibility'], 3) - self.assertIn(rule['raw_rule'], rule_text) - - def test_yara_tlp(self): - rule_text = """rule toto{ - strings: - $1 = {4D 5A} - condition: - $1 at 0 - }""" - retval = self.create_yara("TESTYARA", rule_text, tlp_level=4) - retval= self.app.get("/api/1.0/yaras/") - data = json.loads(retval.data) - rule = data['yara_rules'][0] - self.assertEqual(rule['TLP_sensibility'], 4) - - def test_yara_family(self): - """ - Test for correct affectation of a yara to a family - """ - rule_text = """rule toto{ - strings: - $1 = {4D 5A} - condition: - $1 at 0 - }""" - retval = self.create_yara("TESTYARA", rule_text) - self.create_family("TESTFAMILY") - retval = self.app.post('/api/1.0/family/1/yaras/', - data=json.dumps(dict(rule_name="TESTYARA")), - content_type="application/json") - self.assertEqual(retval.status_code, 200) - data = json.loads(retval.data) - self.assertTrue(data["result"]) - - retval = self.app.get('/api/1.0/family/1/export/1/detection/yara') - self.assertEqual(retval.status_code, 200) - self.assertIn("TESTYARA", retval.data) - self.assertIn("4D 5A", retval.data) - - -# def test_yara_update(self): - # rule_text = """rule toto{ - # strings: - # $1 = {4D 5A} - # condition: - # $1 at 0 - # }""" - # retval = self.create_yara("TESTYARA", rule_text, tlp_level=4) - - # # Try to update the yara - # retval = self.update_yara("TESTYARA", rule_text.replace('$1', '$MZ')) - # self.assertEqual(retval.status_code, 200) - # data = json.loads(retval.data) - # self.assertTrue(data["result"]) - - # # Next check for the changes in the resulting data - # retval= self.app.get("/api/1.0/yaras/") - # data = json.loads(retval.data) - # rule = data['yara_rules'][0] - # self.assertIn(rule["raw_rule"], rule_text.replace('$1', '$MZ')) if __name__ == '__main__': unittest.main()