In [None]:
import pandas as pd
import numpy as np
import ast
from functools import reduce
from sklearn.ensemble import IsolationForest
import json
from collections import defaultdict
import math

In [None]:
'''
given the source/checkin_checkout_history_updated.csv file, identify the users and the corresponding weeks in which they have been acting differently/unusually
steps:
1. process the data to group by (user, week) and get {total visits, total length of visits, total calories burnt}
    - can use map reduce to do this, or pandas <(user, week), (total visits, total length of visits, total calories burnt)>
2. run an isolation forest on the data to identify the outliers for anomaly detection
'''

In [None]:
# Read the raw check-in/check-out log into a DataFrame and derive the columns the analysis needs.
df_byVisit = pd.read_csv('../data/_raw/checkin_checkout_history_updated.csv')
# We decided not to do any analysis based on time of day, so each visit is reduced to its length.
df_byVisit['checkin_time'] = pd.to_datetime(df_byVisit['checkin_time'])
df_byVisit['checkout_time'] = pd.to_datetime(df_byVisit['checkout_time'])
df_byVisit['session_seconds'] = (df_byVisit['checkout_time'] - df_byVisit['checkin_time']).dt.total_seconds()
df_byVisit['week'] = df_byVisit['checkin_time'].dt.isocalendar().week
# ISO weeks do not line up with calendar years:
#   * the first days of January can belong to week 52 OR 53 of the previous ISO year -> week 0
#   * the last days of December can belong to week 1 of the next ISO year -> week 53
#     (this only happens in years whose last full ISO week is 52, so 53 is always free)
# If we had multiple years of data, we could just add 52*(current year - earliest year).
checkin_month = df_byVisit['checkin_time'].dt.month
df_byVisit.loc[(checkin_month == 1) & (df_byVisit['week'] >= 52), 'week'] = 0
df_byVisit.loc[(checkin_month == 12) & (df_byVisit['week'] == 1), 'week'] = 53
# Only user_id, calories_burned, session_seconds and week are used from here on.
df_byVisit = df_byVisit.drop(columns=['gym_id', 'checkin_time', 'checkout_time', 'workout_type'])


In [None]:
# Dataset-wide constants derived from the per-visit frame.
# Some of these may no longer be referenced after later changes to the notebook.

# Number of distinct users in the visit log.
CONSTANT_TOTAL_USERS = df_byVisit['user_id'].nunique()

# Smallest and largest week number that appears anywhere in the data.
CONSTANT_FIRST_WEEK, CONSTANT_LAST_WEEK = (
    df_byVisit['week'].min(),
    df_byVisit['week'].max(),
)

# Averages taken over every individual visit.
CONSTANT_AVG_LENGTH_SESSION = df_byVisit['session_seconds'].mean()
CONSTANT_AVG_CALORIES_SESSION = df_byVisit['calories_burned'].mean()

In [None]:
# The next 2 functions are the basic map-reduce to transform the data into grouping by user/week
# and counting how many times they visited in a week + total time + total calories burned

# Map step: one visit row -> ("<user_id>-<week>", (1, seconds, calories)).
def mapFunc_groupByUserWeek(row):
    """Turn one ``(index, row)`` pair from ``iterrows()`` into a keyed visit record.

    The key is a plain string rather than a tuple because string keys are
    easier to serialise to files. The value is ``(1, seconds, calories)`` so
    that summing values per key yields visit count, total time and total
    calories.
    """
    _index, visit = row
    user_id = visit['user_id']
    calories = int(visit['calories_burned'])
    seconds = int(visit['session_seconds'])
    week = int(visit['week'])
    key = "-".join((str(user_id), str(week)))
    return (key, (1, seconds, calories))

# Reduce step: sum the (visits, seconds, calories) tuples that share a key.
def reduceFunc_groupByUserWeek(acc, pair):
    """Fold one ``(key, (visits, seconds, calories))`` pair into ``acc``.

    ``acc`` is mutated in place and also returned, as ``functools.reduce``
    expects. A key seen for the first time stores its values coerced to int;
    later occurrences are added element-wise to the stored tuple.
    """
    key, value = pair
    if key not in acc:
        acc[key] = (int(value[0]), int(value[1]), int(value[2]))
        return acc
    acc[key] = tuple(running + new for running, new in zip(acc[key], value))
    return acc


