## OpsGenie Clean Up Utils

Removing users from OpsGenie application involves many step and can be painfully slow. Recently I had to remove few hundreds of them. Let me share my solution.

GenieClient class and the notes **(scroll below)** can help to clean up Users, Teams, Integrations from OpsGenie.

### Scenarios and features
 - Dump user list
 - Scenario 1: Clean up users who is in CSV file
 - Scenario 2: Clean up users who is NOT in CSV file
 - Scenario 3: Clean up user one by one
 - Clean up disabled integrations
 - Clean up vacant teams


In [None]:
import requests
from urllib.parse import urlencode
import csv


class GenieClient():
    def __init__(self, key):
        self.url = 'https://api.opsgenie.com/v2/%s'
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': 'GenieKey ' + key,
            'Cache-control': 'no-cache'
        }

    def request(self, path, **kwargs):
        """
        Sends an HTTP GET request.
        Returns a yielded iteration. 
        example: next(request('users', sort='fullName', query='role:admin AND locale:en_US'))
        """
        url = self.url % path + ('?' + urlencode(kwargs) if len(kwargs) else '')
        last = False
        while not last:
            r = requests.get(url, headers=self.headers).json()
            yield r
            if 'paging' in r and 'next' in r['paging']:
                url = r['paging']['next']
            else:
                last = True

    def delete(self, path, **kwargs):
        """
        Sends an HTTP DELETE request.
        example: delete('users/' + username)
        """
        url = self.url % path + ('?' + urlencode(kwargs) if len(kwargs) else '')
        return requests.delete(url, headers=self.headers).json()

    def patch(self, path, payload, **kwargs):
        """
        Sends an HTTP PATCH request.
        example: patch('escalations/' + username, {'rules': rules})
        """
        url = self.url % path + ('?' + urlencode(kwargs) if len(kwargs) else '')
        return requests.patch(url, json=payload, headers=self.headers).json()

    def dump_users(self, filename, query=''):
        """
        Dumps user list to csv file
        example:
          filename='users.csv'
          query='role:user AND verified:1'
        """
        with open(filename, 'w', newline='') as csvfile:
            csvw = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            csvw.writerow(('name', 'email', 'role', 'verified'))
            for r in self.request('users', query=query):
                print('.')
                if 'data' in r:
                    for u in r['data']:
                        csvw.writerow((u['fullName'], u['username'], u['role']['name'], u['verified']))

    def for_existing_user(func):
        """
        Decorator method for exiting user
        """
        def inner(self, username):
            r = requests.get(self.url % 'users/' + username, headers=self.headers)
            if r.status_code == requests.codes.ok and 'data' in r.json():
                return func(self, username)
            else:
                print('[Error] User account {} not found'.format(username))
        return inner

    def remove_user_from_escalations(self, username):
        """
        Removes given user from its escalations. 
        Newly vacant escalations and routing-rules will be deleted
        """
        for user_esc in next(self.request(f'users/{username}/escalations'))['data']:
            escalation = next(self.request('escalations/' + user_esc['id']))['data']
            esc_rules = list(filter(lambda r: not(r['recipient']['type'] == 'user' and username == r['recipient']['username']), escalation['rules']))
            if len(esc_rules) == 0:
                if 'ownerTeam' in escalation:
                    for rule in next(self.request('/teams/' + escalation['ownerTeam']['id'] + '/routing-rules'))['data']:
                        if ('notify' in rule and rule['notify']['type'] == 'escalation' and rule['notify']['id'] == user_esc['id']):
                            try:
                                r = self.delete(
                                    'teams/' + escalation['ownerTeam']['id'] + '/routing-rules/' + rule['id'])
                                if 'result' not in r or r['result'] != 'Deleted':
                                    raise Exception(r['message'])
                                print('[ OK  ] Remove vacant routing-rules {} for {}'.format(rule['notify']['name'], username))
                            except Exception as e:
                                print('[Error] Remove vacant routing-rules {} for {}'.format(rule['notify']['name'], username))
                                print("\tException " + str(e))
                try:
                    r = self.delete('escalations/' + user_esc['id'])
                    if 'result' not in r or r['result'] != 'Deleted':
                        raise Exception(r['message'])
                    print('[ OK  ] Remove vacant escalation {} for {}'.format(user_esc['name'], username))
                except Exception as e:
                    print('[Error] Remove vacant escalation {} for {}'.format(user_esc['name'], username))
                    print("\tException " + str(e))
            else:
                try:
                    r = self.patch('escalations/' + user_esc['id'], {'rules': esc_rules})
                    if 'result' not in r or r['result'] != 'Updated':
                        raise Exception(r['message'])
                    print('[ OK  ] Remove {} from escalation {}'.format(username, user_esc['name']))
                except Exception as e:
                    print('[Error] Remove {} from escalation {}'.format(username, user_esc['name']))
                    print("\tException " + str(e))

    def remove_user_from_schedules(self, username):
        """
        Removes given user from its schedules. 
        Newly vacant rotations will be deleted.
        Vacant schedules can be removed by delete_empty_teams() method.
        """
        for schedule in next(self.request(f'users/{username}/schedules'))['data']:
            rotations = next(self.request('schedules/' + schedule['id'] + '/rotations'))['data']
            for kr, rotation in enumerate(rotations):
                participants = list(filter(lambda p: not('username' in p and p['username'] == username), rotation['participants']))
                if len(participants) == 0:
                    try:
                        r = self.delete('schedules/{}/rotations/{}'.format(schedule['id'], rotations[kr]['id']))
                        if 'result' not in r or r['result'] != 'Deleted':
                            raise Exception(r['message'])
                        print('[ OK  ] Remove {} and the vacant rotation {}/{}'.format(username, schedule['name'], rotation['name']))
                    except Exception as e:
                        print('[Error] Remove {} and the vacant rotation {}/{}'.format(username, schedule['name'], rotation['name']))
                        print("\tException " + str(e))
                else:
                    try:
                        r = self.patch('schedules/{}/rotations/{}'.format(schedule['id'], rotations[kr]['id']),
                                       {'participants': participants})
                        if 'result' not in r or r['result'] != 'Updated':
                            raise Exception(r['message'])
                        print('[ OK  ] Remove {} from rotation {}/{}'.format(username, schedule['name'], rotation['name']))
                    except Exception as e:
                        print('[Error] Remove {} from rotation {}/{}'.format(username, schedule['name'], rotation['name']))
                        print("\tException " + str(e))

    def remove_user_from_teams(self, username):
        """
        Removes given user from its schedules. 
        Newly vacant rotations will be deleted.
        Vacant schedules can be removed by delete_empty_teams() method.
        """
        for team in next(self.request(f'users/{username}/teams'))['data']:
            try:
                r = self.delete('teams/{}/members/{}'.format(team['id'], username))
                if 'result' not in r or r['result'] != 'Removed':
                    raise Exception(r['message'])
                print('[ OK  ] Remove {} from team {}'.format(username, team['name']))
            except Exception as e:
                print('[Error] Remove {} from team {}'.format(username, team['name']))
                print("\tException " + str(e))

    def delete_user(self, username):
        """
        Deletes user from the app. 
        """
        try:
            r = self.delete('users/' + username)
            if 'result' not in r or r['result'] != 'Deleted':
                raise Exception(r['message'])
            print('[ OK  ] Remove user account {}'.format(username))
        except Exception as e:
            print('[Error] Remove user account {}'.format(username))
            print("\tException " + str(e))

    @for_existing_user
    def deep_user_remove(self, username):
        """
        All in one user clean up. Removes given user from escalations, schedules, teams and from the application. 
        """        
        self.remove_user_from_escalations(username)
        self.remove_user_from_schedules(username)
        self.remove_user_from_teams(username)
        self.delete_user(username)
        
    def delete_empty_teams(self):
        """
        Deletes all vacant teams (teams without members).
        """        
        for teams in g.request('teams'):
            for team in teams['data']:
                info = next(g.request('teams/{}'.format(team['id'])))['data']
                if not 'members' in info:
                    try:
                        r = self.delete('teams/{}'.format(team['id']))
                        if 'result' not in r or r['result'] != 'Deleted':
                            raise Exception(r['message'])
                        print('[ OK  ] Deleted empty team {}'.format(team['name']))
                    except Exception as e:
                        print('[Error] Deleted empty team {}'.format(team['name']))
                        print("\tException " + str(e))
                else:
                    print('[Skipp] Team {} with {} member(s)'.format(team['name'], len(info['members'])))
        
    def delete_disabled_integrations(self):
        """
        Removes all disabled integrations. 
        """        
        for integrations in g.request('integrations'):
            for integration in integrations['data']:
                if not integration['enabled']:
                    try:
                        r = self.delete('integrations/{}'.format(integration['id']))
                        if 'result' not in r or r['result'] != 'Deleted':
                            raise Exception(r['message'])
                        print('[ OK  ] Deleted disabled integration {}'.format(integration['name']))
                    except Exception as e:
                        print('[Error] Deleted disabled integration {}'.format(integration['name']))
                        print("\tException " + str(e))
                else:
                    print('[Skipp] Enabled integration {}'.format(integration['name']))



