### Initialise Environment

In [None]:
import pandas as pd
import glob
import os
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
from pathlib import Path

sns.set(style="whitegrid")
#set folder path
root_dir = Path(os.getcwd()).parent
folder_path = root_dir/"O365_temp"
file_list = glob.glob(os.path.join(folder_path, "*.csv"))


df_list = []

for file in file_list:
    # Determine signin type from filename
    filename = os.path.basename(file)
    if filename.startswith("Interactive"):
        signin_type = "Interactive"
    elif filename.startswith("NonInteractive"):
        signin_type = "NonInteractive"
    else:
        signin_type = "Unknown"  # fallback if name doesn't match

    # Read CSV
    temp_df = pd.read_csv(file, low_memory=False)

    # Add signin_type column
    temp_df['signin_type'] = signin_type

    # Append to list
    df_list.append(temp_df)

# Merge all into one DataFrame
df = pd.concat(df_list, ignore_index=True)

#convert timestamp column
df['Date (UTC)'] = pd.to_datetime(df['Date (UTC)'])

#extract login country
df['country'] = df['Location'].str.strip().str.split(',').str[-1].str.strip()

# List of sensitive users
sensitive_users = [
   # 'sensitive username no.1', sensitive username no.2', 'sensitive username no.3'
]

# List of known safe IP addresses to ignore
safe_ips = [
    # "known IP 1", "known IP 2", "known IP 3"
]

#show the earliest and latest dates
print("Earliest date:", df['Date (UTC)'].min())
print("Latest date:", df['Date (UTC)'].max())   

#show the number of days in the dataset
print("Number of days in dataset:", (df['Date (UTC)'].max() - df['Date (UTC)'].min()).days + 1)


# Extract the country from the Location field and add a new column called country containing the country code
import pycountry_convert as pc

def get_continent(country_code):
    try:
        continent_code = pc.country_alpha2_to_continent_code(country_code)
        return continent_code
    except:
        return 'Unknown'

df['continent'] = df['country'].apply(get_continent)


# Filter for Successful Interactive Logins at Suspicious Time of Day

In [None]:
#check succesful logins after hours
df['hour'] = df['Date (UTC)'].dt.hour
after_hours_success = df[
    ((df['hour']<3) | (df['hour']>21)) &
    (df['Status'].str.lower() == 'success') &
    (df['signin_type'] == 'Interactive') 
]

print(after_hours_success['Date (UTC)'].count(),'\033[1m' + 'Successful interactive Logins after hours  (between 23:00 and 04:00)' + '\033[0m')
print('')
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
  print(after_hours_success[['User', 'Date (UTC)', 'country', 'signin_type' ]])



## Successful after-hours logins by sensitive users

In [None]:
sensitive_after_hours_success = after_hours_success[
    after_hours_success['User'].isin(sensitive_users)
]   

print(sensitive_after_hours_success['Date (UTC)'].count(),'\033[1m' + 'Successful After Hours Interactive Logins for Sensitive Users' + '\033[0m')

with pd.option_context('display.max_rows', None, 'display.max_columns', None):
  print(sensitive_after_hours_success[['User', 'Date (UTC)', 'country', 'IP address', 'signin_type']].drop_duplicates().sort_values(by='User'))  


# Successful Overseas Logins and Highlight Non-European Countries

In [None]:
## Successful overseas logins 

overseas_success = df[
    (df['Status'].str.lower() == 'success') &
     (df['country'] != 'MT') & # country is not Malta
     (df['country'] != '')
      ]
non_europe_logins = overseas_success[overseas_success['continent'] != 'EU']
print('\033[1m' + 'Successful Overseas Login - Countries' + '\033[0m')
print(overseas_success['country'].unique())
print('')
print('')
print('\033[1m' + 'Attention - Non-European Countries' + '\033[0m')
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
  print(non_europe_logins[['User', 'country', 'signin_type']].drop_duplicates())

# print unique combination of user and country
print('')
print('\033[1m' + 'Unique User and Country Combinations' + '\033[0m')
unique_user_country = overseas_success[['User', 'country','signin_type']].drop_duplicates().sort_values(by='User')
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
  print(unique_user_country.sort_values(by='User'))   

## List Strange hours logins by country

In [None]:
## Succesful out of hours logins by country
print('\033[1m' + 'Successful Login After Hours - Countries' + '\033[0m')
print(after_hours_success['country'].unique())

non_europe_logins = after_hours_success[after_hours_success['continent'] != 'EU']
print('')
print('\033[1m' + 'Attention - Non-European Countries' + '\033[0m')
print(non_europe_logins['country'].unique())

## List Overseas Logins by Sensitive Users

In [None]:
sensitive_overseas_success = overseas_success[
    overseas_success['User'].isin(sensitive_users)
]   

print(sensitive_overseas_success['Date (UTC)'].count(),'\033[1m' + 'Successful After Hours Logins for Sensitive Users' + '\033[0m')

with pd.option_context('display.max_rows', None, 'display.max_columns', None):
  print(sensitive_overseas_success[['User', 'Date (UTC)', 'country', 'IP address', 'signin_type']].drop_duplicates().sort_values(by='User'))  


# List Failed Interactive Logins by user

In [None]:
##Failed Logins by User
failed_interactive_logins = df[
    (df['Status'].str.lower() == 'failure') &
    (df['signin_type'] == 'Interactive')
]
failed_counts = failed_interactive_logins.groupby('User').size().sort_values(ascending=False)
print('\033[1m' + 'Failed Interactive Logins by User' + '\033[0m')
print('')
print(failed_counts.head(20))

