# Offenses to csv file and monitoring for new ones
This is for get the offenses in Qradar and save it to a database for later use in data analysis software.

This is an example of how to use the IBM QRadar API. Educational purposes only. Please test it on your own risk. Do not test it in production environment.

Import the necessary modules

In [None]:
import json
import requests
import pandas as pd
import urllib.parse
import time
# from pandas.json_normalize import json_normalize
# this line depends on the version of pandas you have installed
import warnings
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.filterwarnings("ignore")
import logging

Create a logger, it will log to a file and to the console. The file will be in the same directory as the script.

In [None]:
#create a log
logging.basicConfig(filename='get_offenses.log', 
                    level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

You need to change the values of the variables below for your environment, optional you can put a filter in te suffix like "?filter=status%3Dopen" if you have too much offenses

In [None]:
#authentication
SEC_TOKEN = 'xxxxxxxxxxxxxxxxxxxxx'
#token name in Qradar is "Inyector"
URL_base = 'https://xxx.xxx.xxx.xxx/api'
URL_suffix='/siem/offenses?filter=status%3Dopen'
URL_Concat = URL_base + URL_suffix

In order to get the data, we need to create a header with the security token

In [None]:
#headers
header = {
    'SEC':SEC_TOKEN,
    'Content-Type':'application/json',
    'accept':'application/json'
}

This function will get all the offenses in Qradar

In [None]:
#def get all the offenses in Qradar
def get_old_offenses():
    response = requests.get(URL_Concat, headers=header, verify=False)
    if response.status_code == 200:
        offenses=json.loads(requests.get(URL_Concat, headers=header, verify=False).text)
        df=pd.json_normalize(offenses)
        logging.info('Function get_old_offenses() is done ok')
        return df
    else:
        logging.error('Function get_old_offenses() is not done ok')
        return None

def drop the columns that are not needed

In [None]:
def drop_columns(df):
        df=df.drop(['last_persisted_time',
            'username_count',
            'flow_count',
            'assigned_to',
            'follow_up',
            'source_address_ids',
            'protected',
            'close_time',
            'last_updated_time',
            'policy_category_count', 
            'closing_reason_id',
            'first_persisted_time',
            'local_destination_address_ids',
            'local_destination_count', 
            'rules',
            'log_sources',
            'security_category_count',
            'closing_user',
            'category_count',
            'device_count',
            'status'
            ], axis=1)
        logging.info('Function drop_columns() is done ok')
        return df

Call it

In [None]:
df=get_old_offenses()

The json response does not have all the information I need ot it conmes in nested jsons so I have to create a new columns for all the information I need

In [None]:
#create a new colum for log source, rule id, domain name and offense type name
df['log_source']=''
df['rule_id']=''
df['domain_name']=''
df['offense_type_name']=''
logging.info('Create a new column for log source, rule id and domain name')

The log sources and rule info asociated with the offense are in the column log_sources and its a list

In [None]:
#def extract nested json in the df (log source and rules)
def extract_nested_jsons(df):
    for i in range(len(df)):
        if df['log_sources'][i] != []:
            df['log_source'][i]=df['log_sources'][i][0]['name']
        logging.info('Extract the log source in the nested json for the offense id: %s', df.iloc[i]['id'])
        if df['rules'][i] != []:
            df['rule_id'][i]=df['rules'][i][0]['id']
        logging.info('Extract the rule id in the nested json for the offense id: %s', df.iloc[i]['id'])
    logging.info('Function extract_nested_jsons() is done ok')
    return df

The domain ID comes in the offense json, but the domain name is not, so we need to change the domain id for the domain name

In [None]:
#def change the domain id for the domain name
def change_domain_id(domain_id):
    URL_Domanins= URL_base+'/config/domain_management/domains/'+ str(domain_id)
    response=requests.get(URL_Domanins, headers=header, verify=False)
    if response.status_code == 200:
        domain_name=json.loads(requests.get(URL_Domanins, headers=header, verify=False).text)
        domain_name=domain_name['name']
        logging.info('Function change_domain_id() is done ok')
        return domain_name
    else:
        logging.error('Function change_domain_id() is not done ok')
        return None

The indexer is the offense type id, but I want to change it for the offense type name

In [None]:
#def change the offense type id for the offense type name
def change_offense_type_id(offense_type_id):
    URL_Offense_Type= URL_base+'/siem/offense_types/'+ str(offense_type_id)
    response=requests.get(URL_Offense_Type, headers=header, verify=False)
    if response.status_code == 200:
        offense_type_name=json.loads(requests.get(URL_Offense_Type, headers=header, verify=False).text)
        offense_type_name=offense_type_name['name']
        logging.info('Function change_offense_type_id() is done ok')
        return offense_type_name
    else:
        logging.error('Function change_offense_type_id() is not done ok')
        return None

This function get the rule details that I want to use

In [None]:
#def change the rule id for the rule name an get the owner of the rule
def change_rule_id(rule_id):
    URL_Rules= URL_base+'/analytics/rules/'+ str(rule_id)
    response=requests.get(URL_Rules, headers=header, verify=False)
    if response.status_code == 200:
        rule_details=json.loads(requests.get(URL_Rules, headers=header, verify=False).text)
        rule_name=rule_details['name']
        rule_origin=rule_details['origin']
        rule_owner=rule_details['owner']
        rule_identifier=rule_details['identifier']
        logging.info('Function change_rule_id() is done ok for rule '+rule_name)
        return rule_name , rule_owner, rule_origin, rule_identifier
    else:
        logging.error('Function change_rule_id() is not done ok')
        return None

Call it

In [None]:
df=extract_nested_jsons(df)
df=drop_columns(df)

Two for cicle to call all the previous functions

In [None]:
logging.info('Starting to get the domain name for the domain id in all the ofenses')
for i in range (0,len(df)):
    logging.info('Getting the domain name for the offense id: %s', df.iloc[i]['id'])
    domain_id=df.loc[i,'domain_id']
    domain_name=change_domain_id(domain_id)
    df.loc[i,'domain_name']=domain_name
    logging.info('Domain name for the offense id: %s is: %s', df.iloc[i]['id'], domain_name)
    logging.info('Getting the offence type name for the offense id: %s', df.iloc[i]['id'])
    offense_type=df.iloc[i]['offense_type']
    offense_type_name=change_offense_type_id(offense_type)
    df.loc[i,'offense_type_name']=offense_type_name
    logging.info('Offense type name for the offense id: %s is %s', df.iloc[i]['id'], offense_type_name)
logging.info('Change the domain id for the domain name is for all the current offenses is done ok')

In [None]:
#for cicle to get the rule name and owner
logging.info('Starting to get the rule name, owner, origin and identifier for the rule id in all the ofenses')
for i in range (0,len(df)):
    rule_id=df.loc[i,'rule_id']
    rule_name, rule_owner, rule_origin, rule_identifier =change_rule_id(rule_id)
    df.loc[i,'rule_name']=rule_name
    df.loc[i,'owner']=rule_owner
    df.loc[i,'origin']=rule_origin
    df.loc[i,'identifier']=rule_identifier
    logging.info('Rule name, owner, origin and identifier for the offense id: %s is %s', df.iloc[i]['id'], rule_name)
logging.info('Change the rule id for the rule name, owner, origin and identifier is for all the current offenses is done ok')

In [None]:
URL_base = 'https://xxx.xxx.xxx.xxx/api'
URL_suffix='/siem/offenses'
URL_Concat = URL_base + URL_suffix

Saved the dataframe in a csv file

In [None]:
#df to csv
df.to_csv('offenses.csv', index=False)
logging.info('The df to csv is done ok')

Get the last ID

In [None]:
last_id=df['id'].max()

This function is to get the last id of the offenses if there is a new one, latter applys all the previous functions

In [None]:
def get_new_offenses(last_id):
    last_id=last_id+1
    URL_Concat = URL_base + URL_suffix + '/' + str(last_id)
    response=requests.get(URL_Concat, headers=header, verify=False)
    if response.status_code==200:
        new_offense=json.loads(requests.get(URL_Concat, headers=header, verify=False).text)
        new_offense_df=pd.json_normalize(new_offense)
        new_offense_df['log_source']=''
        new_offense_df['rule_id']=''
        new_offense_df['domain_name']=''
        new_offense_df['ofense_type_name']=''
        new_offense_df=extract_nested_jsons(new_offense_df)
        new_offense_df=drop_columns(new_offense_df)
        for i in range (0,len(new_offense_df)):
            logging.info('Getting the domain name for the offense id: %s', new_offense_df.iloc[i]['id'])
            domain_id=new_offense_df.loc[i,'domain_id']
            domain_name=change_domain_id(domain_id)
            new_offense_df.loc[i,'domain_name']=domain_name
            logging.info('Domain name for the offense id: %s is: %s', new_offense_df.iloc[i]['id'], domain_name)
            logging.info('Getting the offense type name for the offense id: %s', new_offense_df.iloc[i]['id'])
            offense_type=change_offense_type_id(new_offense_df.loc[i,'offense_type'])
            offense_type_name=change_offense_type_id(offense_type)
            new_offense_df.loc[i,'ofense_type_name']=offense_type_name
            logging.info('Offense type name for the offense id: %s is %s', new_offense_df.iloc[i]['id'], offense_type_name)
        for i in range (0,len(new_offense_df)):
            rule_id=new_offense_df.loc[i,'rule_id']
            rule_name, rule_owner, rule_origin, rule_identifier =change_rule_id(rule_id)
            new_offense_df.loc[i,'rule_name']=rule_name
            new_offense_df.loc[i,'owner']=rule_owner
            new_offense_df.loc[i,'origin']=rule_origin
            new_offense_df.loc[i,'identifier']=rule_identifier
            logging.info('Rule name, owner, origin and identifier for the new offense id: %s is %s', new_offense_df.iloc[i]['id'], rule_name)
        return new_offense_df
    else:
        logging.error('Function get_new_offenses() is not done ok')
        return None

Watch for new offenses, the time is configurable

In [None]:
while True:
    new_offense_df=get_new_offenses(last_id)
    if new_offense_df is not None:
        logging.info('Function get_new_offenses() is done ok')
        logging.info('last id is %s', last_id)
        last_id=new_offense_df.iloc[0]['id']
        #append to csv
        new_offense_df.to_csv('offenses.csv', mode='a', header=False, index=False)
        logging.info('new offense added. ID is %s', last_id)
        logging.info('The new offenses to csv is done ok')
    else:
        logging.info('No new offenses, next offense id will be %s', last_id+1)
        logging.info('Sleeping for 5 minutes')
    time.sleep(300)
    logging.info('Sleeping for 5 minutes')