## Instantiate the GenieClient class
As an imput the OpsGenie API Key need to be provided. You can find your API Key at https://app.opsgenie.com/integration page. All account has a *Default API (API)* out of the box. In case you would not like to mess up with the Default API, you can create a new API (Rest API HTTPS over JSON) and make sure it is not limited to read only access.  


In [None]:
g = GenieClient(input('OpsGenie API Key: '))

## Backup, backup, backup

I highly recommend to backup all data, before the operation. Either you may remove members unintentionally, either they will complain later, when they realise can't login anymore.

The most comprehensive available backup solution is here: https://github.com/opsgenie/opsgenie-configuration-backup


## Dump user list
Although you surly made a backup in the mentioned way you also can dump the current user-list into a CSV file.

In [None]:
# Read-only operation to dump users into a CSV file
g.dump_users('users-before-clanup.csv')

## Scenario 1: Clean up users who are in CSV file
In this scenario we only clean up users who are listed in CSV file.
The content of CSV is coma `','` delimitered file with at least one column called `'email'`. Example:
```
email,name
john,doe@example.com,John Doe
oliver,nadj@example.com,Oliver Nadj
```

In [None]:
# I used panda library for read CSV and manipulating data.
import pandas as pd
to_remove = pd.read_csv('users-to-remove.csv')['email'].tolist()
# double check first 20 users
to_remove[0:20]

