In [10]:
import unittest
from src.log_tagger import LogTagger  # Assuming log_tagger.py is in src folder
import os

class TestLogTagger(unittest.TestCase):

    def setUp(self):
        """Set up any test files or variables needed."""
        self.lookup_filename = 'lookup.txt'
        self.logs_filename = 'sampleFlowLogs.txt'
        self.output_filename = 'output_test'

        # Create a minimal lookup table
        with open(self.lookup_filename, 'w') as f:
            f.write("dstport,protocol,tag\n")
            f.write("80,tcp,HTTP\n")
            f.write("443,tcp,HTTPS\n")
            f.write("53,udp,DNS\n")

        # Create a minimal flow log file
        with open(self.logs_filename, 'w') as f:
            f.write("2 111122223333 eni-12345678 10.0.0.1 10.0.0.2 12345 80 6 10 100 1623023627 1623023687 ACCEPT OK\n")
            f.write("2 111122223333 eni-12345678 10.0.0.3 10.0.0.4 12345 443 6 10 100 1623023627 1623023687 ACCEPT OK\n")
            f.write("2 111122223333 eni-12345678 10.0.0.5 10.0.0.6 12345 53 17 10 100 1623023627 1623023687 ACCEPT OK\n")

    def tearDown(self):
        """Clean up files created during the test."""
        if os.path.exists(self.lookup_filename):
            os.remove(self.lookup_filename)
        if os.path.exists(self.logs_filename):
            os.remove(self.logs_filename)
        for ext in ['.csv', '_tag_counts.csv', '_port_protocol_counts.csv']:
            if os.path.exists(self.output_filename + ext):
                os.remove(self.output_filename + ext)

    def test_lookup_file_not_found(self):
        """Test handling of missing lookup file."""
        log_tagger = LogTagger('nonexistent_lookup.txt')
        self.assertEqual(log_tagger.lookup, {}, "Lookup table should be empty if file not found")

    def test_logs_file_not_found(self):
        """Test handling of missing logs file."""
        log_tagger = LogTagger(self.lookup_filename)
        with self.assertLogs(level='ERROR'):
            log_tagger.apply_tags_to_logs('nonexistent_logs.txt', self.output_filename)
        self.assertFalse(os.path.exists(self.output_filename + '.csv'), "Output file should not exist")

    def test_valid_logs_processing(self):
        """Test processing with valid logs and lookup table."""
        log_tagger = LogTagger(self.lookup_filename)
        log_tagger.apply_tags_to_logs(self.logs_filename, self.output_filename)

        # Verify the output file was created
        self.assertTrue(os.path.exists(self.output_filename + '.csv'), "Output file should exist")

        # Verify tag counts and port/protocol counts
        with open(self.output_filename + '_tag_counts.csv', 'r') as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 4)  # Header + 3 tags

        with open(self.output_filename + '_port_protocol_counts.csv', 'r') as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 4)  # Header + 3 port/protocol combinations

    def test_unrecognized_protocol(self):
        """Test handling of unrecognized protocol in log entry."""
        log_tagger = LogTagger(self.lookup_filename)
        with open(self.logs_filename, 'a') as f:
            f.write("2 111122223333 eni-12345678 10.0.0.7 10.0.0.8 12345 9999 9999 10 100 1623023627 1623023687 ACCEPT OK\n")
        
        log_tagger.apply_tags_to_logs(self.logs_filename, self.output_filename)
        
        with open(self.output_filename + '_tag_counts.csv', 'r') as f:
            lines = f.readlines()
            self.assertTrue(any('Untagged' in line for line in lines), "Log entry with unknown protocol should be tagged as 'Untagged'")

if __name__ == '__main__':
    unittest.main()

ModuleNotFoundError: No module named 'src'

In [11]:
X

NameError: name 'X' is not defined