In [1]:
# Data Preparation
# --------------------------------
import pandas as pd

# Load the dataset
file_path = 'C:/Users/e16011413/OneDrive - Ulster University/Desktop/UCAmI/UCAmI Anomaly/Data/kasterenActData.xlsx'
data = pd.read_excel(file_path)

# Convert start and end times to datetime objects
data['Start time'] = pd.to_datetime(data['Start time'])
data['End time'] = pd.to_datetime(data['End time'])

# Calculate the duration in minutes
data['Duration'] = (data['End time'] - data['Start time']).dt.total_seconds() / 60

# Display the first few rows of the dataset
data.head()

Unnamed: 0,Start time,End time,ID,Duration
0,2008-02-25 00:22:46,2008-02-25 09:34:12,10,551.433333
1,2008-02-25 09:37:17,2008-02-25 09:38:02,4,0.75
2,2008-02-25 09:49:23,2008-02-25 09:53:28,13,4.083333
3,2008-02-25 10:02:28,2008-02-25 10:12:42,5,10.233333
4,2008-02-25 10:19:06,2008-02-25 16:55:38,1,396.533333


In [2]:
# Transition Matrix Calculation
# ----------------------------------
from collections import defaultdict
import numpy as np

# Extract the activities and compute transitions
activities = data['ID']
transitions = defaultdict(lambda: defaultdict(int))

# Count transitions
for (a1, a2) in zip(activities[:-1], activities[1:]):
    transitions[a1][a2] += 1

# Compute probabilities
transition_probabilities = {}
for a1, a2_dict in transitions.items():
    total = float(sum(a2_dict.values()))
    transition_probabilities[a1] = {a2: count / total for a2, count in a2_dict.items()}

# Display the transition probabilities
transition_probabilities

{10: {4: 0.9583333333333334, 13: 0.041666666666666664},
 4: {13: 0.15789473684210525,
  17: 0.06140350877192982,
  15: 0.05263157894736842,
  10: 0.19298245614035087,
  4: 0.3508771929824561,
  5: 0.08771929824561403,
  1: 0.09649122807017543},
 13: {5: 0.55, 4: 0.4, 17: 0.05},
 5: {1: 0.9130434782608695, 15: 0.043478260869565216, 4: 0.043478260869565216},
 1: {4: 0.696969696969697,
  15: 0.09090909090909091,
  1: 0.030303030303030304,
  17: 0.09090909090909091,
  10: 0.030303030303030304,
  5: 0.06060606060606061},
 17: {4: 0.8, 17: 0.15, 13: 0.05},
 15: {17: 0.6, 1: 0.1, 4: 0.3}}

In [5]:
from collections import defaultdict
activities = ['A', 'B', 'A', 'A', 'C', 'D', 'A', 'A']
transitions = defaultdict(lambda: defaultdict(int))

# Count transitions
for (a1, a2) in zip(activities[:-1], activities[1:]):
    transitions[a1][a2] += 1

# Compute probabilities
transition_probabilities = {}
for a1, a2_dict in transitions.items():
    total = float(sum(a2_dict.values()))
    transition_probabilities[a1] = {a2: count / total for a2, count in a2_dict.items()}

# Display the transition probabilities
transition_probabilities


{'A': {'B': 0.25, 'A': 0.5, 'C': 0.25},
 'B': {'A': 1.0},
 'C': {'D': 1.0},
 'D': {'A': 1.0}}

In [3]:
# First Activity Determination (This helps us to know by which activities we better begin the synthetic days)
# ----------------------------------
# Find unique days and their first activities
data['Date'] = pd.to_datetime(data['Start time']).dt.date
first_activities = data.groupby('Date').first()['ID']

# Count the occurrences of each first activity
first_activity_counts = first_activities.value_counts()
first_activity_probabilities = first_activity_counts / len(first_activities)

# Display the probabilities for the first activity
first_activity_probabilities

ID
4     0.875000
10    0.083333
17    0.041667
Name: count, dtype: float64

In [38]:
# Fitting the Best Distribution for Each Activity 
#------------------------------------------------------
from scipy.stats import lognorm, expon, weibull_min, invgauss
from sklearn.mixture import GaussianMixture
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import os

