In [None]:
import os.path
import time
import zipfile
import pandas as pd
import ujson as json
import os

"""
00: Configuration and imports

NOTE: If using DataSpell or a similar IDE, you may need to increase the maximum allowed memory usage.
- With the default limit of 4 GB of RAM, larger datasets will cause the IDE to freeze completely.
"""

# Every .zip archive found (recursively) under input_path is processed
input_path = "../../data/raw_datasets"
# Root folder under which the per-archive results are written
output_path = "../../data/processed/19.8_ajo/prom"


# Print all zips that will be processed
def list_zip_files(directory_path):
    """Return the paths of all zip archives below *directory_path*.

    The directory tree is walked recursively and the extension check is
    case-insensitive. Each returned path is relative to *directory_path*.
    """
    return [
        os.path.relpath(os.path.join(folder, name), directory_path)
        for folder, _subfolders, names in os.walk(directory_path)
        for name in names
        if name.lower().endswith(".zip")
    ]
# Collect the archives and echo them as a quick check of what will be processed
zip_files_list = list_zip_files(input_path)

print("List of zip files:")
for zip_file in zip_files_list:
    print(zip_file)

In [None]:
"""
01: Helper functions
"""


def _slice_bounds(number_of_files, slice_index, total_slices):
    """Return the half-open ``(start, end)`` range of files belonging to one slice.

    The files are split into ``total_slices`` contiguous, non-overlapping
    ranges whose sizes differ by at most one, so every file belongs to exactly
    one slice even when the file count is not a multiple of the slice count.
    A negative ``slice_index`` or a non-positive ``total_slices`` selects all files.
    """
    if slice_index < 0 or total_slices <= 0:
        return 0, number_of_files
    if slice_index >= total_slices:
        raise ValueError(
            f"slice {slice_index} is out of range for {total_slices} slices")
    start = slice_index * number_of_files // total_slices
    end = (slice_index + 1) * number_of_files // total_slices
    return start, end


def _to_numeric_if_possible(column):
    """Convert a column to a numeric dtype, returning it unchanged if that fails."""
    try:
        return pd.to_numeric(column)
    except (ValueError, TypeError):
        return column


def parse_metric(zip_file, slice=-1, total_slices=-1, start_time=None):
    """Parse the JSON files of a zip archive into a dataframe of metric values.

    Each ``.json`` member is expected to contain ``data.result`` entries with a
    ``metric`` label dict and a list of ``values`` pairs (presumably Prometheus
    range-query responses -- confirm against the data source). The JSON-encoded
    label dict becomes the column name and the first element of each pair
    becomes the row index.

    Parameters
    ----------
    zip_file : path of the archive to read.
    slice : index of the slice to process; a negative value processes the
        whole archive.
    total_slices : number of slices the (sorted) file list is divided into.
    start_time : reference time for the progress output; defaults to the
        moment the function is called.

    Returns
    -------
    (values_df, stats_df) : ``values_df`` holds the parsed values;
        ``stats_df`` is currently always empty because nothing fills the
        statistics container.

    Raises
    ------
    ValueError : if ``slice`` is not smaller than a positive ``total_slices``.

    Note: slice boundaries are derived from ``slice`` and ``total_slices``, so
    callers that cache per-slice results must not mix caches that were
    produced with a different partitioning.
    """
    # A default of time.time() in the signature would be evaluated only once, at definition time
    if start_time is None:
        start_time = time.time()

    # TEMP CONTAINERS
    values_container = {}
    stats_container = {}

    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        # ONLY THE JSON FILES, IN A STABLE ORDER SO THAT SLICES ARE REPRODUCIBLE
        json_files = sorted(name for name in zip_ref.namelist() if name.endswith('.json'))
        number_of_files = len(json_files)

        start, end = _slice_bounds(number_of_files, slice, total_slices)
        print(f"Processing files {start} to {end} of {number_of_files}")

        # LOOP THROUGH THE JSON FILES
        for count, path in enumerate(json_files[start:end], start=start + 1):
            print(path)
            # Best effort: a file that cannot be read or parsed is reported and skipped
            try:
                with zip_ref.open(path) as json_file:
                    json_data = json.load(json_file)

                # LOOP THROUGH EACH SUB-METRIC
                for item in json_data['data']['result']:
                    header = json.dumps(item['metric'])
                    values = dict(item['values'])

                    # MERGE WITH THE VALUES ALREADY COLLECTED FOR THIS HEADER
                    values_container.setdefault(header, {}).update(values)
            except Exception as e:
                print(e)
                print(f"Could not parse {path}")

            if count % 500 == 0 or count == end:
                print(
                    f"Progress {count}/{number_of_files}, ({count / number_of_files * 100} %) (time_spent: {time.time() - start_time} s  - avg: {(time.time() - start_time) / count} s)")

    print("Done - converting to dataframe")
    # CONVERT VALUES DICT TO DATAFRAME
    # Move to numeric if possible, cutting off 90% of size
    values_df = pd.DataFrame(values_container).apply(_to_numeric_if_possible)

    # CONVERT THE STATS DICT TO A DF, THEN TRANSPOSE IT
    print("Done - transposing dataframe")
    stats_df = pd.DataFrame(stats_container).transpose()

    print("Done")
    return values_df, stats_df


