In [1]:
import pandas as pd
from datetime import datetime
from datetime import timedelta
import random
import math
import concurrent.futures
from multiprocessing import Manager

## Almost all configurable variables are kept here

In [2]:
# Weekday numbers as compared against `dayofweek`: Monday = 0 ... Sunday = 6.
work_days = range(0, 5)
work_hours = range(8, 18)

start_working_time = 8
end_working_time = 17
# NOTE(review): the previous comment described this as a "down period in seconds",
# but the value is a boolean flag — confirm which meaning is intended.
down_for_all_day = False
# Up period, expressed in seconds.
up_period = 3600

mac_list = [
    "F6:B8:15:1E:A8:02",
    "6C:D0:7E:63:10:F1",
    "97:46:62:3B:15:2C",
    "05:A6:2E:30:98:E2",
]
# Per-device bookkeeping: one entry for each MAC address in `mac_list`.
plc_state = [False] * 4
pls_abnormal_state = [False] * 4
plc_counter = [0] * 4
max_cycle_time = [41, 48, 34, 74]


# mu = mean of each normal distribution (list entries are per device).
mu_cycle_time = [24, 27, 21, 38]
mu_ram = [7464, 8197, 6465, 9883]
mu_packets = 20
mu_packets_size = 50
# sigma = standard deviation of each normal distribution.
sigma_cycle_time = 4
sigma_ram = 256
sigma_packets = 3
sigma_packets_size = 5

# How many days in the past the generation should start from.
# NOTE(review): the visible cells read `days_to_go_back` instead — confirm this is still needed.
back_in_time = 365

packets_nmap_probability = 0.01
packets_big_probability = 0.01
# Probability that the systems are switched off during a normal day.
random_down_for_all_day_probability = 0.01
# Probability that the systems are switched on for one hour during a shutdown day.
random_up_probability = 0.01

# Only the time-of-day part is compared against each timestamp to detect the
# start of a new day (the year/month/day values are placeholders).
new_day_check_time = datetime(year=1, month=1, day=1)

# Start/stop of the current working window; only the time-of-day part is compared.
day_start_time = datetime(year=1, month=1, day=1)
day_stop_time = datetime(year=1, month=1, day=1)


## Variables defining the time span over which the data is generated

In [3]:
days_to_go_back = 365
# Step of the outer time grid. Lower-case aliases are used because the
# upper-case 'T' / 'S' / 'H' spellings are deprecated in recent pandas releases.
#frequency = '1s'     #every second
frequency = '1min'    #every minute
#frequency = '1h'     #every hour

# "Today at midnight" is evaluated once, so both ends of the range derive from
# the same instant even if the cell runs right across midnight.
stop_date = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = stop_date - timedelta(days=days_to_go_back)

# One timestamp per `frequency` step, both endpoints included.
datelist = pd.date_range(start_date, stop_date, freq=frequency).tolist()

In [4]:
# Sanity check on the time grid: first the number of generated timestamps, then
# the second entry, which should lie exactly one `frequency` step after 00:00:00.
print(len(datelist), datelist[1], sep="\n")

525601
2022-11-28 00:01:00


## Helper functions that simplify the code and make it easier to maintain

In [5]:
def generate_cpu_cycle_time(date, mac_index):
    """Draw a CPU cycle time for one device.

    The value is sampled from a normal distribution with mean
    ``mu_cycle_time[mac_index]`` and standard deviation ``sigma_cycle_time``,
    then rounded down to an integer.

    Args:
        date: Timestamp of the sample; not used by the current implementation.
        mac_index: Position of the device in the per-device configuration lists.

    Returns:
        int: The floored sample.
    """
    mean_cycle = mu_cycle_time[mac_index]
    sample = random.gauss(mu=mean_cycle, sigma=sigma_cycle_time)
    return math.floor(sample)

