In [1]:
import os
import pandas as pd
from tqdm import tqdm
from collections import Counter
import string
from joblib import Parallel, delayed

### Functions to calculate the **support** of the high-/low-level command pairs. Each high-level command forms 5 high-/low-level command pairs with its following 5 low-level commands, based on the timestamp.

In [3]:
def calculate_support(df, itemsets, counts):
    """Compute support counts for each (high-level command, UNDO event) pair.

    Args:
        df: DataFrame with a 'Tool_Menu' column and an 'UNDOs' column whose
            cells are iterables of UNDO messages.
        itemsets: Iterable of two-element sequences ``[tool_menu, event]``.
        counts: Mapping indexed by high-level command (``counts[tool_menu]``);
            its value is reported unchanged as ``sup1``.

    Returns:
        A list with one dict per itemset, holding the itemset itself
        ('tuple_commands'), its two parts ('tool/menu', 'event'), 'sup1'
        (``counts[tool_menu]``) and 'sup2' (number of rows of ``df`` whose
        'Tool_Menu' equals ``tool_menu`` and whose 'UNDOs' contain ``event``).
    """
    # Tally every (command, event) co-occurrence in a single pass over df
    # rather than re-filtering the whole frame for each itemset.
    rows_per_pair = {}
    for tool_menu, undos in zip(df['Tool_Menu'], df['UNDOs']):
        # set(): a row counts once per event even if the event repeats in it.
        for event in set(undos):
            pair = (tool_menu, event)
            rows_per_pair[pair] = rows_per_pair.get(pair, 0) + 1

    sup_list = []
    for itemset in tqdm(itemsets, desc="Calculating support"):
        tool_menu, event = itemset[0], itemset[1]
        sup_list.append({
            'tuple_commands': itemset,
            'tool/menu': tool_menu,
            'event': event,
            'sup1': counts[tool_menu],
            'sup2': rows_per_pair.get((tool_menu, event), 0),
        })
    return sup_list

def set_tuple_commands(df):
    """Collect the distinct [high-level command, UNDO message] pairs in df.

    Args:
        df: DataFrame with a 'Tool_Menu' column and an 'UNDOs' column whose
            cells are iterables of UNDO messages.

    Returns:
        A list of two-element lists ``[tool_menu, undo_message]``, each pair
        appearing once, in the order it is first encountered.
    """
    seen = set()  # hashable mirror of C, giving O(1) duplicate checks
    C = []
    rows = zip(df['Tool_Menu'], df['UNDOs'])
    for high_level_message, undos in tqdm(rows, total=len(df),
                                          desc="Processing to get command pairs"):
        for item in undos:
            pair = (high_level_message, item)
            if pair not in seen:
                seen.add(pair)
                C.append([high_level_message, item])
    return C