def get_descriptive_keys(header_group):
    """Return the label keys that are worth keeping for a group of headers.

    A key is "static" when every header in the group has the same value for
    it; static keys carry no information and are dropped. ``__name__`` and
    ``instance`` are always kept when present, whether static or not.

    Parameters
    ----------
    header_group : dict mapping a column name to its label dict.

    Returns
    -------
    set of label keys to keep; an empty set for an empty group.

    Raises
    ------
    ValueError : if the headers do not all share the same set of keys.
    """
    if not header_group:
        return set()

    label_sets = list(header_group.values())
    reference = label_sets[0]
    reference_keys = set(reference)

    # Compared as sets so that the order of the labels inside a header does not matter
    if any(set(labels) != reference_keys for labels in label_sets):
        raise ValueError('ALL HEADER KEYS DO NOT MATCH')

    static_keys = [
        key for key in reference
        if all(labels[key] == reference[key] for labels in label_sets)
    ]
    print(static_keys)

    # Set difference (not symmetric difference): the protected keys must never
    # end up among the removed ones, even when they are not static
    always_kept = {"__name__", "instance"}
    return reference_keys - (set(static_keys) - always_kept)


def remove_unnecessary_keys(df, cols, descriptive_keys):
    """Rename columns in place so their headers only contain the descriptive keys.

    Parameters
    ----------
    df : dataframe whose columns are renamed (modified in place).
    cols : iterable of column names, each a JSON-encoded label dict.
    descriptive_keys : label keys to keep; keys that a column does not have
        are simply ignored.

    Returns
    -------
    None
    """
    cols_swaps = {}
    for col in cols:
        labels = json.loads(col)
        # Keep only the wanted labels (original label order is preserved),
        # then convert the result back to a string
        kept_labels = {key: value for key, value in labels.items() if key in descriptive_keys}
        cols_swaps[col] = json.dumps(kept_labels)

    # APPLY THE RENAMED COLUMNS TO THE DF
    df.rename(columns=cols_swaps, inplace=True)


def sub_df_by_instance(df):
    """Split *df* column-wise into one dataframe per ``instance`` label.

    Every column name is decoded as JSON. Columns whose name cannot be decoded
    are reported and left out; columns without an ``instance`` entry are
    collected under the key ``"unknown"``.

    Returns a dict mapping each instance to the matching selection of *df*.
    """
    columns_by_instance = {}
    for column in df.columns:
        try:
            labels = json.loads(column)
        except Exception as e:
            print(e)
            print(f"Could not parse {column}")
            continue
        instance = labels["instance"] if "instance" in labels else "unknown"
        columns_by_instance.setdefault(instance, []).append(column)

    sub_dfs = {}
    for instance, columns in columns_by_instance.items():
        sub_dfs[instance] = df[columns]
    return sub_dfs


def _merge_duplicate_columns(df):
    """Collapse columns that share a name into a single column.

    The first occurrence is kept and its missing cells are filled from the
    later duplicates, so values of a series that was split across several
    slices are retained instead of being discarded.
    """
    duplicated = df.columns.duplicated(keep="first")
    if not duplicated.any():
        return df

    merged = df.loc[:, ~duplicated].copy()
    for position in duplicated.nonzero()[0]:
        name = df.columns[position]
        merged[name] = merged[name].combine_first(df.iloc[:, position])
    return merged