In [6]:
def generate_ram_usage(date, mac_index):
    """Draw a RAM usage value for one device.

    The value is sampled from a normal distribution with mean
    ``mu_ram[mac_index]`` and standard deviation ``sigma_ram``, then rounded
    down to an integer.

    Args:
        date: Timestamp of the sample; not used by the current implementation.
        mac_index: Position of the device in the per-device configuration lists.

    Returns:
        int: The floored sample.
    """
    mean_ram = mu_ram[mac_index]
    sample = random.gauss(mu=mean_ram, sigma=sigma_ram)
    return math.floor(sample)

### ping packets are 56 bytes

In [7]:
def generate_rx_size(packet_n, date, mac_index):
    """Return the total size of ``packet_n`` received packets.

    Each packet size is an independent draw from a normal distribution with
    mean ``mu_packets_size`` and standard deviation ``sigma_packets_size``;
    the draws are added up and the total is rounded down.

    Args:
        packet_n: Number of packets to draw; zero or a negative value gives 0.
        date: Not used by the current implementation.
        mac_index: Not used by the current implementation.

    Returns:
        int: Floored sum of the drawn packet sizes.
    """
    packet_sizes = [
        random.gauss(mu=mu_packets_size, sigma=sigma_packets_size)
        for _ in range(packet_n)
    ]
    # Plain left-to-right accumulation (not sum()) so the floating-point result
    # does not depend on the interpreter's summation algorithm.
    total_bytes = 0
    for size in packet_sizes:
        total_bytes = total_bytes + size
    return math.floor(total_bytes)

In [8]:
def generate_tx_size(packet_n, date, mac_index):
    """Return the total size of ``packet_n`` transmitted packets.

    Each packet size is an independent draw from a normal distribution with
    mean ``mu_packets_size`` and standard deviation ``sigma_packets_size``;
    the draws are added up and the total is rounded down.

    Args:
        packet_n: Number of packets to draw; zero or a negative value gives 0.
        date: Not used by the current implementation.
        mac_index: Not used by the current implementation.

    Returns:
        int: Floored sum of the drawn packet sizes.
    """
    packet_sizes = [
        random.gauss(mu=mu_packets_size, sigma=sigma_packets_size)
        for _ in range(packet_n)
    ]
    # Plain left-to-right accumulation (not sum()) so the floating-point result
    # does not depend on the interpreter's summation algorithm.
    total_bytes = 0
    for size in packet_sizes:
        total_bytes = total_bytes + size
    return math.floor(total_bytes)

## Randomize the start and stop times within a margin of 15 minutes, to make sure the data is not too predictable

In [9]:
def randomize_start_and_stop(margin_minutes=15):
    """Draw a new working window and store it in the module-level globals.

    ``day_start_time`` is set to ``start_working_time``:00 and ``day_stop_time``
    to ``end_working_time``:00, each shifted by an independent, uniformly drawn
    whole number of minutes in ``[-margin_minutes, +margin_minutes]``.

    The previous implementation hard-coded the hours 7/8 and 16/17 and used
    ``59 - randint(0, 15)`` for the early branch, which could land 16 minutes
    before the hour (e.g. 07:44) and so exceeded the 15-minute margin.

    Args:
        margin_minutes: Maximum shift, in minutes, on either side of the
            nominal hour. Defaults to 15.
    """
    global day_start_time
    global day_stop_time

    day_start_time = _jittered_day_time(start_working_time, margin_minutes)
    day_stop_time = _jittered_day_time(end_working_time, margin_minutes)


def _jittered_day_time(nominal_hour, margin_minutes):
    """Return ``nominal_hour``:00 shifted by a random offset within the margin."""
    offset = random.randint(-margin_minutes, margin_minutes)
    # Wrap around midnight so an hour of 0 (or 23) with a margin stays valid.
    total_minutes = (nominal_hour * 60 + offset) % (24 * 60)
    hour, minute = divmod(total_minutes, 60)
    # Year/month/day are placeholders; the globals are read via .time().
    return datetime(1, 1, 1, hour, minute, 0)

In [10]:
# Kept as its own function so that per-day logic can be extended in one place.

# Logical day for which the working window was last drawn (None = never drawn).
_last_randomized_day = None