# if a user doesn't go in a whole week, we still need an empty entry for that week for our anomaly detection
# so I'm going to find the first and last week each user went to fill in empty weeks

# Map step: one visit row -> (user_id, week).
def mapFunc_addEmptyWeeks(row):
    """Extract ``(user_id, week)`` from one ``(index, row)`` pair of ``iterrows()``."""
    _index, visit = row
    return (visit['user_id'], int(visit['week']))

# Reduce step: track the earliest and latest week seen for each user.
def reduceFunc_findWeekRange(acc, pair):
    """Fold one ``(user, week)`` pair into ``acc[user] = (first_week, last_week)``.

    ``acc`` is mutated in place and returned.
    """
    user, week = pair
    if user not in acc:
        # First sighting: the range is just this one week.
        acc[user] = (week, week)
        return acc
    known = acc[user]
    acc[user] = (min(known[0], week), max(known[1], week))
    return acc

# This reduce function creates a zeroed entry for every week in a user's active range.
# Its output is the starting dictionary passed to reduceFunc_groupByUserWeek.
def reduceFunc_addEmptyWeeks(acc, pair):
    """Add a ``(0, 0, 0)`` entry for each week between a user's first and last week.

    Parameters
    ----------
    acc : dict
        Accumulator keyed by ``"<user_id>-<week>"``; mutated in place and returned.
    pair : tuple
        ``(user, (first_week, last_week))``; both bounds are inclusive.

    The key is built with ``str(user)`` so it matches the key format produced
    by ``mapFunc_groupByUserWeek`` even when user ids are not strings.
    """
    user, week_range = pair
    bot, top = week_range
    prefix = str(user) + '-'
    for week in range(int(bot), int(top) + 1):
        acc[prefix + str(week)] = (0, 0, 0)
    return acc

In [None]:
# Step 1: build the zero-filled "<user>-<week>" skeleton before any real data is added.
mappedData_addEmptyWeeks = [mapFunc_addEmptyWeeks(visit_row) for visit_row in df_byVisit.iterrows()]

# user -> (first week, last week)
userlyWeekRange = reduce(reduceFunc_findWeekRange, mappedData_addEmptyWeeks, dict())
# "<user>-<week>" -> (0, 0, 0) for every week inside each user's range
emptyWeeks = reduce(reduceFunc_addEmptyWeeks, userlyWeekRange.items(), dict())


In [None]:
# Now add all the actual visit information on top of the "empty weeks" skeleton from the cell above.
mappedData_groupByUserWeek = list(map(mapFunc_groupByUserWeek, df_byVisit.iterrows()))

# reduceFunc_groupByUserWeek mutates its accumulator, so seed it with a COPY of emptyWeeks.
# Passing emptyWeeks itself would leave the totals inside it, and re-running this cell
# would then add every visit a second time. A shallow copy is enough because the values
# are immutable tuples.
weekly_userly_data = reduce(reduceFunc_groupByUserWeek, mappedData_groupByUserWeek, dict(emptyWeeks))



In [None]:
# this is much more necessary when the data is actually so large that it can't be stored in memory in its entirety


# csv_df = pd.DataFrame.from_dict(reduced_data, orient='index', columns=['total_sessions', 'total_session_seconds', 'total_calories'])
# # csv_df = csv_df.reset_index()

# csv_df.to_csv('processed_data.csv', index=True)
# csv_df

# storing both dict version and other version NOTE: do not need to store in files as we can just use the variables
# with open("groupByUserWeek.json", "w") as file:
#     json.dump(weekly_userly_data, file)

# with open("groupByUserWeek_dict.json", "w") as file:
#      json.dump(weekly_userly_data_dict, file)

In [None]:
# Turn the reduced "<user_id>-<week>" dict into DataFrames for the anomaly forest.
# Keys are split on the LAST hyphen only, so a user id that itself contains '-' stays intact.

