Skip to content

Commit

Permalink
chg: [modules + tests] fix modules + test modules on samples
Browse files Browse the repository at this point in the history
  • Loading branch information
Terrtia committed Jun 8, 2021
1 parent 90b6f43 commit 42a23da
Show file tree
Hide file tree
Showing 16 changed files with 69 additions and 41 deletions.
7 changes: 3 additions & 4 deletions bin/ModuleStats.py
Expand Up @@ -12,15 +12,14 @@
import datetime
import redis
import os
import sys


sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages #
##################################
from module.abstract_module import AbstractModule
from modules.abstract_module import AbstractModule
from packages.Date import Date
from pubsublogger import publisher
from Helper import Process
from packages import Paste
import ConfigLoader

Expand Down
10 changes: 6 additions & 4 deletions bin/core/DbCleaner.py
Expand Up @@ -10,14 +10,16 @@
import time
import datetime

from pubsublogger import publisher

import NotificationHelper

sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from packages import Date
from packages import Item
from packages import Term

from pubsublogger import publisher

def clean_term_db_stat_token():
all_stat_date = Term.get_all_token_stat_history()

Expand Down
4 changes: 2 additions & 2 deletions bin/modules/Categ.py
Expand Up @@ -51,7 +51,7 @@ class Categ(AbstractModule):
Categ module for AIL framework
"""

def __init__(self, categ_files_dir='../files/'):
def __init__(self, categ_files_dir=os.path.join(os.environ['AIL_HOME'], 'files')):
"""
Init Categ
"""
Expand Down Expand Up @@ -107,7 +107,7 @@ def compute(self, message, r_result=False):
# SCRIPT PARSER #
parser = argparse.ArgumentParser(description='Start Categ module on files.')
parser.add_argument(
'-d', type=str, default="../files/",
'-d', type=str, default=os.path.join(os.environ['AIL_HOME'], 'files'),
help='Path to the directory containing the category files.',
action='store')
args = parser.parse_args()
Expand Down
8 changes: 8 additions & 0 deletions bin/modules/Global.py
Expand Up @@ -104,6 +104,7 @@ def compute(self, message, r_result=False):
# Incorrect filename
if not os.path.commonprefix([filename, self.PASTES_FOLDER]) == self.PASTES_FOLDER:
self.redis_logger.warning(f'Global; Path traversal detected {filename}')
print(f'Global; Path traversal detected {filename}')

else:
# Decode compressed base64
Expand Down Expand Up @@ -134,6 +135,7 @@ def compute(self, message, r_result=False):

else:
self.redis_logger.debug(f"Empty Item: {message} not processed")
print(f"Empty Item: {message} not processed")


def check_filename(self, filename, new_file_content):
Expand All @@ -145,6 +147,7 @@ def check_filename(self, filename, new_file_content):
# check if file exist
if os.path.isfile(filename):
self.redis_logger.warning(f'File already exist {filename}')
print(f'File already exist {filename}')

# Check that file already exists but content differs
curr_file_content = self.gunzip_file(filename)
Expand All @@ -165,11 +168,13 @@ def check_filename(self, filename, new_file_content):
if os.path.isfile(filename):
# Ignore duplicate
self.redis_logger.debug(f'ignore duplicated file {filename}')
print(f'ignore duplicated file {filename}')
filename = None

else:
# Ignore duplicate checksum equals
self.redis_logger.debug(f'ignore duplicated file {filename}')
print(f'ignore duplicated file {filename}')
filename = None

else:
Expand All @@ -192,10 +197,12 @@ def gunzip_file(self, filename):
curr_file_content = f.read()
except EOFError:
self.redis_logger.warning(f'Global; Incomplete file: {filename}')
print(f'Global; Incomplete file: {filename}')
# save daily stats
self.r_stats.zincrby('module:Global:incomplete_file', datetime.datetime.now().strftime('%Y%m%d'), 1)
except OSError:
self.redis_logger.warning(f'Global; Not a gzipped file: {filename}')
print(f'Global; Not a gzipped file: {filename}')
# save daily stats
self.r_stats.zincrby('module:Global:invalid_file', datetime.datetime.now().strftime('%Y%m%d'), 1)

Expand All @@ -213,6 +220,7 @@ def gunzip_bytes_obj(self, bytes_obj):
gunzipped_bytes_obj = fo.read()
except Exception as e:
self.redis_logger.warning(f'Global; Invalid Gzip file: {filename}, {e}')
print(f'Global; Invalid Gzip file: {filename}, {e}')

return gunzipped_bytes_obj

Expand Down
26 changes: 15 additions & 11 deletions bin/modules/Indexer.py
Expand Up @@ -26,7 +26,7 @@
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from packages import Paste
from packages.Item import Item


class Indexer(AbstractModule):
Expand Down Expand Up @@ -98,19 +98,23 @@ def __init__(self):


def compute(self, message):
try:
PST = Paste.Paste(message)
docpath = message.split(" ", -1)[-1]
paste = PST.get_p_content()
self.redis_logger.debug(f"Indexing - {self.indexname}: {docpath}")
print(f"Indexing - {self.indexname}: {docpath}")
docpath = message.split(" ", -1)[-1]

item = Item(message)
item_id = item.get_id()
item_content = item.get_content()

self.redis_logger.debug(f"Indexing - {self.indexname}: {docpath}")
print(f"Indexing - {self.indexname}: {docpath}")

try:
# Avoid calculating the index's size at each message
if(time.time() - self.last_refresh > self.TIME_WAIT):
self.last_refresh = time.time()
if self.check_index_size() >= self.INDEX_SIZE_THRESHOLD*(1000*1000):
timestamp = int(time.time())
self.redis_logger.debug(f"Creating new index {timestamp}")
print(f"Creating new index {timestamp}")
self.indexpath = join(self.baseindexpath, str(timestamp))
self.indexname = str(timestamp)
# update all_index
Expand All @@ -125,13 +129,13 @@ def compute(self, message):
indexwriter.update_document(
title=docpath,
path=docpath,
content=paste)
content=item_content)
indexwriter.commit()

except IOError:
self.redis_logger.debug(f"CRC Checksum Failed on: {PST.p_path}")
self.redis_logger.error('Duplicate;{};{};{};CRC Checksum Failed'.format(
PST.p_source, PST.p_date, PST.p_name))
self.redis_logger.debug(f"CRC Checksum Failed on: {item_id}")
print(f"CRC Checksum Failed on: {item_id}")
self.redis_logger.error(f'Duplicate;{item.get_source()};{item.get_date()};{item.get_basename()};CRC Checksum Failed')

def check_index_size(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion bin/modules/SentimentAnalysis.py
Expand Up @@ -123,7 +123,7 @@ def analyse(self, message):
avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0}
neg_line = 0
pos_line = 0
sid = SentimentIntensityAnalyzer(sentiment_lexicon_file)
sid = SentimentIntensityAnalyzer(self.sentiment_lexicon_file)
for sentence in sentences:
ss = sid.polarity_scores(sentence)
for k in sorted(ss):
Expand Down
3 changes: 2 additions & 1 deletion bin/modules/Tags.py
Expand Up @@ -45,10 +45,11 @@ def compute(self, message):
if len(mess_split) == 2:
tag = mess_split[0]
item = Item(mess_split[1])
item_id = item.get_id()

# Create a new tag
Tag.add_tag('item', tag, item.get_id())
print(f'{item.get_id(): Tagged {tag}}')
print(f'{item_id}: Tagged {tag}')

# Forward message to channel
self.send_message_to_queue(message, 'MISP_The_Hive_feeder')
Expand Down
20 changes: 10 additions & 10 deletions bin/template.py
Expand Up @@ -5,21 +5,21 @@
======================
This module is a template for Template for new modules
"""