In [None]:
# Take a deep breath and remove them
for u in to_remove:
    g.deep_user_remove(u)

## Scenario 2: Clean up users who are NOT in CSV file
In this scenario is a bit more clomplicated than the previous one. 
we only lean up users who are NOT listed in CSV file and protect who are listed.
The content of CSV is coma `','` delimitered file with at least one column called `'email'`. Example:
```
email,name
john,doe@example.com,John Doe
oliver,nadj@example.com,Oliver Nadj
```

In [None]:
# I used panda library for read CSV and manipulating data.
import pandas as pd

# Let's dump all users 1st
g.dump_users('all-users.csv')
allusers = pd.read_csv('all-users.csv')['email'].tolist()
allusers[0:6]

In [None]:
# Load the list of protected users (users who will remain, but anyone else will be removed)
to_protect = pd.read_csv('./protected-users.csv')['email'].tolist()
to_protect[0:3]

In [None]:
# Let's create a list of the users who will be removed
with open('users-to-remove.csv', 'w', newline='') as csvfile:
    csvw = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
    csvw.writerow(['email'])
    for u in allusers:
        if not(u in to_protect):
            csvw.writerow([u])

In [None]:
# Now we have a list of the users we wanted to clean up
to_remove = pd.read_csv('users-to-remove.csv')['email'].tolist()
to_remove[0:20]

In [None]:
# Take a deep breath and remove them
for u in to_remove:
    g.deep_user_remove(u)

## Scenario 3: Clean up user one by one
In case you would like to warm up, you can try with simple requests and clean user one by one

In [None]:
# Get user info
email = 'john.doe@example.com'
next(g.request('users/' + email)) # next() is needed to trigger yielded iteration.

In [None]:
# Get scheduels of the user
next(g.request('users/' + email + '/schedules'))

In [None]:
# Get temas of the user
next(g.request('users/' + email + '/teams'))

In [None]:
# Removes given user from escalations, schedules, teams and deletes the user from application
g.remove_user_from_escalations(email)
g.remove_user_from_schedules(email)
g.remove_user_from_teams(email)
g.delete_user(email)

## Clean up disabled integrations
This featue may not as handy as a previous one, but can be usefull if you have auto generated integrations.

In [None]:
# Deletes all disabled integrations
g.delete_disabled_integrations()

## Clean up vacant teams
It removes all vacant teams (teams with no members in it) and it's escalations

In [None]:
# Removes vacant teams.
g.delete_empty_teams()