# External sort algorithm on large data files

## Imports, constants and setup

In [1]:
import os
import shutil

# Ask which environment we are running in; this decides where data and helper
# scripts are read from. int() raises ValueError on non-numeric input.
notebook_mode: int = int(
    input(
        """
    Select notebook mode:
    1. Google Colab  2. Local
    """
    )
)

if notebook_mode == 1:
    # Run on Colab: data and helper scripts live on the mounted Google Drive.
    INPUT_PATH: str = "/content/drive/MyDrive/Ellinbank/video_observation/data/"
    SCRIPT_PATH: str = "/content/drive/MyDrive/Ellinbank/video_observation/training_testing/data_labelling/"
    # OUTPUT_PATH: str = "/content/drive/MyDrive/Ellinbank/video_observation/output/"
    OUTPUT_PATH: str = "./out/"
    # Copy the helper modules into the working directory so later cells can
    # import them (e.g. `from utils import ...`). shutil.copy raises if a
    # script is missing instead of silently continuing like a shelled-out cp.
    for script_name in ("custom_model.py", "inference.py", "utils.py", "operation.py"):
        shutil.copy(os.path.join(SCRIPT_PATH, script_name), ".")
elif notebook_mode == 2:
    INPUT_PATH: str = "../../../../data/"
    SCRIPT_PATH: str = "./"
    OUTPUT_PATH: str = "./out/"
else:
    # Without this, INPUT_PATH/OUTPUT_PATH stay undefined and later cells
    # fail with a confusing NameError.
    raise ValueError(
        "Unknown notebook mode {!r}; expected 1 (Google Colab) or 2 (Local).".format(
            notebook_mode
        )
    )

In [2]:
import shutil

# Start from an empty output directory so partitions or merged files left by a
# previous run do not get mixed into this run's merge. shutil.rmtree avoids
# building a shell command from the path (which broke on spaces and was not
# portable); ignore_errors mirrors `rm -rf` when the directory is absent.
shutil.rmtree(OUTPUT_PATH, ignore_errors=True)
# exist_ok replaces the try/except FileExistsError; makedirs also creates any
# missing parent directories.
os.makedirs(OUTPUT_PATH, exist_ok=True)

In [3]:
import pandas as pd


# Only the .zip archives in INPUT_PATH are treated as input data files.
files: list[str] = [
    entry for entry in os.listdir(path=INPUT_PATH) if entry.endswith(".zip")
]

# The sensor name is the part of each archive name before the first "_".
sensor_names: list[str] = [archive.partition("_")[0] for archive in files]
print(sensor_names)

['MOS2E03230475']


In [4]:
import random


# Window / batch settings used by the cells of this notebook.
# window_size is also the number of rows per partition in the split step.
# The original note says 300 rows correspond to 10 seconds, so 600 is about 20 s.
window_size: int = 600
# Presumably consumed by later training/sampling cells (not used in this chunk).
window_per_epoch: int = 200
epoch: int = 1
batch_size: int = 64
# Chunk size handed to merge_external_files_chunks in the merge step.
chunk_size: int = 300

# Fixed seed for reproducibility. Seed 715 also looked good; with 785 the
# "other" results look bad, but everything else is good.
random.seed(785)

## Split the large data file into partitions and sort each partition

In [5]:
import gc
import glob

import pandas.io.parsers.readers

# Maximum number of partitions written per input file. None processes the
# whole file. The earlier version hard-coded a stop after 5 partitions, which
# silently truncated the data fed to the external sort; set an int here only
# for a quick debug run.
max_chunks_per_file: "int | None" = None

for f in files:
    sensor: str = f.split("_")[0]

    # Remove partitions left over from a previous run for this sensor
    # (equivalent to the former `rm -rf <OUTPUT_PATH><sensor>_*.txt`).
    stale_pattern: str = os.path.join(
        glob.escape(OUTPUT_PATH), glob.escape(sensor) + "_*.txt"
    )
    for stale_file in glob.glob(stale_pattern):
        os.remove(stale_file)

    df_count: int = 0
    raw_data: pd.DataFrame
    data_chunks: pandas.io.parsers.readers.TextFileReader
    # Stream the archive window_size rows at a time so the whole file never has
    # to fit in memory. The context manager closes the file handle even when
    # the loop stops early.
    with pd.read_csv(
        filepath_or_buffer=os.path.join(INPUT_PATH, f),
        chunksize=window_size,
    ) as data_chunks:
        for raw_data in data_chunks:
            # Sort each partition by timestamp so the merge step only has to
            # merge already-sorted runs.
            raw_data = raw_data.sort_values(
                by=["timestamps"],
                ascending=True,
            ).reset_index(drop=True)
            raw_data.to_csv(
                path_or_buf="{}{}_{}.txt".format(OUTPUT_PATH, sensor, df_count),
                header=True,
                index=False,
            )
            df_count += 1

            if max_chunks_per_file is not None and df_count >= max_chunks_per_file:
                break
            gc.collect()