transformed_data = [
    (*key.rsplit('-', 1), *values) for key, values in weekly_userly_data.items()
]

# Indexed variant: (user_id, week) is the index. Note that 'week' is still a string here
# because it comes straight from the split key.
df_byUserWeek_noIndex = pd.DataFrame(
    transformed_data,
    columns=['user_id', 'week', 'total_sessions', 'total_seconds', 'total_calories'],
).set_index(['user_id', 'week'])


# Flat variant: same data with user_id and week as ordinary columns and week cast to int.
# Having an index vs not having one was a little confusing when figuring out stuff later on,
# so both are kept.
df_byUserWeek = pd.DataFrame(list(weekly_userly_data.items()), columns=["user_week", "visits_time_calories"])

df_byUserWeek[["user_id", "week"]] = df_byUserWeek["user_week"].str.rsplit("-", n=1, expand=True)
df_byUserWeek["week"] = df_byUserWeek["week"].astype(int)

# Unpack the (visits, seconds, calories) tuples into three numeric columns.
df_byUserWeek[["total_sessions", "total_seconds", "total_calories"]] = pd.DataFrame(df_byUserWeek["visits_time_calories"].tolist(), index=df_byUserWeek.index)

df_byUserWeek = df_byUserWeek.drop(columns=["user_week", "visits_time_calories"])



In [None]:
# df_byUserWeek_noIndex.to_json("groupByUserDataFrame.json", orient="records", indent=4)

# Averages over every (user, week) row. Each constant reads the column its name refers to:
# calories from total_calories and length from total_seconds (these two were previously swapped).
CONSTANT_AVG_VISITS_WEEK = df_byUserWeek_noIndex['total_sessions'].mean()
CONSTANT_AVG_CALORIES_WEEK = df_byUserWeek_noIndex['total_calories'].mean()
CONSTANT_AVG_LENGTH_WEEK = df_byUserWeek_noIndex['total_seconds'].mean()

# Defines the distance between 2 normalized points.
def distance(v1, t1, c1, point2, lst, id):
    """Euclidean distance between a min-max-normalized data point and ``point2``.

    Parameters
    ----------
    v1, t1, c1 :
        Raw visits, time and calories values for one data point.
    point2 : tuple
        ``(v2, t2, c2)``, expected to be already normalized (the precomputed
        mean for a specific user).
    lst : sequence
        ``(minv, maxv, mint, maxt, minc, maxc)`` used to normalize v1, t1, c1.
    id :
        Not used in the computation; it was only needed for debug printing and
        is kept so existing callers keep working.

    Returns
    -------
    float
        ``sqrt((v2-v3)**2 + (t2-t3)**2 + (c2-c3)**2)`` where v3, t3, c3 are the
        normalized inputs.

    When a stat's min equals its max there is no spread to normalize against,
    so that component is treated as 0.0 instead of dividing by zero.
    """
    minv, maxv, mint, maxt, minc, maxc = lst
    # point2 should already be normalized
    v2, t2, c2 = point2

    def _min_max(value, low, high):
        # Min-max normalization with a guard for a zero-width range.
        span = high - low
        if span == 0:
            return 0.0
        return (value - low) / span

    v3 = _min_max(v1, minv, maxv)
    t3 = _min_max(t1, mint, maxt)
    c3 = _min_max(c1, minc, maxc)

    return math.sqrt((v2 - v3) ** 2 + (t2 - t3) ** 2 + (c2 - c3) ** 2)



In [None]:
# After performing anomaly detection we want to see how relevant each anomaly is.
# We use a modified TF.IDF where each document is a user's list of anomaly weeks
# and each word is a week; instead of counting how often the week appears, we use its distance from the mean.
# This lets us see how extreme an anomaly is, and whether it is common among all users:
# perhaps everyone's gym usage went down because there was a holiday, or it was raining all week.

def mapFunc_findUserMean(row):
    """Map one ``(index, row)`` pair to ``(user_id, (week, sessions, seconds, calories))``."""
    _index, entry = row
    stat_columns = ('week', 'total_sessions', 'total_seconds', 'total_calories')
    stats = tuple(entry[column] for column in stat_columns)
    return (entry['user_id'], stats)

