<b><font size="5">This notebook provides the code for labeling the datasets</font></b>

In [None]:
import pandas as pd
import numpy as np
import datetime
from datetime import timedelta

In [None]:
# Read the cleaned audit-log and sign-in-log exports into pandas dataframes

df_audit = pd.read_csv('Datasets/AuditCombinedCleaned.csv')
df_auth = pd.read_csv('Datasets/AuthCombinedCleaned.csv')

# Sign-in error codes that the labeling functions treat as failed authentications.
# Per Microsoft's AADSTS error-code reference: 50053 = account locked,
# 50126 = invalid username or password, 50055 = password expired.
error_codes = [50053, 50126, 50055]

# Parse the sign-in timestamps once so that "createdDateTime" holds datetimes rather than strings
df_auth = df_auth.assign(createdDateTime=pd.to_datetime(df_auth['createdDateTime']))

<b>Baseline labeling of the sign-in dataset</b>

In [None]:
def detect_password_spray(df_auth, failure_codes=None):
    '''
    Label sign-in events that look like a password spray coming from one IP address.

    For every calendar day, failed sign-ins are bucketed into 20-minute windows per
    caller IP. An IP is "breached" when at least 5 *distinct* users fail from it inside
    one window (the count is of unique userPrincipalName values, not of attempts).
    For a breached IP, all of its events from 3 days before the day up to the end of
    the day are inspected: if failures outnumber all other events, every one of those
    events is labeled 'Password Spray'; otherwise they are (re)set to 'Normal'.
    (similar to https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Microsoft%20Entra%20ID/Analytic%20Rules/SigninPasswordSpray.yaml)

    Parameters:
        df_auth: sign-in dataframe with the columns 'createdDateTime', 'callerIpAddress',
            'userPrincipalName' and 'status.errorCode'. It is modified in place:
            'createdDateTime' is converted to datetime and 'Label_Password_Spray' is added.
        failure_codes: error codes counted as failed sign-ins. Defaults to the
            notebook-level `error_codes` list.

    Returns:
        The same dataframe object, with the 'Label_Password_Spray' column filled in.
    '''
    if failure_codes is None:
        failure_codes = error_codes

    # Define constants (the window length is declared once and reused for bucketing and labeling)
    window_freq = '20min'
    authentication_window = pd.Timedelta(window_freq)
    authentication_threshold = 5
    look_back = pd.Timedelta(days=3)
    one_day = pd.Timedelta(days=1)

    # Convert to datetime
    df_auth['createdDateTime'] = pd.to_datetime(df_auth['createdDateTime'])
    df_auth['Label_Password_Spray'] = 'Normal'

    created = df_auth['createdDateTime']
    if created.isna().all():
        # No usable timestamps (e.g. empty dataframe): there is no day range to scan
        return df_auth

    # These masks do not change while labels are written, so compute them once
    caller_ip = df_auth['callerIpAddress']
    is_failure = df_auth['status.errorCode'].isin(failure_codes)

    # Failed sign-ins, each tagged with the start of its 20-minute window
    failures = df_auth.loc[is_failure, ['createdDateTime', 'callerIpAddress', 'userPrincipalName']].copy()
    failures['Window'] = failures['createdDateTime'].dt.floor(window_freq)

    time_range = pd.date_range(start=created.min().floor('1D'), end=created.max(), freq='D')

    for current_day in time_range:
        end_day = current_day + one_day

        # Distinct failing users per (window, IP) for the current day
        todays_failures = failures[(failures['createdDateTime'] >= current_day) &
                                   (failures['createdDateTime'] < end_day)]
        users_per_window = (todays_failures
                            .groupby(['Window', 'callerIpAddress'])['userPrincipalName']
                            .nunique()
                            .reset_index(name='failCount'))

        # Identify IPs that reach the threshold within a 20 minute window
        breached_ips = users_per_window[users_per_window['failCount'] >= authentication_threshold]
        if breached_ips.empty:
            continue

        breached_ip_list = breached_ips['callerIpAddress'].unique().tolist()
        print(breached_ip_list)

        # Use the previous 3 days plus the current day as the look-back period
        lookback_start = current_day - look_back
        in_lookback = (created >= lookback_start) & (created < end_day)

        for breached_ip in breached_ip_list:
            from_ip = caller_ip == breached_ip
            ip_lookback = in_lookback & from_ip

            failed_count = int((ip_lookback & is_failure).sum())
            # Every event that is not one of the failure codes counts as a "success" here
            success_count = int(ip_lookback.sum()) - failed_count

            if failed_count > success_count:
                df_auth.loc[ip_lookback, 'Label_Password_Spray'] = 'Password Spray'

                # Label every event of this IP in the windows where the breach was found.
                # With 20-minute windows these rows already lie inside the look-back span
                # labeled above; the step is kept so the windows stay covered explicitly.
                breach_windows = breached_ips.loc[breached_ips['callerIpAddress'] == breached_ip, 'Window']
                for block_start in breach_windows:
                    in_block = (created >= block_start) & (created < block_start + authentication_window)
                    df_auth.loc[in_block & from_ip, 'Label_Password_Spray'] = 'Password Spray'
            else:
                # NOTE(review): this reset also clears 'Password Spray' labels written for
                # this IP while processing an earlier day whose look-back span overlaps
                # this one - confirm that this is intended.
                df_auth.loc[ip_lookback, 'Label_Password_Spray'] = 'Normal'

    return df_auth