In [6]:
from utils import merge_external_files, merge_external_files_chunks

# Merge the sorted partitions pairwise, one pass per iteration, until a single
# file remains in OUTPUT_PATH. Termination relies on merge_external_files_chunks
# writing its result into output_path and removing its two inputs, so each pass
# shrinks the file count. That is presumably the case; verify in utils.py,
# otherwise this loop does not end.
while True:
    # Sorted so that the pairing is deterministic across runs and platforms
    # (os.listdir order is arbitrary).
    split_files: list[str] = sorted(os.listdir(path=OUTPUT_PATH))
    no_of_files: int = len(split_files)

    # `<= 1` rather than `== 1`: an empty directory would otherwise spin forever.
    if no_of_files <= 1:
        break

    # Pair (0, 1), (2, 3), ... The bound `no_of_files - 1` covers both the even
    # case and the odd case. With an odd count, the last file is carried over
    # unmerged to the next pass.
    for file_index in range(0, no_of_files - 1, 2):
        merge_external_files_chunks(
            f1=os.path.join(OUTPUT_PATH, split_files[file_index]),
            f2=os.path.join(OUTPUT_PATH, split_files[file_index + 1]),
            output_path=OUTPUT_PATH,
            chunk_size=chunk_size,
        )
        gc.collect()

In [7]:
import os

# After the merge loop, a single sorted file should remain in OUTPUT_PATH.
# Use the configured OUTPUT_PATH rather than a hard-coded "./out/", so that
# changing the output location in the setup cell is respected here.
output_files: list[str] = os.listdir(OUTPUT_PATH)
print(output_files[0])

merged_6565788.txt


In [8]:
import gzip
import shutil

import os

# Locate the merged text file. The .gz written below lands in the same
# directory, so keep only .txt entries; otherwise re-running this cell could
# pick up the archive. Sorted for a deterministic choice.
output_files: list[str] = sorted(
    name for name in os.listdir(OUTPUT_PATH) if name.endswith(".txt")
)
print(output_files[0])

# Stream-compress the merged file to "<sensor>_sorted.gz" in OUTPUT_PATH.
# NOTE(review): only sensor_names[0] is used, i.e. this assumes a single input
# sensor/archive — confirm if more than one .zip can be present.
with open(
    file=os.path.join(OUTPUT_PATH, output_files[0]),
    mode="rb",
) as f_in, gzip.open(
    filename=os.path.join(OUTPUT_PATH, "{}_sorted.gz".format(sensor_names[0])),
    mode="wb",
) as f_out:
    shutil.copyfileobj(f_in, f_out)

merged_6565788.txt


In [9]:
import numpy as np
import pandas as pd
import pandas.io.parsers.readers


def is_sorted(a: np.ndarray) -> bool:
    """Return True if *a* is non-decreasing (equal neighbours allowed)."""
    return bool(np.all(a[:-1] <= a[1:]))


# Stream the merged file chunk by chunk and check that the "timestamps" column
# is globally non-decreasing, without loading the whole file into memory.
all_sorted: bool = True
# Last timestamp of the previous chunk; None before the first chunk.
last_row: "np.datetime64 | None" = None
df: pd.DataFrame
chunked_df: pandas.io.parsers.readers.TextFileReader
with pd.read_csv(
    filepath_or_buffer=os.path.join(OUTPUT_PATH, output_files[0]),
    chunksize=600,
    sep=",",
) as chunked_df:
    for df in chunked_df:
        ts: np.ndarray = df["timestamps"].to_numpy(dtype=np.datetime64)
        if ts.size == 0:
            continue
        # Across a chunk boundary, equal timestamps are still sorted, so only a
        # strict decrease is a failure. This matches the `<=` used in is_sorted.
        if last_row is not None and ts[0] < last_row:
            all_sorted = False
            break
        if not is_sorted(ts):
            all_sorted = False
            break
        last_row = ts[-1]

# Print exactly one verdict. Previously a failure printed False followed by True.
print(all_sorted)

True
