## Our problem

In this article we are going to use [psutil](https://psutil.readthedocs.io/en/latest/) and [gputil](https://github.com/anderskm/gputil) to monitor our system usage and then send slack notifications based on simple rules we create

In some cases you may want to look at real time monitoring statistics but for the purposes of this demonstration we will just use historical monitoring data with the following rules for slack notifications:

    Rules:
    * average CPU or memory utilization over past 36 hours is greater than 90% send red alert
    * average CPU or memory utilization over past 36 hours is between 80 and 90% send orange alert

## Getting and Storing the data

First we'll import all the libraries we need

In [1]:
import psutil
from datetime import datetime
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import time
import os
import os.path
import csv 

This cell block illustrates the tabular format that we are going to store our data into parquet files later. 

In [2]:
timestamp = datetime.now()
cpu_percent = psutil.cpu_percent(interval=0)
memory_percent = psutil.virtual_memory()[2]
disk_percent = psutil.disk_usage('/')[3]

df = pd.DataFrame({'timestamp':[timestamp],
                   'cpu_percent': [cpu_percent],
                   'memory_percent': [memory_percent],
                   'disk_percent': [disk_percent]})

df.head()

#df.to_csv('example.parquet', engine='auto', compression='snappy')
    

Unnamed: 0,cpu_percent,disk_percent,memory_percent,timestamp
0,29.7,98.0,22.8,2018-05-15 17:02:47.540188


Let's generate the file structure for our parquet files now which will follow the format of a Parquet dataset that goes down to year directories to month directories and then each day is a separate parquet file. In our script we will be appending to a csv file that gets converted to a parquet file at the end of every day

In [3]:
#define some basic functions
def root_path():
    """
    OS independent way to return root path
    """
    return os.path.abspath(os.sep)

    

def create_parquet_directory(dataset_name,base_dir):
    
    """
    generates base folder of Parquet dataset
    """
    
    if not os.path.exists(os.path.join(base_dir,dataset_name)):
        os.makedirs(os.path.join(base_dir,dataset_name))
        
def create_csv_dump_dir(dataset_name,base_dir):
    
    """
    generates base folder for csv dump file
    """
    
    if not os.path.exists(os.path.join(base_dir,dataset_name)):
        os.makedirs(os.path.join(base_dir,dataset_name))


Here we just set some global variables and call these functions we just made to create parquet dataset and csv dump directories

In [4]:
PARQUET_DATASET = 'Monitoring'
CSV_DUMP_DIR = 'csv_dump'
BASE_DIR = root_path()
CSV_NAME = 'daily_dump.csv'
CSV_FILENAME = BASE_DIR + CSV_DUMP_DIR + '\\'+CSV_NAME

#create directories if they dont exist
create_parquet_directory(PARQUET_DATASET,BASE_DIR)
create_csv_dump_dir(CSV_DUMP_DIR,BASE_DIR)

When we are actually writing out the data, we'll use the csv DictWriter to write out since that's easier for writing out single rows of data at a time. So here I jsut show how the data will be structured for writing out to the daily dump csv file

In [5]:
timestamp = datetime.now()
cpu_percent = psutil.cpu_percent(interval=0)
memory_percent = psutil.virtual_memory()[2]
disk_percent = psutil.disk_usage('/')[3]

dic = {'timestamp':timestamp,
 'cpu_percent': cpu_percent,
 'memory_percent': memory_percent,
 'disk_percent': disk_percent}

dic

{'cpu_percent': 24.6,
 'disk_percent': 98.0,
 'memory_percent': 22.8,
 'timestamp': datetime.datetime(2018, 5, 15, 17, 2, 53, 37847)}

For testing we can create some sample rows of data in a test csv file to make sure everything is working. The cell below should create the csv file and write out to it the current monitoring stats we got from above

In [17]:
file_exists = os.path.isfile(CSV_FILENAME)

with open(CSV_FILENAME, 'a') as csvfile:
    headers = ['timestamp', 'cpu_percent', 'memory_percent','disk_percent']
    writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)

    if not file_exists:
        writer.writeheader()  # file doesn't exist yet, write a header

    writer.writerow({'timestamp': dic['timestamp'], 'cpu_percent': dic['cpu_percent'], 
                     'memory_percent': dic['memory_percent'], 'disk_percent': dic['disk_percent']})