def reduceFunc_weekRange_sumStats_statRange(acc, pair):
    """Fold one ``(user, (week, sessions, seconds, calories))`` pair into ``acc``.

    ``acc[user]`` is an 11-tuple laid out as:
      [0], [1]   first and last week seen
      [2]..[4]   running sums of sessions, seconds, calories (for the average)
      [5], [6]   min and max sessions
      [7], [8]   min and max seconds
      [9], [10]  min and max calories (min/max feed the min-max normalization)

    ``acc`` is mutated in place and returned.
    """
    key, value = pair
    week, sessions, seconds, calories = value[0], value[1], value[2], value[3]

    if key not in acc:
        # First row for this user: every range collapses to the single value.
        acc[key] = (week, week,
                    sessions, seconds, calories,
                    sessions, sessions,
                    seconds, seconds,
                    calories, calories)
        return acc

    prev = acc[key]
    acc[key] = (
        min(prev[0], week),
        max(prev[1], week),
        prev[2] + sessions,
        prev[3] + seconds,
        prev[4] + calories,
        min(prev[5], sessions),
        max(prev[6], sessions),
        min(prev[7], seconds),
        max(prev[8], seconds),
        min(prev[9], calories),
        max(prev[10], calories),
    )
    return acc
        
def reduceFunc_normalizeAverageStats(acc, pair):
    """Store each user's min-max-normalized weekly averages in ``acc``.

    Parameters
    ----------
    acc : dict
        Accumulator; ``acc[user]`` is set to
        ``(normSession, normSeconds, normCalories)``. Mutated and returned.
    pair : tuple
        ``(user, value)`` where ``value`` is the 11-tuple built by
        ``reduceFunc_weekRange_sumStats_statRange``:
        ``value[0]``/``value[1]`` first/last week, ``value[2..4]`` sums of
        sessions/seconds/calories, ``value[5]``/``value[6]`` min/max sessions,
        ``value[7]``/``value[8]`` min/max seconds, ``value[9]``/``value[10]``
        min/max calories.

    When a stat's min equals its max there is no spread to normalize against,
    so its normalized average is defined as 0.0 instead of dividing by zero.
    """
    key, value = pair
    # Weeks are inclusive on both ends, hence the + 1.
    numWeeks = value[1] - value[0] + 1

    def _normalized_average(total, low, high):
        span = high - low
        if span == 0:
            return 0.0
        return (total / numWeeks - low) / span

    normSession = _normalized_average(value[2], value[5], value[6])
    normSeconds = _normalized_average(value[3], value[7], value[8])
    normCalories = _normalized_average(value[4], value[9], value[10])
    acc[key] = (normSession, normSeconds, normCalories)
    return acc




In [None]:
# user -> (week, sessions, seconds, calories) records, one per (user, week) row
mappedData_findUserMean = [mapFunc_findUserMean(week_row) for week_row in df_byUserWeek.iterrows()]

# user -> 11-tuple of week range, stat sums and stat min/max
weekly_userly_total = reduce(reduceFunc_weekRange_sumStats_statRange, mappedData_findUserMean, dict())
# user -> (normalized mean sessions, seconds, calories)
weekly_userly_mean = reduce(reduceFunc_normalizeAverageStats, weekly_userly_total.items(), dict())


#print(weekly_userly_mean_dict)

In [None]:
# For TF.IDF, the TF part is supposed to be a number between 0 and 1, but distance from the mean might not be.
# Also, the difference between 1 visit and 2 is much more significant than between 3600 seconds and 3601.

# To combat this, I used min-max normalization specific to each user to define a more appropriate distance metric.
# Then I found the max distance and divided each distance by it, user by user, to get a number between 0 and 1.

# then we need to actually compute IDF for each week 
# which doesn't need to be repeated for each user since it's always the same
# 

# then we need to compute TF' for each user/week