In [None]:
def detect_signin_burst(df_auth, failure_codes=None):
    '''
    Label bursts of failed sign-ins that reach one account from several locations within one hour.

    Failed sign-ins are grouped by user, application and 1-hour window (all applications
    in 'appDisplayName' are monitored, not a single one). When a group contains failures
    from more than one distinct 'location.countryOrRegion', every event of that user,
    application and hour - failed or not - is labeled 'Signin Burst'.
    In comparison with the original query, only failed attempts (by default the codes
    50053, 50126 and 50055) are used to detect a burst.
    (similar to https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Microsoft%20Entra%20ID/Analytic%20Rules/Sign-in%20Burst%20from%20Multiple%20Locations.yaml)

    Parameters:
        df_auth: sign-in dataframe with the columns 'createdDateTime', 'userPrincipalName',
            'appDisplayName', 'location.countryOrRegion' and 'status.errorCode'.
            The column 'Label_Burst_1_hour' is added in place; the index is left untouched.
        failure_codes: error codes counted as failed sign-ins. Defaults to the
            notebook-level `error_codes` list.

    Returns:
        The same dataframe object, with the 'Label_Burst_1_hour' column filled in.
    '''
    if failure_codes is None:
        failure_codes = error_codes

    grouping_list = ['userPrincipalName', 'Window', 'appDisplayName']

    # Initializing the "Label_Burst_1_hour"
    df_auth['Label_Burst_1_hour'] = 'Normal'

    # Work on a positional (0..n-1) copy of the needed columns so that the caller's
    # dataframe never receives a temporary 'Window' column and keeps its own index.
    events = df_auth[['userPrincipalName', 'appDisplayName',
                      'location.countryOrRegion']].reset_index(drop=True)
    # Discretize 'createdDateTime' to 1-hour bins ('h' replaces the deprecated 'H' alias)
    events['Window'] = (pd.to_datetime(df_auth['createdDateTime'])
                        .dt.floor('h')
                        .reset_index(drop=True))
    is_failure = df_auth['status.errorCode'].isin(failure_codes).to_numpy()

    # Number of distinct locations per user / hour / application among the failures
    locations_count_group = (events[is_failure]
                             .groupby(grouping_list)['location.countryOrRegion']
                             .nunique()
                             .reset_index(name='Unique_Locations'))

    # Keep the groups with more than one location
    suspicious_events = locations_count_group.loc[
        locations_count_group['Unique_Locations'] > 1, grouping_list]

    if not suspicious_events.empty:
        # A left merge keeps one output row per input row, in the input order,
        # because the suspicious keys are unique (they come from a group-by).
        matched = events[grouping_list].merge(suspicious_events, on=grouping_list,
                                              how='left', indicator=True)
        in_burst = (matched['_merge'] == 'both').to_numpy()
        df_auth.loc[in_burst, 'Label_Burst_1_hour'] = 'Signin Burst'

    return df_auth