def fit_distributions(durations, max_components=3):
    results = {}
    
    durations_log = np.log(durations).reshape(-1, 1)
    
    # Gaussian Mixture Model with Gamma
    best_bic = np.inf
    best_gmm = None
    best_n_components = 2

    for n_components in range(1, max_components + 1):
        gmm = GaussianMixture(n_components=n_components)
        gmm.fit(durations_log)
        bic = gmm.bic(durations_log)
        if bic < best_bic:
            best_bic = bic
            best_gmm = gmm
            best_n_components = n_components
    
    weights = best_gmm.weights_
    means = np.exp(best_gmm.means_).flatten()
    variances = (np.exp(best_gmm.covariances_).flatten())**0.5
    shapes = (means / variances)**2
    scales = variances**2 / means
    
    results['gamma_mixture'] = (weights, shapes, scales, best_n_components, best_bic)

    # Log-normal distribution
    lognorm_params = lognorm.fit(durations)
    lognorm_bic = np.sum(lognorm.logpdf(durations, *lognorm_params)) - len(durations) * np.log(len(durations))
    results['lognorm'] = lognorm_params + (lognorm_bic,)

    # Exponential distribution
    expon_params = expon.fit(durations)
    expon_bic = np.sum(expon.logpdf(durations, *expon_params)) - len(durations) * np.log(len(durations))
    results['exponential'] = expon_params + (expon_bic,)

    # Weibull distribution
    weibull_params = weibull_min.fit(durations)
    weibull_bic = np.sum(weibull_min.logpdf(durations, *weibull_params)) - len(durations) * np.log(len(durations))
    results['weibull'] = weibull_params + (weibull_bic,)

    # Inverse Gaussian distribution
    invgauss_params = invgauss.fit(durations)
    invgauss_bic = np.sum(invgauss.logpdf(durations, *invgauss_params)) - len(durations) * np.log(len(durations))
    results['invgauss'] = invgauss_params + (invgauss_bic,)

    return results

# Fit distributions for each activity
duration_distributions = {}
durations = data.groupby('ID')['Duration'].apply(list)

for activity, duration_list in durations.items():
    distributions = fit_distributions(np.array(duration_list))
    duration_distributions[activity] = distributions


# Select the best fitting distribution for each activity
best_fit_distributions = {}

for activity, distributions in duration_distributions.items():
    best_bic = np.inf
    best_distribution = None
    
    for dist_name, params in distributions.items():
        if params[-1] < best_bic:
            best_bic = params[-1]
            best_distribution = (dist_name, params)
    
    best_fit_distributions[activity] = best_distribution


# Function to plot the fitted distributions
def plot_fitted_distribution(activity, duration_list, dist_name, params, file_path):
    sns.histplot(duration_list, bins=50, kde=True, stat='density', label='Observed Data', color='skyblue')
    
    x = np.linspace(min(duration_list), max(duration_list), 1000)
    
    if dist_name == 'gamma_mixture':
        weights, shapes, scales, best_n_components, _ = params
        y = np.zeros_like(x)
        for weight, shape, scale in zip(weights, shapes, scales):
            y += weight * gamma.pdf(x, shape, scale=scale)
        plt.plot(x, y, label='Fitted Gamma Mixture', color='red')
    
    elif dist_name == 'lognorm':
        shape, loc, scale, _ = params
        y = lognorm.pdf(x, shape, loc, scale)
        plt.plot(x, y, label='Fitted Log-Normal', color='red')
    
    elif dist_name == 'exponential':
        loc, scale, _ = params
        y = expon.pdf(x, loc, scale)
        plt.plot(x, y, label='Fitted Exponential', color='red')
    
    elif dist_name == 'weibull':
        shape, loc, scale, _ = params
        y = weibull_min.pdf(x, shape, loc, scale)
        plt.plot(x, y, label='Fitted Weibull', color='red')
    
    elif dist_name == 'invgauss':
        mu, loc, scale, _ = params
        y = invgauss.pdf(x, mu, loc, scale)
        plt.plot(x, y, label='Fitted Inverse Gaussian', color='red')
    
    plt.title(f'Activity: {activity}')
    plt.xlabel('Duration (minutes)')
    plt.ylabel('Density')
    plt.legend()
    # plt.show()

    # Save the plot
    plt.savefig(os.path.join(file_path, f'{activity}_fitted_distribution.png'), dpi=300)
    plt.close()