def new_day_check(date):
    """Re-draw the working window the first time a new day is seen.

    A day starts at the time of day stored in ``new_day_check_time``. The
    previous implementation re-drew the window only for a timestamp exactly
    equal to that time. When the timestamps are spread over a process pool,
    every worker has its own copy of the globals, so a worker that never
    received that exact timestamp kept a stale (or the initial 00:00-00:00)
    window. Tracking the last day seen makes every worker draw a window as
    soon as it meets a timestamp from a new day.

    NOTE(review): each worker process still draws its own window, so the
    start/stop boundaries of one day may differ slightly between workers.

    Args:
        date: Timestamp being processed (``datetime`` or ``pandas.Timestamp``).
    """
    global _last_randomized_day

    # Shift by the configured roll-over time so the logical day begins there.
    rollover = new_day_check_time.time()
    shift = timedelta(
        hours=rollover.hour,
        minutes=rollover.minute,
        seconds=rollover.second,
        microseconds=rollover.microsecond,
    )
    logical_day = (date - shift).date()

    if logical_day != _last_randomized_day:
        _last_randomized_day = logical_day
        randomize_start_and_stop()

In [11]:
#TODO: expand this functions to generate also errors and "attacks"

def check_plc_state(date, plc_index):
    """Set ``plc_state[plc_index]`` according to the current working window.

    The device is marked as on (``True``) when ``date`` falls on one of
    ``work_days`` and its time of day lies between ``day_start_time`` and
    ``day_stop_time`` (both bounds included); otherwise it is marked as off.

    Args:
        date: Timestamp exposing ``dayofweek`` and ``time()``.
        plc_index: Position of the device in ``plc_state``.
    """
    is_work_day = date.dayofweek in work_days
    # Evaluated lazily: the time window is only inspected on working days.
    is_running = is_work_day and (
        day_start_time.time() <= date.time() <= day_stop_time.time()
    )
    plc_state[plc_index] = True if is_running else False

In [12]:
#TODO: validate this state machine before it replaces check_plc_state in the cell above
def check_plc_status_old(date, plc_index):
    """Counter-based variant of the on/off state update (not called by the visible cells).

    Fixed here: the counter updates used ``plc_counter += 1``, which rebinds
    ``plc_counter`` as a local name and raises ``UnboundLocalError`` on the
    first access. They now increment the device's own slot,
    ``plc_counter[plc_index]``.

    Branches, unchanged from the original:
      * working day, device off: switch on once the counter has reached
        ``up_period``; the counter is incremented on every call.
      * working day, device on: switch off with probability
        ``random_up_probability``.
      * non-working day, device on: switch off once the counter has reached
        ``down_for_all_day``; the counter is incremented on every call.
      * non-working day, device off: switch on (and increment the counter)
        with probability ``random_down_for_all_day_probability``.

    NOTE(review): ``down_for_all_day`` is a boolean in the configuration cell
    but is compared with a counter here, and the probability names look
    swapped between the working-day and non-working-day branches — confirm
    the intended semantics before enabling this function.

    Args:
        date: Timestamp exposing ``dayofweek``.
        plc_index: Position of the device in ``plc_state`` / ``plc_counter``.
    """
    is_work_day = date.dayofweek in work_days

    if is_work_day:
        if plc_state[plc_index] == False:
            if plc_counter[plc_index] >= up_period:
                plc_state[plc_index] = True
            plc_counter[plc_index] += 1

        elif random.random() <= random_up_probability:
            plc_state[plc_index] = False

    elif plc_state[plc_index] == True:
        if plc_counter[plc_index] >= down_for_all_day:
            plc_state[plc_index] = False
        plc_counter[plc_index] += 1

    elif random.random() <= random_down_for_all_day_probability:
        plc_state[plc_index] = True
        plc_counter[plc_index] += 1

In [13]:
# Column names of the generated dataset, in output order.
columns = [
    "timestamp",
    "mac_address",
    "cpu_max_cycle",
    "cpu_current_cycle",
    "ram_usage",
    "rx_packets",
    "rx_bytes",
    "tx_packets",
    "tx_bytes",
    "flag",
]