def following_five_undo(df):
    """Pair every Tool/Menu row with nearby UNDO messages of the same session.

    For each row whose 'cat' is 'Tool' or 'Menu', only rows of the same
    'session_anonymized' group whose index label lies within 10 of that row's
    label (lower bound clamped at 0) are considered. From those, the first
    (in row order) three UNDO rows with ``ts`` >= the row's ``ts`` and the
    first one UNDO row with ``ts`` < the row's ``ts`` are collected. Despite
    the function name, at most four UNDO messages are gathered per row.

    Args:
        df: DataFrame with columns 'session_anonymized', 'cat',
            'message_eng' and 'ts'. Index labels must support ``label - 10``.

    Returns:
        DataFrame with columns 'Tool_Menu' (the row's 'message_eng') and
        'UNDOs' (list of UNDO 'message_eng' values, following ones first).
        The two columns are present even when no Tool/Menu row exists.
    """
    processed_data = []
    sessions = df.groupby('session_anonymized')
    for _, group_df in tqdm(sessions, desc="Processing grouped data filtering"):
        # Only UNDO rows can ever be selected, so isolate them once per
        # session instead of re-slicing the whole group for every command.
        undo_rows = group_df[group_df['cat'] == 'UNDO']
        undo_labels = undo_rows.index
        undo_ts = undo_rows['ts']
        undo_messages = undo_rows['message_eng']

        high_level_rows = group_df[group_df['cat'].isin(['Tool', 'Menu'])]
        for index, message, ts in zip(high_level_rows.index,
                                      high_level_rows['message_eng'],
                                      high_level_rows['ts']):
            # Window is defined on index labels (label distance <= 10),
            # not on row positions within the session.
            in_window = (undo_labels >= max(0, index - 10)) & (undo_labels <= index + 10)
            following = undo_messages[in_window & (undo_ts >= ts).to_numpy()].head(3)
            # NOTE(review): head(1) keeps the FIRST preceding UNDO in row
            # order, which is the farthest one if rows are time-ordered. The
            # original comment said "last 1" - confirm whether tail(1) was meant.
            preceding = undo_messages[in_window & (undo_ts < ts).to_numpy()].head(1)

            processed_data.append({
                'Tool_Menu': message,
                'UNDOs': following.tolist() + preceding.tolist(),
            })

    # Pin the schema so an input without Tool/Menu rows still yields the
    # columns that downstream code indexes.
    return pd.DataFrame(processed_data, columns=['Tool_Menu', 'UNDOs'])

def get_all_files(directory_path):
    """Recursively list the ``.parquet`` files below ``directory_path``.

    Directories are visited in sorted order and the files of each directory
    are emitted in sorted order, so the result is deterministic.

    Args:
        directory_path: Root directory to walk.

    Returns:
        List of paths (root joined with file name) ending in '.parquet'.
    """
    parquet_paths = []
    for current_dir, subdirs, filenames in os.walk(directory_path):
        # In-place sort: os.walk descends into subdirs in this list's order.
        subdirs.sort()
        parquet_paths.extend(
            os.path.join(current_dir, name)
            for name in sorted(filenames)
            if name.endswith('.parquet')
        )
    return parquet_paths

def read_unique_commands(path):
    """Load the parquet file at ``path`` and move its index into a column.

    Args:
        path: Location of the parquet file.

    Returns:
        The loaded DataFrame after ``reset_index()``, i.e. with a fresh
        default index and the former index available as a regular column.
    """
    return pd.read_parquet(path).reset_index()

def drop_less_commands(df, commands_set):
    """Return the rows of ``df`` whose 'message' is not in ``commands_set``.

    Args:
        df: DataFrame with a 'message' column.
        commands_set: Collection of message values to exclude.

    Returns:
        The filtered DataFrame; surviving rows keep their index labels.
    """
    is_excluded = df['message'].isin(commands_set)
    return df.loc[~is_excluded]

def read_language_dic(path, df):
    """Attach a translated 'message_eng' column and drop untranslated rows.

    The CSV at ``path`` must provide 'message' and 'label' columns; each
    'message' of ``df`` is mapped to the corresponding 'label'. If a message
    occurs several times in the CSV, its last label is used.

    Args:
        path: Location of the translation CSV.
        df: DataFrame with a 'message' column. It is not modified.

    Returns:
        A new DataFrame with the extra 'message_eng' column, restricted to
        the rows for which a (non-missing) translation exists.
    """
    language_df = pd.read_csv(path)
    translation_dict = dict(zip(language_df['message'], language_df['label']))
    # assign() builds a new frame: the caller's df (often a filtered slice)
    # is left untouched, which also avoids SettingWithCopyWarning.
    translated = df.assign(message_eng=df['message'].map(translation_dict))
    return translated.dropna(subset=['message_eng'])

# Built once: the function is applied to every row, so rebuilding this set
# on each call would be wasted work.
_PRINTABLE_CHARS = frozenset(string.printable)