# Plot and save the best-fitting distributions for each activity
file_path = 'C:/Users/e16011413/OneDrive - Ulster University/Desktop/UCAmI/UCAmI Anomaly/SyntheticData'
for activity, duration_list in durations.items():
    dist_name, params = best_fit_distributions[activity]
    plot_fitted_distribution(activity, duration_list, dist_name, params, file_path)


# Print out the fitted distributions
best_fit_distributions = {}

for activity, distributions in duration_distributions.items():
    best_bic = np.inf
    best_distribution = None
    
    for dist_name, params in distributions.items():
        if params[-1] < best_bic:
            best_bic = params[-1]
            best_distribution = (dist_name, params)
    
    best_fit_distributions[activity] = best_distribution

# Save best fitting distributions to a text file
text_file_path = os.path.join(file_path, 'best_fit_distributions.txt')
with open(text_file_path, 'w') as file:
    for activity, (dist_name, params) in best_fit_distributions.items():
        file.write(f"Best fit for Activity {activity}: {dist_name} with params {params}\n")


# Print best fitting distributions for each activity
for activity, (dist_name, params) in best_fit_distributions.items():
    print(f"Best fit for Activity {activity}: {dist_name} with params {params}")

Best fit for Activity 1: exponential with params (40.766666666666666, 622.9514705882352, -372.66819089744394)
Best fit for Activity 4: weibull with params (0.7177705629367819, 0.5333333333333332, 0.7653271511120208, -inf)
Best fit for Activity 5: exponential with params (2.6, 6.9565217391304355, -139.72899775037803)
Best fit for Activity 10: weibull with params (0.2120821295060068, 23.799999999999997, 3.7994096740934964, -266.95924230638576)
Best fit for Activity 13: lognorm with params (1.2384198317738446, 1.1651303232987713, 1.1442546898327788, -95.26521087329502)
Best fit for Activity 15: invgauss with params (0.4116101499519757, 2.349378894785014, 77.46152745346008, -64.86965423314555)
Best fit for Activity 17: exponential with params (0.15, 0.7374999999999998, -73.82486165571657)


In [46]:
# Generation of Synthetic Data
# ---------------------------------
# ---------------------------------
import random
import numpy as np
from scipy.stats import gamma, lognorm, expon, weibull_min, invgauss

def generate_synthetic_duration(dist_name, params):
    """
    Generate a synthetic duration based on the specified distribution.
    """
    if dist_name == 'gamma_mixture':
        weights, shapes, scales, best_n_components, _ = params
        # Randomly choose a component based on weights
        component = np.random.choice(best_n_components, p=weights)
        shape = shapes[component]
        scale = scales[component]
        duration = gamma.rvs(shape, scale=scale)
        
    elif dist_name == 'lognorm':
        shape, loc, scale, _ = params
        duration = lognorm.rvs(shape, loc=loc, scale=scale)
        
    elif dist_name == 'exponential':
        loc, scale, _ = params
        duration = expon.rvs(loc=loc, scale=scale)
        
    elif dist_name == 'weibull':
        shape, loc, scale, _ = params
        duration = weibull_min.rvs(shape, loc=loc, scale=scale)
        
    elif dist_name == 'invgauss':
        mu, loc, scale, _ = params
        duration = invgauss.rvs(mu, loc=loc, scale=scale)
        
    else:
        raise ValueError(f"Unsupported distribution: {dist_name}")
        
    return duration