## The actual processing function that generates the data
This function takes a date, splits the minute that starts at it into sub-timestamps at a fixed step (5 seconds in the code below; it can be changed by hand, e.g. to 10 or 1 second), generates the rows for each sub-timestamp, appends them all to a list, and then returns the result.

In [14]:
def process_data(date, step_seconds=5, window_seconds=60):
    """Generate the rows for the time window that starts at ``date``.

    The window ``[date, date + window_seconds)`` is sampled every
    ``step_seconds``. For each sample and each entry of ``mac_list`` the device
    state is refreshed through ``new_day_check`` / ``check_plc_state``, and one
    row is produced when the device is on.

    Changes from the previous version:
      * the step is passed to pandas as a ``timedelta``, not the ``'5S'``
        string, whose upper-case alias is deprecated in recent pandas releases;
      * the hard-coded step (5 s) and window (59 s past ``date``) are now
        parameters with the same defaults, so one-argument calls are unchanged;
      * packet counts are clamped at zero, since a normal draw can in
        principle be negative.

    Args:
        date: Start of the window (``datetime`` or ``pandas.Timestamp``).
        step_seconds: Distance between two samples, in whole seconds.
        window_seconds: Length of the window, in whole seconds.

    Returns:
        list[dict]: One dictionary per generated row, keyed by column name.

    Raises:
        ValueError: If ``step_seconds`` or ``window_seconds`` is not positive.
    """
    if step_seconds <= 0 or window_seconds <= 0:
        raise ValueError("step_seconds and window_seconds must be positive")

    # The end point is one second short of the window so that the first sample
    # of the next window is not generated twice.
    sublist = pd.date_range(
        date,
        date + timedelta(seconds=window_seconds - 1),
        freq=timedelta(seconds=step_seconds),
    ).tolist()

    result = []

    for subdate in sublist:
        new_day_check(subdate)
        for plc, mac_address in enumerate(mac_list):
            check_plc_state(subdate, plc)

            if not plc_state[plc]:
                continue

            # The draw order below is kept identical to the original code.
            cpu_time = generate_cpu_cycle_time(subdate, plc)
            ram_usage = generate_ram_usage(subdate, plc)
            tx_packets = _draw_packet_count()
            tx_bytes = generate_tx_size(tx_packets, subdate, plc)
            rx_packets = _draw_packet_count()
            rx_bytes = generate_rx_size(rx_packets, subdate, plc)
            result.append({'timestamp': subdate,
                           'mac_address': mac_address,
                           'cpu_max_cycle': max_cycle_time[plc],
                           'cpu_current_cycle': cpu_time,
                           'ram_usage': ram_usage,
                           'rx_packets': rx_packets,
                           'rx_bytes': rx_bytes,
                           'tx_packets': tx_packets,
                           'tx_bytes': tx_bytes,
                           'flag': "ok"})
    return result


def _draw_packet_count():
    """Draw a non-negative packet count from the configured normal distribution."""
    return max(0, math.floor(random.gauss(mu_packets, sigma_packets)))


## Parallelising part
Here we see 3 different ways to make it parallel; the second way is the fastest, even though at first nothing wanted to work at all :')
N.B. this was my first attempt at making a Python program parallel.

In [15]:
# Pool sizes for the thread-based and the process-based executors used below.
max_thread_workers = max_process_workers = 8

# Which method to run: 0 = all, 1 = first method, 2 = second method, 3 = third method.
algorithm = 2

#### Faster than single core, but a lot slower than the other 2
It isn't really multiprocessing, so the speedup exists (and is quite large) but is not really enough to make it viable.
~~ 37s for 31 days generated

In [16]:
if algorithm in (0, 1):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_thread_workers) as executor:
        # executor.map returns the per-timestamp row lists in input order.
        results1 = list(executor.map(process_data, datelist))

    # Flatten once and build a single DataFrame. The previous version wrapped
    # every (mostly empty) per-timestamp list in its own DataFrame and
    # concatenated them all, which is slow and relies on the deprecated
    # handling of empty frames in pd.concat. Passing `columns` keeps the
    # expected columns even when no row was generated.
    df1 = pd.DataFrame(
        [row for rows in results1 for row in rows],
        columns=columns,
    )