This function I've created below will take the daily dump csv file and convert it to a csv file. We call the function at the bottom of this cell block to check that it's working as intended

In [30]:
def daily_csv_to_parquet(timestamp,base_dir,parquet_dataset,return_filepath=False):
    
    """
    function that gets triggered daily to dump previous day's csv to 
    a parquet file
    
    Args:
        timestamp: timestamp taken when process triggered
        base_dir: base directory of dataset
        parquet_dataset: parquet dataset name
        return_filepath: can be set to true to return filepath of file that was created
    """
    #get year month and day
    year = str(timestamp.year)
    month = str(timestamp.month)
    day = str(timestamp.day)
    
    #get directory path 
    year_path = base_dir + parquet_dataset + '\\' + 'Year='+year
    month_path = year_path + '\\'+'Month='+month
    day_path = month_path+'\\'+day+'.parquet'
    
    #create directories for month and year if they dont exist
    if not os.path.exists(year_path):
        os.makedirs(year_path)
        
    if not os.path.exists(month_path):
        os.makedirs(month_path)
    
    #convert csv file to parquet file
    csv_dump_df = pd.read_csv(CSV_FILENAME)
    table = pa.Table.from_pandas(csv_dump_df)
    pq.write_table(table, day_path,compression ='snappy')
    
    if return_filepath:
        return day_path
    
filepath = daily_csv_to_parquet(timestamp,BASE_DIR,PARQUET_DATASET,return_filepath=True)
filepath

'C:\\Monitoring\\Year=2018\\Month=5\\15.parquet'

For our final check, we can try to read in the parquet file we just converted

In [31]:
table = pq.read_table(filepath)
table.to_pandas()

Unnamed: 0,timestamp,cpu_percent,memory_percent,disk_percent
0,2018-05-15 17:02:53.037847,24.6,22.8,98.0
1,2018-05-15 17:02:53.037847,24.6,22.8,98.0
2,2018-05-15 17:02:53.037847,24.6,22.8,98.0
3,2018-05-15 17:02:53.037847,24.6,22.8,98.0
4,2018-05-15 17:02:53.037847,24.6,22.8,98.0
5,2018-05-15 17:02:53.037847,24.6,22.8,98.0
6,2018-05-15 17:02:53.037847,24.6,22.8,98.0
7,2018-05-15 17:02:53.037847,24.6,22.8,98.0
8,2018-05-15 17:06:43.541909,14.5,22.6,98.0
9,2018-05-15 17:06:45.544619,6.6,22.6,98.0


This is our final loop below, if you run this it will continually run and collect the usage metrics on your PC, dumping them to the csv file during the day, and then at the end of each day, convert the csv file to a parquet file and delete the csv file.

You could have this code in a script use a scheduler to set how often you want the process to trigger or set the process to run and control how often it triggers by using time.sleep. Of course I'd recommend the scheduler method but for demonstration purposes I'm just using sleep here. 