def generate_synthetic_day(transition_probabilities, first_activity_probabilities, duration_distributions):
    while True:
        synthetic_day = []
        
        # Generate the first activity
        first_activity = random.choices(
            list(first_activity_probabilities.index), 
            weights=first_activity_probabilities.values
        )[0]
        synthetic_day.append(first_activity)
        
        current_activity = first_activity
        steps = 0
        max_steps = 1000  # Avoid potential infinite loops
        min_activities = 9
        max_activities = 11
        num_activities = np.random.randint(min_activities, max_activities + 1)
        
        while steps < max_steps and len(synthetic_day) < num_activities:
            next_activities = list(transition_probabilities[current_activity].keys())
            next_probabilities = list(transition_probabilities[current_activity].values())
            next_activity = random.choices(next_activities, weights=next_probabilities)[0]
            
            if next_activity == 'end':
                break
            
            synthetic_day.append(next_activity)
            current_activity = next_activity
            steps += 1
        
        synthetic_durations = [
            max(1, generate_synthetic_duration(*duration_distributions[activity]))  # Use the best-fitting parameters
            for activity in synthetic_day
        ]
        
        # Ensure the total duration does not exceed 1440 minutes
        total_duration = sum(synthetic_durations)
        while total_duration > 1440 and len(synthetic_day) > 2:
            # Remove a random activity, except for activities labeled 1 or 10
            removable_indices = [i for i, activity in enumerate(synthetic_day) if activity not in [1, 10]]
            if removable_indices:
                remove_index = random.choice(removable_indices)
                synthetic_day.pop(remove_index)
                synthetic_durations.pop(remove_index)
                total_duration = sum(synthetic_durations)
            else:
                break  # Break the loop if no removable activities are found
        
        # If the total duration is less than or equal to 1440, return the synthetic day
        if total_duration <= 1440:
            return synthetic_day, synthetic_durations

# Example usage
# Assuming transition_probabilities, first_activity_probabilities, and duration_distributions are defined

# Generate synthetic days
num_synthetic_days = 10000
synthetic_days = [generate_synthetic_day(transition_probabilities, first_activity_probabilities, best_fit_distributions) for _ in range(num_synthetic_days)]

# Print the first synthetic day
print(synthetic_days[0])
print(synthetic_days[1])
print(synthetic_days[2])
print(synthetic_days[3])


import openpyxl

# Create a workbook
wb = openpyxl.Workbook()
sheet = wb.active

# Set the column headers
sheet.cell(row=1, column=1).value = "Day"
sheet.cell(row=1, column=2).value = "Activities"
sheet.cell(row=1, column=3).value = "Durations"

# Write the synthetic data
for i, day in enumerate(synthetic_days):
    sheet.cell(row=i+2, column=1).value = i+1  # Start day numbering from 1
    sheet.cell(row=i+2, column=2).value = str(day[0])  # Insert the whole list of activities
    sheet.cell(row=i+2, column=3).value = str(day[1])  # Insert the whole list of durations

# Save the workbook
FilePath = 'C:/Users/e16011413/OneDrive - Ulster University/Desktop/UCAmI/UCAmI Anomaly/SyntheticData/synthetic_data.xlsx'
wb.save(FilePath)

([4, 4, 10, 4, 1, 4, 4, 10, 4], [1, 1, 23.80004911797008, 1, 202.0288870678084, 1.8063785112579558, 1, 25.855322982901377, 1.2898645829176476])
([4, 4, 5, 1, 4, 5, 1, 4, 4, 4], [1.692709878869606, 1.5026035101794764, 5.317399698016523, 646.0030097623886, 1.5457391344186042, 7.829042927391299, 308.5305606299302, 1.4233024211755785, 1, 1])
([4, 17, 4, 5, 1, 4, 4, 1, 17, 13], [1, 1, 1.1554160049104478, 3.426819705542867, 586.5749992748378, 1.3353036147903605, 1, 504.4859736769278, 1, 1.7077427638617702])
([4, 15, 17, 4, 5, 4, 13, 4, 10, 4], [1.065756249900661, 18.084417441333468, 1.8481435289519692, 1, 14.67547753049126, 2.760514085930049, 1.526607541519878, 1.0944945967591884, 36.133854574134446, 1])


In [49]:
# Conformance Test on three aspects: 1- Activity Sequence   ,  2- Duration Distribution  ,   3- Transition Probability
# --------------------------------------------------------------------------------------------------------------------
# Activity Sequence Comparison
from difflib import SequenceMatcher