In [None]:
# Distance of every (user, week) row from that user's normalized mean point.
def _distance_from_user_mean(entry):
    """Call ``distance`` for one row using that user's mean and min/max stats."""
    user = entry["user_id"]
    return distance(
        entry["total_sessions"],
        entry["total_seconds"],
        entry["total_calories"],
        weekly_userly_mean[user],
        # slots 5..10 hold (min, max) for sessions, seconds and calories
        weekly_userly_total[user][5:11],
        user,
    )

df_byUserWeek["distance_from_mean"] = df_byUserWeek.apply(_distance_from_user_mean, axis=1)


In [None]:
def mapFunc_findMaxDistance(row):
    """Map one ``(index, row)`` pair to ``(user_id, distance_from_mean as float)``."""
    _index, entry = row
    user = entry['user_id']
    return (user, float(entry['distance_from_mean']))

# Reduce step: keep the largest distance seen for each user.
def reduceFunc_findMaxDistance(acc, pair):
    """Fold one ``(user, distance)`` pair into ``acc[user]``, keeping the maximum.

    ``acc`` is mutated in place and returned.
    """
    user, candidate = pair
    if user not in acc:
        acc[user] = candidate
        return acc
    acc[user] = max(acc[user], candidate)
    return acc




In [None]:
# Run the max-distance map-reduce, then rescale each row's distance by its user's maximum.
mappedData_findMaxDistance = list(map(mapFunc_findMaxDistance, df_byUserWeek.iterrows()))
user_max_distance = reduce(reduceFunc_findMaxDistance, mappedData_findMaxDistance, {})

# Now this distance is between 0-1 like Term Frequency is supposed to be.
# The division is done column-wise instead of row by row. A user whose maximum distance
# is 0 has every week sitting exactly on the mean, so those rows are set to 0.0 rather
# than dividing by zero.
max_distance_per_row = df_byUserWeek["user_id"].map(user_max_distance)
df_byUserWeek["distance_from_mean"] = (
    df_byUserWeek["distance_from_mean"] / max_distance_per_row
).where(max_distance_per_row != 0, 0.0)


In [None]:
# This process takes forever to run, so we just run it once and save the result into a file.
# If you really want to run this yourself, change False to True, but it might take 20 minutes.
if False:
    feature_columns = ['total_sessions', 'total_seconds', 'total_calories']

    # Work on a flat copy: reset_index() returns a new frame with user_id and week as columns.
    df_byUserWeek_withanomaly = df_byUserWeek_noIndex.reset_index()

    model = IsolationForest(contamination=0.05, random_state=42)

    def detect_anomalies_for_user(user_group):
        """Return ``user_group`` with an added 'anomaly' column from ``model.fit_predict``.

        ``assign`` builds a new frame, so the groupby slice is never written to.
        """
        return user_group.assign(anomaly=model.fit_predict(user_group[feature_columns]))

    grouped = list(df_byUserWeek_withanomaly.groupby('user_id'))

    def process_group(group):
        """Fit the forest on one user's weeks; users with fewer than 2 weeks are reported and skipped."""
        user_id, user_group = group
        if len(user_group) < 2:
            print(user_id)
            return None
        return detect_anomalies_for_user(user_group)

    # Drop the skipped users up front instead of concatenating empty placeholder frames.
    results = [scored for scored in map(process_group, grouped) if scored is not None]
    df_anomalies = pd.concat(results)
    # 'week' is a string in df_byUserWeek_noIndex; cast it so it can be merged with df_byUserWeek.
    df_anomalies['week'] = df_anomalies['week'].astype('int64')
    df_anomalies = df_anomalies.merge(df_byUserWeek[['user_id', 'week', 'distance_from_mean']],
                     on=['user_id', 'week'],
                     how='inner')
    df_anomalies.to_csv('df_byUserWeek_withAnomaly.csv', index=False)

In [None]:
def mapFunc_IDF(row):
    """Map one ``(index, row)`` pair to ``(week, 1)`` so occurrences can be summed per week."""
    _index, entry = row
    return (entry['week'], 1)

def reduceFunc_countWeeks(acc, pair):
    """Fold one ``(week, count)`` pair into ``acc``, summing counts per week.

    ``acc`` is mutated in place and returned.
    """
    week, count = pair
    if week not in acc:
        acc[week] = count
        return acc
    acc[week] += count
    return acc
        
