##### System Libraries

In [1]:
import os
import subprocess
import logging
import re
import copy

##### Required Libraries

In [2]:
# Third-party dependencies. If one is missing, install it into the interpreter
# that is running this kernel (sys.executable) rather than whichever `pip`
# happens to be first on PATH, and fail loudly if the installation fails.
import importlib
import sys

try:
    import scapy
except ModuleNotFoundError:
    subprocess.check_call([sys.executable,'-m','pip','install','scapy'])
    # Make the freshly installed package visible to the import system.
    importlib.invalidate_caches()
    import scapy
try:
    import tqdm
except ModuleNotFoundError:
    subprocess.check_call([sys.executable,'-m','pip','install','tqdm'])
    importlib.invalidate_caches()
    import tqdm
from scapy.all import rdpcap
# NOTE: this rebinds the name `tqdm` from the module to the progress-bar class.
from tqdm import trange,tqdm

### Analyzer

#### Minor Function Breakdowns
---
##### Generate_Logger
Generates a logger with default parameters for logging critical components and debugging
##### Generate Self IPs
Generates a list of the IPs assigned to the current device by scanning the `ipconfig` output for the entries named in a protocol list.
##### List Initializer [V2]
Helper function to shorten and simplify initializing and re-initializing lists with values
##### PCAP_Input
Reads the PCAP file at the given path, stores its packets in memory, and returns them.
##### Generate Unique IPs
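Generates a list of the unique destination IPs found in the PCAP data (excluding self IPs). Packets that expose no destination address are recorded once under the placeholder `256.256.256.256`.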

In [3]:
import scapy.plist


class Analyzer:
    """Collects the unique destination IPs seen in a PCAP capture.

    Constructing an instance configures logging, gathers this machine's own
    addresses from the output of ``ipconfig``, loads the capture with
    ``rdpcap`` and records every destination address that is not one of the
    machine's own. Results are kept on the instance in the ``stored_*`` lists.
    """

    # Class-level fallback (the root logger) so that methods can log even on an
    # instance whose __init__ has not run; __init__ replaces it per instance.
    logger=logging.getLogger()
    # Single-element list; element 0 is the level handed to logging.basicConfig.
    stored_logger_level=[logging.DEBUG]

    # Patterns searched for in each ipconfig line to find this machine's addresses.
    stored_protocol_list=["IPv4","IPv6","Gateway"]

    # Recorded in place of an address for packets whose payload has no `dst`.
    NO_DESTINATION_PLACEHOLDER='256.256.256.256'

    # Class-level defaults kept for backward compatibility. __init__ gives every
    # instance its own lists so that two analyzers never share results.
    stored_pcap_data=[]

    stored_self_ips=[]
    stored_unique_ips=[]
    stored_blocked_ips=[]

    def __init__(self,path,protocol_list=None):
        """Run the full analysis for the capture at ``path``.

        Args:
            path: Location of the PCAP file, passed to ``rdpcap``.
            protocol_list: Patterns used to pick address lines out of the
                ``ipconfig`` output. Defaults to ``stored_protocol_list``.
        """
        # Fresh containers per instance; the class-level lists are shared by
        # every instance and would otherwise be overwritten by the next one.
        self.stored_pcap_data=[]
        self.stored_self_ips=[]
        self.stored_unique_ips=[]
        self.stored_blocked_ips=[]

        self.logger=self.Generate_Logger(filename='AnalysisCore.log')
        self.Generate_Self_IPs(protocol_list=protocol_list)
        self.PCAP_Input(path)
        self.Generate_Unique_IPs()

    def Generate_Logger(self,filename='default_logger_name.log'):
        """Configure root logging to write to ``filename`` and return the root logger.

        The file is opened in ``'w'`` mode. ``logging.basicConfig`` does nothing
        when the root logger already has handlers, so a second call in the same
        process does not switch the log file.
        """
        logging.basicConfig(filename=filename,format='%(asctime)s - %(levelname)s: %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S',filemode='w',level=self.stored_logger_level[0])
        logger=logging.getLogger()
        logger.debug('Logger initialized.')
        return logger

    def List_Initializer(self,input_list,input_param):
        """Replace the contents of ``input_list`` in place.

        Args:
            input_list: List to overwrite; its identity is preserved.
            input_param: A ``list`` or scapy ``PacketList`` whose items become
                the new contents, or any other value, which is stored as the
                single element.
        """
        is_collection=isinstance(input_param,list) or isinstance(input_param,scapy.plist.PacketList)
        # Snapshot before clearing: if input_param is input_list itself,
        # clearing first would destroy the very items we are about to copy.
        new_items=list(input_param) if is_collection else [input_param]
        input_list.clear()
        input_list.extend(new_items)

    def Generate_Self_IPs(self,protocol_list=None):
        """Collect this machine's addresses from the output of ``ipconfig``.

        Every output line matching one of the ``protocol_list`` patterns (used
        as regular expressions) contributes the text after its first ``:``.

        Args:
            protocol_list: Patterns to look for. Defaults to
                ``stored_protocol_list``.

        Returns:
            The list of addresses found, also stored in ``stored_self_ips``.
            Empty when the command produces no matching output.
        """
        if protocol_list is None:
            protocol_list=self.stored_protocol_list
        self.logger.info("Generating self IPs with protocols, "+str(protocol_list))

        # subprocess.run waits for the command and closes its pipes.
        completed=subprocess.run(['ipconfig'],stdout=subprocess.PIPE,stderr=subprocess.PIPE,
                                 text=True,errors='replace',shell=True)
        if completed.returncode!=0:
            self.logger.warning('ipconfig exited with code '+str(completed.returncode)
                                +': '+completed.stderr.strip())

        self_ips=[]
        for raw_line in completed.stdout.splitlines():
            line=raw_line.strip()
            _label,separator,value=line.partition(':')
            if not separator:
                # No "label : value" structure, so there is no address to take.
                continue
            if not any(re.search(protocol,line) for protocol in protocol_list):
                continue
            # Drop an IPv6 zone suffix ("%12"); packet addresses never carry it.
            address=value.strip().split('%',1)[0]
            if address:
                self_ips.append(address)

        self.List_Initializer(self.stored_self_ips,self_ips)
        self.logger.debug('Stored self IPs in stored_self_ips '+str(self_ips))
        return self_ips

    def PCAP_Input(self,path):
        """Read the capture at ``path`` with ``rdpcap``, store it and return it.

        The packets are copied into ``stored_pcap_data``; the object returned
        is whatever ``rdpcap`` produced.
        """
        # str() keeps the log calls working for non-string path objects.
        self.logger.info('Reading file from path, '+str(path))
        print('Reading PCAP Data.')
        pcap_data=rdpcap(path)
        self.logger.debug('Stored '+str(path)+' data in stored_pcap_data.')
        self.List_Initializer(self.stored_pcap_data,pcap_data)
        return pcap_data

    def Generate_Unique_IPs(self,pcap_data=None,self_ips=None):
        """List each distinct destination address in the capture once.

        Addresses are taken from ``packet.payload.dst`` in first-seen order.
        Addresses in ``self_ips`` are excluded. Packets whose payload has no
        ``dst`` contribute ``NO_DESTINATION_PLACEHOLDER`` (at most once).

        Args:
            pcap_data: Iterable of packets. Defaults to ``stored_pcap_data``.
            self_ips: Addresses to exclude. Defaults to ``stored_self_ips``.

        Returns:
            The list of unique addresses, also stored in ``stored_unique_ips``.
        """
        if pcap_data is None:
            pcap_data=self.stored_pcap_data
        if self_ips is None:
            self_ips=self.stored_self_ips

        self.logger.info('Extracting Unique IPs from PCAP Data.')
        self.logger.debug(str(list(self_ips)))

        # A set gives constant-time "already seen" checks; the list keeps order.
        seen=set(self_ips)
        unique_ips=[]
        for packet in tqdm(pcap_data,desc='Analyzing packets for unique IPs'):
            try:
                destination=packet.payload.dst
            except AttributeError:
                destination=self.NO_DESTINATION_PLACEHOLDER
            if destination not in seen:
                seen.add(destination)
                unique_ips.append(destination)

        self.List_Initializer(self.stored_unique_ips,unique_ips)
        self.logger.debug('Stored unique IPs in stored_unique_ips.')
        return unique_ips

In [7]:
# Build the analyzer for the sample capture (path is relative to the working
# directory). The constructor sets up logging to AnalysisCore.log, collects the
# machine's own IPs, reads the PCAP and extracts the unique destination IPs.
A=Analyzer('PCAP Data/720S-Down-Spo-Misc-Idling.pcap')

Reading PCAP Data.


Analyzing packets for unique IPs: 100%|██████████| 10839/10839 [00:00<00:00, 27490.83it/s]


In [8]:
# Number of unique destination IPs recorded for the capture (self IPs excluded).
len(A.stored_unique_ips)

64