def contains_non_printable(text):
    """Tell whether ``text`` has a character outside ``string.printable``.

    Args:
        text: String (or any iterable of single characters) to inspect.

    Returns:
        True if at least one character is not in ``string.printable``,
        False otherwise (including for an empty string).
    """
    return any(char not in _PRINTABLE_CHARS for char in text)

def process_file(file_path, file_index, unique_commands_path, lang_dict_path):
    """Build the support table for a single parquet log file.

    Steps: load the file, drop messages whose 'count' in the unique-commands
    table is at most 10, translate messages via the language dictionary,
    discard rows whose translation has non-printable characters, pair
    Tool/Menu rows with UNDO messages, and compute support per pair.

    Args:
        file_path: Parquet file to process.
        file_index: Value stored in the 'file_index' column of the result.
        unique_commands_path: Parquet file read by ``read_unique_commands``.
        lang_dict_path: CSV file read by ``read_language_dic``.

    Returns:
        DataFrame of support rows tagged with 'file_index'. If reading or
        processing raises, the error is printed and an empty DataFrame is
        returned instead.
    """
    try:
        events = pd.read_parquet(file_path)
    except Exception as read_error:
        print(f"Error reading {file_path}: {read_error}")
        return pd.DataFrame()

    try:
        command_counts = read_unique_commands(unique_commands_path)
        is_rare = command_counts['count'] <= 10
        rare_messages = command_counts[is_rare]['message'].tolist()

        events = drop_less_commands(events, rare_messages)
        events = read_language_dic(lang_dict_path, events)
        has_non_printable = events['message_eng'].apply(contains_non_printable)
        events = events[~has_non_printable]

        undo_pairs_df = following_five_undo(events)
        tool_menu_counts = Counter(undo_pairs_df['Tool_Menu'])
        command_pairs = set_tuple_commands(undo_pairs_df)
        support_rows = calculate_support(undo_pairs_df, command_pairs, tool_menu_counts)

        support_df = pd.DataFrame(support_rows)
        # Tag every row with the index of the file it was derived from.
        support_df['file_index'] = file_index
        return support_df
    except Exception as processing_error:
        print(f"Error processing {file_path}: {processing_error}")
        return pd.DataFrame()

In [4]:
def main(path, unique_commands_path, lang_dict_path, output_path, n_jobs=80):
    """Process every parquet file under ``path`` in parallel and save the result.

    Args:
        path: Directory searched recursively for '.parquet' files.
        unique_commands_path: Forwarded to ``process_file``.
        lang_dict_path: Forwarded to ``process_file``.
        output_path: Destination parquet file for the concatenated results.
        n_jobs: Number of joblib workers.

    Returns:
        None. When no parquet file is found, a message is printed and
        nothing is written.
    """
    all_files = get_all_files(path)
    print(f"Total Parquet files found: {len(all_files)}")

    if not all_files:
        # pd.concat raises "No objects to concatenate" on an empty list;
        # report the actual problem instead.
        print(f"No Parquet files found under {path}; nothing written to {output_path}")
        return

    results = Parallel(n_jobs=n_jobs)(
        delayed(process_file)(file_path, idx, unique_commands_path, lang_dict_path)
        for idx, file_path in enumerate(tqdm(all_files))
    )

    # Concatenate the per-file results into a single DataFrame
    final_result = pd.concat(results, ignore_index=True)

    final_result.to_parquet(output_path, index=False)
    print(f"Results saved to {output_path}")

In [None]:
if __name__ == "__main__":
    # Input locations: raw session logs, per-message counts, translations.
    data_path = '/data/test_data'
    unique_commands_path = '/data/message_counts.parquet'
    lang_dict_path = '/data/command_dictionary.csv'
    # Where the combined support table is written.
    output_path = '/data/support.parquet'

    main(
        path=data_path,
        unique_commands_path=unique_commands_path,
        lang_dict_path=lang_dict_path,
        output_path=output_path,
        n_jobs=80,
    )