def reduceFunc_computeIDF(acc, pair):
    """Fold one ``(week, anomaly_count)`` pair into ``acc[week] = log(total users / count)``.

    Uses the module-level ``CONSTANT_TOTAL_USERS``. ``acc`` is mutated in
    place and returned.
    """
    week, anomaly_count = pair
    acc[week] = math.log(CONSTANT_TOTAL_USERS / anomaly_count)
    return acc



In [None]:
# Read back the saved anomaly results.
df_byUserWeek_all = pd.read_csv('df_byUserWeek_withAnomaly.csv')
# Keep only the rows flagged as anomalies (-1). The .copy() is taken AFTER filtering so that
# df_byUserWeek_anomaly is an independent frame; columns added to it later then do not
# write into a slice of df_byUserWeek_all.
df_byUserWeek_anomaly = df_byUserWeek_all[df_byUserWeek_all['anomaly'] == -1].copy()


# Given any week, use it as the key into weekly_IDF and the value will be the IDF
# needed for computing TF.IDF.
mappedData_findingIDF = list(map(mapFunc_IDF, df_byUserWeek_anomaly.iterrows()))

weekly_anomaly_count = reduce(reduceFunc_countWeeks, mappedData_findingIDF, {})
weekly_IDF = reduce(reduceFunc_computeIDF, weekly_anomaly_count.items(), {})

In [None]:
# Attach each row's week-level IDF as a column to make the next step easier.
# Every week in this frame should have an entry in weekly_IDF; the fillna(0) is
# only a safety net in case one does not.
df_byUserWeek_anomaly['IDF'] = (
    df_byUserWeek_anomaly['week']
    .map(weekly_IDF)
    .fillna(0)
)


In [None]:
# TF.IDF score: the normalized distance from the mean (TF) times the week's IDF.
df_byUserWeek_anomaly["TFIDF"] = df_byUserWeek_anomaly['distance_from_mean'].mul(
    df_byUserWeek_anomaly['IDF']
)

#TODO make a histogram for the values and use it to pick a nice cutoff
# put that in the report

In [None]:
# Graphing only. To generate more graphs, turn False into True and change the user_id
# passed to plot_user_3d at the bottom.
if False:
    import matplotlib.pyplot as plt
    from mpl_toolkits.mplot3d import Axes3D

    # anomaly count -> up to 10 example user ids that have exactly that many anomalous weeks
    anomaly_map = {}
    for user_id in df_byUserWeek_anomaly['user_id'].unique():
        is_user_anomaly = ((df_byUserWeek_anomaly['user_id'] == user_id) &
                           (df_byUserWeek_anomaly['anomaly'] == -1))
        anomaly_count = int(is_user_anomaly.sum())
        bucket = anomaly_map.setdefault(anomaly_count, [])
        if len(bucket) < 10:
            bucket.append(user_id)

    def plot_user_3d(df, user_id):
        """Scatter one user's weeks in (sessions, seconds, calories) space.

        Rows with anomaly == -1 are drawn in red, all others in blue. The figure
        is saved as ``user_<user_id>_3d_plot.png`` and then shown.
        """
        user_rows = df[df['user_id'] == user_id]

        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111, projection='3d')

        series_to_draw = (
            (user_rows[user_rows['anomaly'] != -1], 'blue', 'Normal'),
            (user_rows[user_rows['anomaly'] == -1], 'red', 'Anomaly'),
        )
        for subset, colour, label in series_to_draw:
            ax.scatter(subset['total_sessions'], subset['total_seconds'],
                       subset['total_calories'], c=colour, label=label)

        ax.set_xlabel('Total Sessions')
        ax.set_ylabel('Total Seconds')
        ax.set_zlabel('Total Calories')
        ax.set_title(f'User {user_id} Activity Analysis')
        ax.legend()
        plt.savefig(f'user_{user_id}_3d_plot.png', dpi=300, bbox_inches='tight')
        plt.show()

    plot_user_3d(df_byUserWeek_all, user_id="user_1017")