##################################
# Import External packages
##################################
import os
import sys
import time
from pubsublogger import publisher


sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from module.abstract_module import AbstractModule
from Helper import Process
from modules.abstract_module import AbstractModule


class Template(AbstractModule):
Expand All @@ -30,12 +30,12 @@ class Template(AbstractModule):
def __init__(self):
super(Template, self).__init__()

# Send module state to logs
self.redis_logger.info("Module %s initialized"%(self.module_name))

# Pending time between two computation in seconds
# Pending time between two computation (computeNone) in seconds
self.pending_seconds = 10

# Send module state to logs
self.redis_logger.info(f'Module {self.module_name} initialized')


def computeNone(self):
"""
Expand All @@ -52,6 +52,6 @@ def compute(self, message):


if __name__ == '__main__':

module = Template()
module.run()
2 changes: 1 addition & 1 deletion bin/trackers/Tracker_Term.py
Expand Up @@ -20,7 +20,7 @@
##################################
from modules.abstract_module import AbstractModule
import NotificationHelper
from packages import Item
from packages.Item import Item
from packages import Term
from lib import Tracker

Expand Down
Binary file added samples/2021/01/01/api_keys.gz
Binary file not shown.
Binary file added samples/2021/01/01/categ.gz
Binary file not shown.
Binary file added samples/2021/01/01/credit_cards.gz
Binary file not shown.
Binary file added samples/2021/01/01/domain_classifier.gz
Binary file not shown.
Binary file added samples/2021/01/01/keys.gz
Binary file not shown.
26 changes: 20 additions & 6 deletions tests/test_modules.py
Expand Up @@ -7,6 +7,7 @@

import gzip
from base64 import b64encode
from distutils.dir_util import copy_tree

sys.path.append(os.environ['AIL_BIN'])

Expand All @@ -20,9 +21,20 @@
from modules.Onion import Onion

# project packages
from lib.ConfigLoader import ConfigLoader
import lib.crawlers as crawlers
import packages.Item as Item

#### COPY SAMPLES ####
config_loader = ConfigLoader()
# # TODO:move me in new Item package
ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
ITEMS_FOLDER = os.path.join(os.path.realpath(ITEMS_FOLDER), '')
TESTS_ITEMS_FOLDER = os.path.join(ITEMS_FOLDER, 'tests')
sample_dir = os.path.join(os.environ['AIL_HOME'], 'samples')
copy_tree(sample_dir, TESTS_ITEMS_FOLDER)
#### ---- ####

class Test_Module_ApiKey(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -91,29 +103,31 @@ def test_module(self):

item_content = b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'
item_content_1 = b64encode(gzip.compress(item_content)).decode()
item_content_2 = b64encode(gzip.compress(item_content + b' more text')).decode()
item_content_2 = b64encode(gzip.compress(item_content + b' more text ...')).decode()
message = f'{item_id} {item_content_1}'

# Test new item
result = self.module_obj.compute(message, r_result=True)
print(result)
print(f'test new item: {result}')
self.assertEqual(result, item_id)

# Test duplicate
result = self.module_obj.compute(message, r_result=True)
print(result)
print(f'test duplicate {result}')
self.assertIsNone(result)

# Test same id with != content
item = Item.Item('tests/2021/01/01/global_831875da824fc86ab5cc0e835755b520.gz')
item.delete()
message = f'{item_id} {item_content_2}'
result = self.module_obj.compute(message, r_result=True)
print(result)
print(f'test same id with != content: {result}')
self.assertIn(item_id[:-3], result)
self.assertNotEqual(result, item_id)

# cleanup
item = Item.Item(result)
item.delete()
# item = Item.Item(result)
# item.delete()
# # TODO: remove from queue

class Test_Module_Keys(unittest.TestCase):
Expand Down
2 changes: 1 addition & 1 deletion update/v3.4/Update_domain.py
Expand Up @@ -31,7 +31,7 @@ def update_domain_language(domain_obj, item_id):
r_serv_onion = config_loader.get_redis_conn("ARDB_Onion")
config_loader = None

r_serv.set('ail:current_background_script', 'domain languages update')
r_serv_db.set('ail:current_background_script', 'domain languages update')

nb_elem_to_update = r_serv_db.get('update:nb_elem_to_convert')
if not nb_elem_to_update:
Expand Down

0 comments on commit 42a23da

Please sign in to comment.