In [28]:
try:
    while True:
        #set how often you want this to run
        time.sleep(2)
        #get current timestamp
        timestamp = datetime.now()
        time_str = timestamp.strftime("%Y-%m-%d %H:%M")
        cpu_percent = psutil.cpu_percent(interval=0)
        memory_percent = psutil.virtual_memory()[2]
        disk_percent = psutil.disk_usage('/')[3]

        dic = {'timestamp':timestamp,
         'cpu_percent': cpu_percent,
         'memory_percent': memory_percent,
         'disk_percent': disk_percent}

        file_exists = os.path.isfile(CSV_FILENAME)

        with open(CSV_FILENAME, 'a') as csvfile:
            headers = ['timestamp', 'cpu_percent', 'memory_percent','disk_percent']
            writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)

            if not file_exists:
                writer.writeheader()  # file doesn't exist yet, write a header

            writer.writerow({'timestamp': dic['timestamp'], 'cpu_percent': dic['cpu_percent'], 
                             'memory_percent': dic['memory_percent'], 'disk_percent': dic['disk_percent']})
            
        #check if it's next day, if so dump current file to parquet file and reset csv file
        if time_str[-5:] == '00:00':
            #convert day's csv file to parquet
            daily_csv_to_parquet(timestamp,BASE_DIR,PARQUET_DATASET)
            #remove daily dump file so new day file starts
            os.remove(CSV_FILENAME)
        
except KeyboardInterrupt:
    print('interrupted!')

PermissionError: [Errno 13] Permission denied: 'C:\\csv_dump\\daily_dump.csv'

## Reading in and Summarizing the data

So let's assume now we have many days worth of monitoring data to look at and we want to create a script to read in this data, analyze it, and post slack notifications about consistently high usage

We can read in our entire parquet dataset very easily and convert it to a pandas dataframe, this is shown below

In [48]:
#read in parquet dataset
dataset = pq.ParquetDataset(BASE_DIR+PARQUET_DATASET)
dataset_table = dataset.read(nthreads=4)
full_df = dataset_table.to_pandas()
#convert timestamp to datetime index
full_df['timestamp'] = pd.to_datetime(full_df.timestamp)
full_df = full_df.set_index('timestamp')
full_df.head()

Unnamed: 0_level_0,cpu_percent,memory_percent,disk_percent,Year,Month
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2018-05-15 17:02:53.037847,24.6,22.8,98.0,2018,5
2018-05-15 17:02:53.037847,24.6,22.8,98.0,2018,5
2018-05-15 17:02:53.037847,24.6,22.8,98.0,2018,5
2018-05-15 17:02:53.037847,24.6,22.8,98.0,2018,5
2018-05-15 17:02:53.037847,24.6,22.8,98.0,2018,5


Now whatever level of granularity we want our time in, we can resample our dataframe to get that. In this case we will resample our dataframe by the hour and use the average of our values as the aggregation metric

In [62]:
full_dataset_df = full_df.resample('H').mean()
full_dataset_df.head()

Unnamed: 0_level_0,cpu_percent,memory_percent,disk_percent
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2018-05-15 17:00:00,21.875,22.7125,98.0


let's recap the logic we need to create:

    Rules:
    * average CPU or memory utilization over past 36 hours is greater than 90% send red alert
    * average CPU or memory utilization over past 36 hours is between 80 and 90% send orange alert
    
So now that we have the average of our utilization metrics by the hour which is the data format we need in this case, we can implement our logic and post the slack notification accordingly right?

But wait, we are forgetting something. Because of the way we designed our daily csv dump logic, it only gets converted to a parquet file at the end of every day. So if we want the most up-to-date data (for ex. past hour data), we will need to read in the csv file as well and merge it with this parquet data for the historical data past 24 hour window

In [66]:
cur_day_df = pd.read_csv(CSV_FILENAME)
cur_day_df['timestamp'] = pd.to_datetime(cur_day_df.timestamp)
cur_day_df = cur_day_df.set_index('timestamp')
cur_day_df = cur_day_df.resample('H').mean()
cur_day_df.head()

Unnamed: 0_level_0,cpu_percent,memory_percent,disk_percent
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2018-05-15 17:00:00,21.875,22.7125,98.0


Now we can just combine our historical parquet file dataset with the current day csv file to get all of our data in one pandas dataframe.

However, based on our rules we onl

In [67]:
final_df = full_dataset_df.append(cur_day_df)
final_df

Unnamed: 0_level_0,cpu_percent,memory_percent,disk_percent
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2018-05-15 17:00:00,21.875,22.7125,98.0
2018-05-15 17:00:00,21.875,22.7125,98.0