def main(input_path, zip_relative_path, output_path2):
    """Convert one zip archive into per-instance feather files.

    The archive is parsed in slices; each slice is cached as
    ``<output>/intermediate/<n>.feather`` and reused on later runs. The slices
    are then joined into ``intermediate/full.feather``, split by the
    ``instance`` label, and each part is written with shortened column
    headers to ``<output>/<instance>.feather``.

    Parameters
    ----------
    input_path : folder that contains the archive.
    zip_relative_path : path of the archive relative to ``input_path``.
    output_path2 : root folder for the results; a sub-folder named after the
        archive (without its extension) is created in it.
    """
    print(f"Processing {zip_relative_path}")
    # splitext only strips the trailing extension, whatever its letter case
    zip_name, _extension = os.path.splitext(zip_relative_path)
    full_output_path = os.path.join(output_path2, zip_name)
    intermediate_folder_path = os.path.join(full_output_path, "intermediate")
    processed_folder_path = full_output_path
    os.makedirs(intermediate_folder_path, exist_ok=True)

    start_time = time.time()
    num_slices = 100  # In how many slices should the data be processed - lower values consume more memory
    dfs = []
    for slice_index in range(num_slices):
        slice_path = os.path.join(intermediate_folder_path, f"{slice_index}.feather")
        if os.path.exists(slice_path):
            values = pd.read_feather(slice_path)
            print(f"Got intermediate file from {slice_path}")
        else:
            values, _stats = parse_metric(
                zip_file=os.path.join(input_path, zip_relative_path),
                slice=slice_index,
                total_slices=num_slices,
                start_time=start_time
            )
            # Reset to default index (in case of old pandas/pyarrow version)
            values.reset_index(drop=False, inplace=True, names=["timestamp"])
            values.to_feather(slice_path)
            print(f"Saved intermediate {slice_path}")
        dfs.append(values.set_index("timestamp"))

    # The same series can show up in several slices; merge those columns
    # instead of keeping only the first one
    df = _merge_duplicate_columns(pd.concat(dfs, axis=1))

    full_path = os.path.join(intermediate_folder_path, "full.feather")
    # Written with a default index (in case of old pandas/pyarrow version); df keeps its timestamp index
    df.reset_index(drop=False, names=["timestamp"]).to_feather(full_path)
    print(f"Saved full df to {full_path}")

    # Split df by instance
    sub_dfs = sub_df_by_instance(df)
    os.makedirs(processed_folder_path, exist_ok=True)

    # Minimize headers and save each instance as separate file
    for instance, sub_df in sub_dfs.items():
        df_minimized = sub_df.copy()

        # Group headers by name
        grouped_by_name = {}
        for col in df_minimized.columns:
            header_dict = json.loads(col)
            grouped_by_name.setdefault(header_dict["__name__"], {})[col] = header_dict

        # Minimize headers
        for feature_name, headers in grouped_by_name.items():
            try:
                descriptive_keys = get_descriptive_keys(headers)
            except (AssertionError, ValueError):
                # Headers of this feature do not share the same keys: keep them as they are
                print(f"Non-matching keys: {feature_name}")
                continue
            remove_unnecessary_keys(df_minimized, headers, descriptive_keys)

        # Save df, sorted by timestamp
        df_minimized = df_minimized.sort_index().reset_index(drop=False, names=["timestamp"])
        instance_path = os.path.join(processed_folder_path, f"{instance}.feather")
        df_minimized.to_feather(instance_path)
        print("Saved instanced df as", instance_path)

"""
02: Process and save dataframes
"""

# Scan the input folder again (independently of zip_files_list computed earlier)
zips = list_zip_files(input_path)
print(zips)

# Archives are processed one after another; each call writes its results under output_path
for zip_name_full in zips:
    print(zip_name_full)
    main(input_path, zip_name_full, output_path)


In [None]:
"""
03: Print some statistics from the resulting dataframes

- Mostly for quick sanity checking of the results
"""
import os
import pandas as pd
import pyarrow.feather as feather

def count_feather_files(fpath):
    """Print size, column count and row count of every feather file below *fpath*.

    The folder is walked recursively. Readable files are listed from largest
    to smallest; files that cannot be loaded are listed after them together
    with the reason, instead of being shown with placeholder numbers.

    Parameters
    ----------
    fpath : root folder to search for ``.feather`` files.
    """
    feather_files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(fpath)
        for name in names
        if name.endswith(".feather")
    ]

    # Entries are (path, size in bytes, columns, rows, error or None)
    file_info = []
    for file_path in feather_files:
        try:
            df = pd.read_feather(file_path)
            file_size = os.path.getsize(file_path)
            file_info.append((file_path, file_size, len(df.columns), len(df), None))
        except pd.errors.EmptyDataError:
            file_info.append((file_path, 0, 0, 0, None))
        except Exception as e:
            # Size -1 keeps unreadable files at the end of the size-sorted listing
            file_info.append((file_path, -1, -1, -1, e))

    file_info.sort(key=lambda info: info[1], reverse=True)  # Sort based on file size in descending order

    for file_path, file_size, n_cols, n_rows, error in file_info:
        # relpath only strips the leading root, unlike a plain string replace
        shown_path = os.path.relpath(file_path, fpath)
        if error is not None:
            print(f"Unreadable: {shown_path} ({type(error).__name__}: {error})")
            continue
        print("Size:", file_size / 10**6, "MB", end="\t")
        print("Cols:", n_cols, end="\t")
        print("Rows:", n_rows, end="\t")
        print("File:", shown_path)

# Summarise every feather file below output_path (the walk is recursive, so intermediate slice files are included)
count_feather_files(output_path)