def sequence_similarity(seq1, seq2):
    return SequenceMatcher(None, seq1, seq2).ratio()

def average_sequence_similarity(real_days, synthetic_days):
    similarities = []
    for real_day in real_days:
        best_similarity = max(sequence_similarity(real_day, synthetic_day[0]) for synthetic_day in synthetic_days)
        similarities.append(best_similarity)
    return sum(similarities) / len(similarities)

# Duration Distribution Comparison
# --------------------------------
from scipy.stats import ks_2samp

def compare_duration_distributions(real_durations, synthetic_durations):
    p_values = []
    for activity in real_durations.keys():
        real_duration = real_durations[activity]
        synthetic_duration = synthetic_durations.get(activity, [])
        if synthetic_duration:
            _, p_value = ks_2samp(real_duration, synthetic_duration)
            p_values.append(p_value)
    return p_values

# Transition Probability Matrix Comparison
# ----------------------------------------
import numpy as np

def compute_transition_matrix(days):
    activities = set(activity for day in days for activity in day)
    activity_index = {activity: i for i, activity in enumerate(activities)}
    n = len(activities)
    transition_matrix = np.zeros((n, n))
    
    for day in days:
        for i in range(len(day) - 1):
            current_activity = day[i]
            next_activity = day[i + 1]
            transition_matrix[activity_index[current_activity], activity_index[next_activity]] += 1
    
    transition_matrix = transition_matrix / transition_matrix.sum(axis=1, keepdims=True)
    return transition_matrix, activity_index

def compare_transition_matrices(real_matrix, synthetic_matrix):
    return np.linalg.norm(real_matrix - synthetic_matrix)

# Perform Conformance Checking
# ----------------------------------------
# ----------------------------------------

# Assuming real_days is a list of lists where each sublist is a sequence of activities for a day in the real dataset
real_days = [list(data.loc[data['Date'] == day, 'ID']) for day in data['Date'].unique()]

# Convert real data to a format comparable to synthetic data
real_durations = {activity: data[data['ID'] == activity]['Duration'].tolist() for activity in data['ID'].unique()}

# Generate transition matrices for real and synthetic data
real_transition_matrix, real_activity_index = compute_transition_matrix(real_days)
synthetic_transition_matrix, synthetic_activity_index = compute_transition_matrix([synthetic_day[0] for synthetic_day in synthetic_days])

# Align synthetic activity index with real activity index
aligned_synthetic_transition_matrix = np.zeros_like(real_transition_matrix)
for activity, index in real_activity_index.items():
    if activity in synthetic_activity_index:
        aligned_synthetic_transition_matrix[index] = synthetic_transition_matrix[synthetic_activity_index[activity]]

# Perform comparisons
sequence_similarity_score = average_sequence_similarity(real_days, synthetic_days)
duration_p_values = compare_duration_distributions(real_durations, {activity: [duration for day, durations in synthetic_days for a, duration in zip(day, durations) if a == activity] for activity in real_durations.keys()})
transition_matrix_difference = compare_transition_matrices(real_transition_matrix, aligned_synthetic_transition_matrix)

# Output results
print(f"Average Sequence Similarity: {sequence_similarity_score}")
print(f"Duration Distribution P-Values: {duration_p_values}")
print(f"Transition Matrix Difference: {transition_matrix_difference}")

# Open the text file in write mode
with open("C:/Users/e16011413/OneDrive - Ulster University/Desktop/UCAmI/UCAmI Anomaly/SyntheticData/ConformanceResults.txt", "w") as f:
    f.write(f"Average Sequence Similarity: {sequence_similarity_score}\n")
    f.write(f"Duration Distribution P-Values: {duration_p_values}\n")
    f.write(f"Transition Matrix Difference: {transition_matrix_difference}\n")

Average Sequence Similarity: 0.9002668965905943
Duration Distribution P-Values: [7.369035028118557e-19, 4.425939728548843e-15, 0.8331416981234763, 0.002657831801605838, 0.03115927059642961, 8.019194646686809e-17, 0.9083686330624022]
Transition Matrix Difference: 0.12217815741992558