#### Fastest!
~~ 10s for 31 days generated!!!

In [17]:
if algorithm in (0, 2):
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_process_workers) as executor:
        futures2 = [executor.submit(process_data, date) for date in datelist]

    # Leaving the with-block has already waited for every task. Results are read
    # in submission order: as_completed() gives no ordering guarantee, so using
    # it here left the rows in non-chronological order.
    results2 = [future.result() for future in futures2]
    # Flatten the list of lists into a single list of dictionaries
    flat_results = [item for sublist in results2 for item in sublist]
    # Create a DataFrame from the results; `columns` keeps the expected columns
    # even when no row was generated.
    df2 = pd.DataFrame(flat_results, columns=columns)


#### Possible bottleneck in the way the data is collected and merged
This is pure speculation based on watching htop: after the full-core load, a single core bursts to 100% (reconciliation, maybe) and then it goes back to an all-core load.
It is impossible to reduce this overhead by increasing the load, e.g. by starting twice as many threads instead of only 8 (1 per core).
~~ 27s for 31 days generated

In [18]:
if algorithm in (0, 3):
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_process_workers) as executor:
        # Process dates in parallel
        futures3 = [executor.submit(process_data, date) for date in datelist]
        # Explicit wait, kept as the distinguishing step of this method (leaving
        # the with-block would wait for the tasks as well).
        concurrent.futures.wait(futures3)
    # Collect results in submission order
    results3 = [future.result() for future in futures3]
    # Most timestamps yield no rows (nights, weekends). Skipping them avoids
    # building and concatenating empty frames, which is slow and relies on the
    # deprecated handling of empty frames in pd.concat.
    frames3 = [pd.DataFrame(result) for result in results3 if result]
    if frames3:
        df3 = pd.concat(frames3, ignore_index=True)
    else:
        # pd.concat([]) raises; fall back to an empty frame with the expected columns.
        df3 = pd.DataFrame(columns=columns)


In [19]:
if algorithm in (0, 1):
    # Guarded so the cell can be re-run: a second in-place set_index would fail
    # because 'timestamp' is no longer a column.
    if df1.index.name != 'timestamp':
        df1.set_index('timestamp', inplace=True)
    print(df1.shape)
    # An expression nested inside `if` is not auto-displayed by the notebook, so
    # the preview is shown explicitly (`display` is provided by IPython/Jupyter).
    display(df1.head())

In [20]:
if algorithm in (0, 2):
    # Guarded so the cell can be re-run: a second in-place set_index would fail
    # because 'timestamp' is no longer a column.
    if df2.index.name != 'timestamp':
        df2.set_index('timestamp', inplace=True)
    print(df2.shape)
    # An expression nested inside `if` is not auto-displayed by the notebook, so
    # the preview is shown explicitly (`display` is provided by IPython/Jupyter).
    display(df2.head())

(6350748, 9)


In [21]:
if algorithm in (0, 3):
    # Guarded so the cell can be re-run: a second in-place set_index would fail
    # because 'timestamp' is no longer a column.
    if df3.index.name != 'timestamp':
        df3.set_index('timestamp', inplace=True)
    print(df3.shape)
    # An expression nested inside `if` is not auto-displayed by the notebook, so
    # the preview is shown explicitly (`display` is provided by IPython/Jupyter).
    display(df3.head())

In [22]:
# Write the frame produced by the selected method to data.csv.
# NOTE(review): with algorithm == 0 ("all") no file is written — confirm that
# this is intended.
frame_to_save = None
if algorithm == 1:
    frame_to_save = df1
elif algorithm == 2:
    frame_to_save = df2
elif algorithm == 3:
    frame_to_save = df3

if frame_to_save is not None:
    frame_to_save.to_csv("data.csv")