In [None]:
def detect_password_cracking(df_auth, failure_codes=None):
    '''
    Label distributed password cracking attempts against an account.

    For every calendar day and user, the failed sign-ins are counted together with the
    number of distinct 'location.countryOrRegion' values they came from. A user/day is
    suspicious when there are more than 30 failed sign-ins AND at least 3 distinct
    locations; the failed sign-ins of that user on that day are then labeled
    'Password Cracking' (other events of the user keep the label 'Normal').
    (similar to https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Microsoft%20Entra%20ID/Analytic%20Rules/DistribPassCrackAttempt.yaml)

    Parameters:
        df_auth: sign-in dataframe with the columns 'createdDateTime', 'userPrincipalName',
            'location.countryOrRegion' and 'status.errorCode'. The column
            'Label_Password_Cracking' is added in place; no other column is touched.
        failure_codes: error codes counted as failed sign-ins. Defaults to the
            notebook-level `error_codes` list.

    Returns:
        The same dataframe object, with the 'Label_Password_Cracking' column filled in.
    '''
    if failure_codes is None:
        failure_codes = error_codes

    # Threshold number of sign-ins to monitor (strictly more than this many failures)
    signin_threshold = 30
    # Threshold number of locations to monitor (at least this many distinct locations)
    location_threshold = 3
    group_keys = ['Day', 'userPrincipalName']

    # Create a new column for labels
    df_auth['Label_Password_Cracking'] = 'Normal'

    # Positional (0..n-1) working copy; the day bucket lives here instead of in a
    # temporary 'Day' column on the caller's dataframe.
    events = df_auth[['userPrincipalName', 'location.countryOrRegion']].reset_index(drop=True)
    events['Day'] = (pd.to_datetime(df_auth['createdDateTime'])
                     .dt.floor('D')
                     .reset_index(drop=True))
    is_failure = df_auth['status.errorCode'].isin(failure_codes).to_numpy()

    # Failed sign-ins and distinct locations per user and day
    per_user_day = events[is_failure].groupby(group_keys)['location.countryOrRegion']
    stats = pd.DataFrame({
        'SigninCount': per_user_day.size(),
        'Location_Count': per_user_day.nunique(),
    }).reset_index()

    # Identify unexpected behavior from users using the thresholds
    suspicious_users = stats.loc[(stats['SigninCount'] > signin_threshold) &
                                 (stats['Location_Count'] >= location_threshold), group_keys]

    if not suspicious_users.empty:
        # A left merge keeps one output row per input row, in the input order,
        # because the suspicious (day, user) pairs are unique.
        matched = events[group_keys].merge(suspicious_users, on=group_keys,
                                           how='left', indicator=True)
        is_suspicious = (matched['_merge'] == 'both').to_numpy()
        # Only the failed sign-ins of a suspicious user/day are labeled
        df_auth.loc[is_suspicious & is_failure, 'Label_Password_Cracking'] = 'Password Cracking'

    return df_auth

<b>Baseline labeling of the audit dataset</b>

In [None]:
def detect_role_management(df_audit):
    '''
    Label audit events where an application is granted the "RoleManagement.ReadWrite.Directory" permission.

    Rows whose 'Activity' is "Add delegated permission grant" or "Add app role assignment
    to service principal" and whose first or second modified-property new value mentions
    "RoleManagement.ReadWrite.Directory" are labeled 'Malicious_consent', together with
    every other row that shares a CorrelationId with them.
    (similar to https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Microsoft%20Entra%20ID/Analytic%20Rules/AzureADRoleManagementPermissionGrant.yaml)

    NOTE(review): a later cell of this notebook defines another function with this same
    name, which replaces this one once that cell has run.

    Parameters:
        df_audit: audit dataframe with the columns 'Activity', 'CorrelationId',
            'Target1ModifiedProperty1NewValue' and 'Target1ModifiedProperty2NewValue'.
            The column 'Label_Role_Management' is added in place.

    Returns:
        The same dataframe object, with the 'Label_Role_Management' column filled in.
    '''
    risky_permission = "RoleManagement.ReadWrite.Directory"
    # Activities that carry the approle and delegated permission scopes
    grant_activities = ["Add delegated permission grant",
                        "Add app role assignment to service principal"]
    scope_columns = ['Target1ModifiedProperty2NewValue', 'Target1ModifiedProperty1NewValue']

    df_audit['Label_Role_Management'] = "Normal"

    is_grant = df_audit['Activity'].isin(grant_activities)

    # Literal (non-regex) substring search; astype(str) turns missing values into text
    # that cannot contain the permission, so the mask is always a clean boolean.
    has_permission = pd.Series(False, index=df_audit.index)
    for column in scope_columns:
        has_permission |= df_audit[column].astype(str).str.contains(risky_permission, regex=False)

    # Label rows accordingly in the main dataframe
    flagged = is_grant & has_permission
    df_audit.loc[flagged, 'Label_Role_Management'] = "Malicious_consent"

    # Propagate label to all rows with same correlationId. Missing ids are dropped so
    # that one flagged row without an id does not label every other row without an id.
    correlation_ids = df_audit.loc[flagged, 'CorrelationId'].dropna().unique()
    df_audit.loc[df_audit['CorrelationId'].isin(correlation_ids), 'Label_Role_Management'] = "Malicious_consent"

    return df_audit