In [None]:
failed_counts_top = failed_interactive_logins['User'].value_counts().head(10)

plt.figure(figsize=(10, 5))
sns.barplot(x=failed_counts_top.values, y=failed_counts_top.index, hue=failed_counts_top.index, palette="Oranges_r",legend=False)
plt.title("Top 10 Users by Failed Logins")
plt.xlabel("Failed Attempts")
plt.ylabel("User")
plt.tight_layout()
plt.show()

## Top 10 Countries for Failed Logins

In [None]:
failed_logins = df[(df['Status'].str.lower() == 'failure')]

country_counts = failed_logins['country'].value_counts().head(10)

plt.figure(figsize=(6, 6))
country_counts.plot.pie(autopct='%1.1f%%', startangle=90, cmap='Reds')
plt.ylabel("")
plt.title("Top 10 Countries for Failed Logins")
plt.tight_layout()
plt.show()


# List Suspicious IPs (IPs with multiple failed attempts)

In [None]:
## SUSPICIOUS IPs ##


# Filter only failed login attempts
failed_logins = df[df['Status'].str.lower() == 'failure']

# Exclude safe IPs
failed_logins = failed_logins[~failed_logins['IP address'].isin(safe_ips)]

# Group by IP address and User, then count the number of failed attempts
failed_attempts_summary = (
    failed_logins
    .groupby(['IP address', 'User', 'country'])
    .size()
    .reset_index(name='Failed Attempts')
    .sort_values(by='Failed Attempts', ascending=False)
)

# Show only entries with more than a threshold (e.g., more than 3 failed attempts)
suspicious_ips = failed_attempts_summary[failed_attempts_summary['Failed Attempts'] > 2]

# Display the result
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    print(suspicious_ips)

## IPs with most failed login attempts

In [None]:
# Group by IP address, count unique users and sum failed attempts
ip_summary = (
    suspicious_ips
    .groupby('IP address')
    .agg(
        num_users=('User', 'nunique'),
        total_failed_attempts=('Failed Attempts', 'sum'),
        countries=('country', lambda x: ', '.join(x.unique()))
    )
    .reset_index()
    .sort_values(by='total_failed_attempts', ascending=False)
)

print(ip_summary.head(20))

# Check for Impossible Travel Scenarios

In [None]:
# Sort by user and timestamp
df_success = df[df['Status'].str.lower() == 'success']

df_sorted = df_success.sort_values(by=['User', 'Date (UTC)'])


# Calculate time difference and check country changes
df_sorted['prev_country'] = df_sorted.groupby('User')['country'].shift(1)
df_sorted['time_diff'] = df_sorted.groupby('User')['Date (UTC)'].diff().dt.total_seconds() / 3600  # hours

# Flag suspicious rapid location changes
impossible_travel = df_sorted[
    (df_sorted['country'] != df_sorted['prev_country']) & 
    (df_sorted['time_diff'] < 1)  & # less than 1 hour apart 
    (df_sorted['User'] != "AIM") # Exclude AIM user
]
print('\033[1m' + 'Impossible Travel' + '\033[0m')
print('')
#with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(impossible_travel[['User', 'Date (UTC)', 'country', 'prev_country' , 'signin_type']])
print('')
print(impossible_travel['User'].unique())
print('')


# Users with High Succesful Login Counts

In [None]:
login_counts = df[df['Status'].str.lower() == 'success'].groupby('User').size().sort_values(ascending=False)
print('\033[1m' + 'Successful Logins' + '\033[0m')
print('')
print(login_counts.sort_values(ascending=False).head(20))
print('')

#show logins where signin type is interactive and status is success
interactive_logins = df[df['signin_type'] == 'Interactive']
interactive_logins_success = interactive_logins[interactive_logins['Status'].str.lower() == 'success']
print('\033[1m' + 'Successful Interactive Logins' + '\033[0m') 
print('')
print(interactive_logins_success['User'].value_counts().head(20))
print('')


In [None]:
#plot succesful interactive logins by user for the top 10 users
top_users = interactive_logins_success['User'].value_counts().head(20)
plt.figure(figsize=(10, 5))
sns.barplot(x=top_users.values, y=top_users.index, hue=top_users.index, palette="Blues_r", legend=False)
plt.title("Top 10 Users by Successful Interactive Logins")
plt.xlabel("Successful Logins")
plt.ylabel("User")
plt.tight_layout()
plt.show()


# Logins from Third Party Domains

In [None]:
#filter for external domains wher domain is not @contoso.com
external_success = df[
    (df['Status'].str.lower() == 'success') &
    (~df['Username'].str.contains('@contoso.com', na=False))  # Exclude internal domain
]



# Group successful external logins by Username, Location, and IP address
external_success_grouped = external_success.groupby(['Username', 'country', 'IP address']).size().reset_index(name='Count')

print('\033[1m' + 'Grouped Successful Logins with External Domains' + '\033[0m')
print(external_success_grouped)

print('')
print(external_success['Username'].unique())

# Alert on Succesful SMTP Logins (possible MFA bypass)

In [None]:
#Check for succesfful SMTP logins
smtp_success = df[
    (df['Status'].str.lower() == 'success') &
    (df['Client app'].str.contains('SMTP'))
]
print('\033[1m' + 'Successful SMTP Logins' + '\033[0m')
print('')
print(smtp_success[['User', 'Date (UTC)', 'IP address', 'country']].drop_duplicates().sort_values(by='Date (UTC)'))