In [None]:
# NOTE(review): this function reuses the name `detect_role_management`, which an earlier
# cell of this notebook already defines for the role-management check; once this cell has
# run, the name refers to this o365-attack-toolkit detector. The name is left unchanged
# here so that existing calls keep working - consider giving it a distinct name.
def detect_role_management(df_audit):
    '''
    Label audit events showing an illicit consent grant with the permission scope of "o365-attack-toolkit".

    (o365-attack-toolkit: https://github.com/mdsecactivebreach/o365-attack-toolkit)
    (similar to https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Microsoft%20Entra%20ID/Analytic%20Rules/MaliciousOAuthApp_O365AttackToolkit.yaml)

    Steps:
      1. Take the "Consent to application" events whose target is not one of the known
         legitimate apps, and collect the "Add delegated permission grant" events that
         share a CorrelationId with them.
      2. For each such grant whose consent type is not "AllPrincipals", count the
         permissions in its scope that belong to the toolkit's permission set. When more
         than 3 match, every row with that CorrelationId is labeled 'Malicious_consent'.
      3. Every "Add service principal" event is labeled 'Malicious_consent' when a row
         labeled in step 2 is dated at most 14 days before it (or at the same time).
         NOTE(review): the direction of this window (labeled rows preceding the service
         principal creation) follows the original code - confirm it is the intended one.

    Parameters:
        df_audit: audit dataframe with the columns 'Activity', 'CorrelationId',
            'Target1DisplayName', 'Target1ModifiedProperty1NewValue' (space separated
            permission scope), 'Target1ModifiedProperty2NewValue' (consent type) and
            'Date (UTC)'. The column 'Label_o365' is added in place.

    Returns:
        The same dataframe object, with the 'Label_o365' column filled in.
    '''
    permissions_o365 = {"Contacts.Read", "User.Read", "Mail.Read", "Notes.Read.All",
                        "MailboxSettings.ReadWrite", "Files.ReadWrite.All", "Mail.Send",
                        "Files.Read", "Files.Read.All"}
    legitimate_apps = ["DocuSign NA1", "Zoom Video Communications", "Zoom OneDrive", "Windows Defender Security Intelligence",
                       "Amazon Alexa Connect", "Barracuda Networks", "Favro", "ClassFlow Prod OneDrive Personal"]

    # A grant is considered malicious when MORE than this many risky permissions are in its scope
    threshold = 3
    # How far before an "Add service principal" event a labeled row may lie
    lookback = pd.Timedelta(days=14)

    df_audit['Label_o365'] = 'Normal'

    # Consents given to applications that are not on the allow-list
    is_consent = df_audit['Activity'] == 'Consent to application'
    is_legitimate = df_audit['Target1DisplayName'].isin(legitimate_apps)
    consent_ids = df_audit.loc[is_consent & ~is_legitimate, 'CorrelationId'].dropna().unique()

    # Identify relevant "Add delegated permission grant" activities
    deleg_permissions = df_audit[df_audit['CorrelationId'].isin(consent_ids) &
                                 (df_audit['Activity'] == 'Add delegated permission grant')]

    # The consent type is not "AllPrincipals" and RiskyPermissions in scope > threshold
    malicious_ids = set()
    for consent_type, scope, correlation_id in zip(deleg_permissions['Target1ModifiedProperty2NewValue'],
                                                   deleg_permissions['Target1ModifiedProperty1NewValue'],
                                                   deleg_permissions['CorrelationId']):
        if consent_type == "AllPrincipals":
            continue
        if not isinstance(scope, str):
            # Missing scope (NaN/None): nothing to compare against the permission set
            continue
        matching_perms = [permission for permission in scope.split() if permission in permissions_o365]
        if len(matching_perms) > threshold:
            malicious_ids.add(correlation_id)

    if malicious_ids:
        df_audit.loc[df_audit['CorrelationId'].isin(list(malicious_ids)), 'Label_o365'] = 'Malicious_consent'

    # Identify "Add service principal" activities within 14 days of labeled "Consent to application"
    is_service_principal = (df_audit['Activity'] == 'Add service principal').to_numpy()
    is_labeled = (df_audit['Label_o365'] == 'Malicious_consent').to_numpy()
    if is_labeled.any() and is_service_principal.any():
        # Compare real timestamps over a range, not an exact match on 14 discrete instants
        event_time = pd.to_datetime(df_audit['Date (UTC)'])
        labeled_times = event_time[is_labeled].dropna()
        near_labeled = [
            bool(((labeled_times >= moment - lookback) & (labeled_times <= moment)).any())
            for moment in event_time[is_service_principal]
        ]
        related = is_service_principal.copy()
        related[is_service_principal] = near_labeled
        df_audit.loc[related, 'Label_o365'] = 'Malicious_consent'

    return df_audit
