# Check the content of the data files

For each file listed in `goodFiles.log`, check the content of the file. The check comprises:
- that the date in the name of the file corresponds to that inside the file.
- specific actions depending on the file type (see details below).

In [None]:
# doRunTests must be False for production use and True for testing.
# It defaults to True here; when it was already defined from outside
# (e.g., by the test script), the existing value is kept.

doRunTests = globals().get("doRunTests", True)

# Check the content of ONE data file

For one file, check that the date in the name of the file corresponds to that inside the file. 

### Checks for csv files



In [None]:
import os
import numpy as np
from datetime import datetime

from sklearn import base


def check_csv_date(fullFname, msg=""):
    """
    Extract the date of a csv file from its first timestamp.

    The first 4 lines of the file are skipped as header and at most 5 data
    rows are read. The timestamp is taken from the first column of the first
    data row. It is either a number (milliseconds, converted to seconds before
    being turned into a local datetime) or a string formatted as
    "%Y-%m-%d %H:%M:%S.%f".

    Parameters
    ----------
    fullFname : str
        Path of the csv file.
    msg : str
        Message accumulated so far; any issue found here is appended to it.

    Returns
    -------
    tuple
        (date, msg) where date is a naive datetime, or None when no date
        could be extracted (the reason is then appended to msg).
    """
    # date is set to None if not found
    date = None

    if not fullFname.endswith(".csv"):
        msg += f"{fullFname} is not a csv file."
        return date, msg

    basename = os.path.basename(fullFname)

    # The timestamp can be a string or a float (in milliseconds), hence
    # dtype=str. ndmin=2 guarantees a (rows, columns) array even when the file
    # holds a single data row or a single column; without it loadtxt returns
    # a 1-D array and the column check below raises an IndexError.
    data = np.loadtxt(
        fullFname, skiprows=4, delimiter=",", max_rows=5, dtype=str, ndmin=2
    )

    # check if the file is empty
    if data.size == 0:
        msg += f"{basename} is an empty file."
        return date, msg
    # check if we get the expected number of columns
    if data.shape[1] < 2:
        msg += f"{basename} has less than 2 columns."
        return date, msg

    # Sounds good, we have some data to check: only the first timestamp is needed
    timestamp = str(data[0, 0]).strip()
    try:
        seconds = float(timestamp) / 1000  # milliseconds -> seconds
    except ValueError:
        seconds = None  # not a number: it should be a formatted date string

    try:
        if seconds is not None:
            date = datetime.fromtimestamp(seconds, tz=None)
        else:
            date = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
    except (ValueError, OverflowError, OSError):
        # report the problem instead of aborting the whole verification run
        msg += f"{basename}: cannot read a date from the first timestamp ({timestamp})."
    return date, msg


if doRunTests:
    # Manual check of check_csv_date on one csv file located under ../dat;
    # the (date, msg) tuple returned by the function is printed as is.
    dataPath = "../dat/ReArm.lnk/ReArm_C1P02/ReArm_C1P02_20210306_V1"
    fname = "ReArm_C1P02_20210306_V1_Reaching/ReArm_C1P02_20210322_V1_r_k.csv"
    fnameToTest = os.path.join(dataPath, fname)
    print(check_csv_date(fnameToTest))

### Checks for xdf files

In [None]:
def check_xdf_date(fullFname, msg=""):
    """
    Extract the date of a xdf file from its companion Kinect csv file.

    The date is not read from the XDF file itself (the XDF file does not have
    a reliable timestamp). It is read from the first timestamp (milliseconds)
    of the Kinect csv file that sits next to the xdf file and whose name is
    made of the first 5 "_"-separated tokens of the xdf name followed by
    "_k.csv". The date is only trusted when the Kinect file has the same
    number of lines as the EuroMov-Mocap-Kinect stream of the XDF file.

    Parameters
    ----------
    fullFname : str
        Path of the xdf file.
    msg : str
        Message accumulated so far; any issue found here is appended to it.

    Returns
    -------
    tuple
        (date, msg) where date is a naive datetime, or None when the date
        could not be established (the reason is then appended to msg).
    """

    if not fullFname.endswith(".xdf"):
        msg += f"{fullFname} is not a XDF file."
        return None, msg

    # get the corresponding kinect file name
    fpath = os.path.dirname(fullFname)
    fname = os.path.basename(fullFname)  # with extension, used in messages
    stem = os.path.splitext(fname)[0]
    tokens = stem.split("_")
    if len(tokens) < 5:
        # without this guard, building the kinect name raises an IndexError
        msg += f"{fname} does not have the 5 '_'-separated tokens needed to find the corresponding kinect file."
        return None, msg
    kinectFname = "_".join(tokens[:5]) + "_k.csv"
    fullFnamek = os.path.join(fpath, kinectFname)

    # check if the kinect file exists BEFORE loading the XDF file:
    # nothing can be checked without it, so the load would be wasted
    if not os.path.isfile(fullFnamek):
        msg += f"Kinect file {kinectFname} not found. Cannot check date without the corresponding kinect file."
        return None, msg

    import pyxdf

    # load the EuroMov-Mocap-Kinect stream
    data, header = pyxdf.load_xdf(
        filename=fullFname,
        select_streams=[{"type": "MoCap", "name": "EuroMov-Mocap-Kinect"}],
        synchronize_clocks=True,
        dejitter_timestamps=False,  # to get the raw timestamps to compare with the CSV
        verbose=0,  # do not log anything
    )
    if not data:
        # defensive guard in case no stream is returned
        msg += f"No EuroMov-Mocap-Kinect stream found in {fname}."
        return None, msg
    data_XDF = data[0]

    # get the kinect data
    # NOTE(review): 3 header lines are skipped here whereas check_csv_date
    # skips 4 for csv files - confirm which one applies to the kinect files.
    # ndmin=2 keeps a (rows, columns) array even for a single data row.
    try:
        data_CSV = np.loadtxt(
            fullFnamek, skiprows=3, delimiter=",", dtype=float, ndmin=2
        )
    except ValueError:
        # e.g. timestamps stored as strings: report instead of aborting the run
        msg += f"Kinect file {kinectFname} could not be read as numeric data. Cannot check date."
        return None, msg

    # get the number of lines in the xdf and csv file
    nbLines_XDF = len(data_XDF["time_stamps"])
    nbLines_CSV = data_CSV.shape[0]

    # check if the number of lines match
    if nbLines_CSV == nbLines_XDF and nbLines_CSV > 0:
        timestampCSV_sec = data_CSV[:, 0] / 1000  # milliseconds -> seconds
        date = datetime.fromtimestamp(timestampCSV_sec[0], tz=None)
    else:
        fnamek = os.path.basename(fullFnamek)
        msg += f"Number of lines ({nbLines_CSV}) in the CSV file {fnamek} does not match the number of lines ({nbLines_XDF}) in the XDF file ({fname})."
        date = None

    return date, msg


if doRunTests:
    # Manual check of check_xdf_date on one xdf file located under ../dat;
    # the (date, msg) tuple returned by the function is printed as is.
    dataPath = "../dat/ReArm.lnk/ReArm_C1P02/ReArm_C1P02_20210306_V1"
    fname = "ReArm_C1P02_20210306_V1_Reaching/ReArm_C1P02_20210322_V1_r.xdf"
    fnameToTest = os.path.join(dataPath, fname)

    print(check_xdf_date(fnameToTest))

### Checks for cwa files

In [None]:
def check_cwa_date(cwa_header, fullFname, msg=""):
    """
    Compare the end-of-logging date of a cwa header with the file name date.

    The "loggingEnd" entry of the header is converted to a local datetime.
    The date of the file name is its third "_"-separated token, read as
    YYYYMMDD. When the two are a non-zero number of days apart, a sentence
    saying so is appended to msg.

    Returns the datetime read from the header and the updated message.
    """
    # date found inside the file (header)
    endOfLogging = datetime.fromtimestamp(cwa_header["loggingEnd"], tz=None)

    # date announced by the file name (extension removed first)
    stem, _ = os.path.splitext(os.path.basename(fullFname))
    nameDate = datetime.strptime(stem.split("_")[2], "%Y%m%d")

    # whole days between the date in the file name and the end of the recording
    dayOffset = (endOfLogging - nameDate).days
    if dayOffset > 0:
        msg += f"The recording ended {dayOffset} days after the date in the file name."
    elif dayOffset < 0:
        msg += (
            f"The recording ended {-dayOffset} days before the date in the file name."
        )
    return endOfLogging, msg


def check_cwa_sessionId(cwa_header, fullFname, msg=""):
    """
    Compare the session ID of a cwa header with the one in the file name.

    The header value ("sessionId") is converted to a string and compared with
    the 7th "_"-separated token of the file name (extension removed). A
    mismatch is appended to msg.

    Returns the session ID of the header (as a string) and the updated message.
    """
    # session ID stored inside the file
    headerSessionId = str(cwa_header["sessionId"])

    # session ID announced by the file name
    stem = os.path.splitext(os.path.basename(fullFname))[0]
    nameSessionId = stem.split("_")[6]

    if headerSessionId == nameSessionId:
        return headerSessionId, msg

    msg += f" The session ID in the file name ({nameSessionId}) does not match the session ID in the cwa file ({headerSessionId})."
    return headerSessionId, msg


def check_cwa_deviceId(cwa_header, fullFname, msg=""):
    """
    Compare the device ID of a cwa header with the one in the file name.

    The header value ("deviceId") is converted to a string and compared with
    the 8th "_"-separated token of the file name (extension removed). A
    mismatch is appended to msg.

    Returns the device ID of the header (as a string) and the updated message.
    """

    # get the device ID from the header in the cwa file, as a string
    deviceId = str(cwa_header["deviceId"])

    # get the device ID from the file name without the extension
    fname = os.path.splitext(os.path.basename(fullFname))[0]
    tokens = fname.split("_")
    deviceIdFromFname = tokens[7]
    if deviceId != deviceIdFromFname:
        msg += f" The device ID in the file name ({deviceIdFromFname}) does not match the device ID in the cwa file ({deviceId})."
    return deviceId, msg


# Backward-compatible alias: the function was first written with a typo in its
# name ("ckeck"); existing callers keep working with the old spelling.
ckeck_cwa_deviceId = check_cwa_deviceId


def check_cwa_subject_id(cwa_header, fullFname, msg=""):
    # NOTE: do not use as this will raise many errors (humans wrote the metadata)
    """
    Compare the subject ID of a cwa header with the one in the file name.

    The header value is read from the "Subject Code" entry of the header
    metadata and compared with the 2nd "_"-separated token of the file name
    (extension removed). A mismatch is appended to msg.

    Returns the subject ID of the header and the updated message.
    """
    # subject ID stored inside the file
    headerSubjectId = cwa_header["metadata"]["Subject Code"]

    # subject ID announced by the file name
    stem = os.path.splitext(os.path.basename(fullFname))[0]
    nameSubjectId = stem.split("_")[1]

    if headerSubjectId == nameSubjectId:
        return headerSubjectId, msg

    msg += f" The subject ID in the file name ({nameSubjectId}) does not match the subject ID in the cwa file ({headerSubjectId})."
    return headerSubjectId, msg


def print_cwa_header(header):
    """
    Print every entry of a cwa header, one "key: value" line per entry.
    """
    for entry in header.items():
        print("{}: {}".format(*entry))


def create_cwa_header_file(fullFname, msg=""):
    """
    Create a txt file with the content of the header next to the cwa file.

    The header is read with read_cwa_header and written as one "key: value"
    line per entry in "<name without extension>_header.txt".

    Parameters
    ----------
    fullFname : str
        Path of the cwa file.
    msg : str
        Unused; kept for signature compatibility with the other helpers.

    Returns
    -------
    str
        Path of the header file that was written.
    """

    cwa_header = read_cwa_header(fullFname)

    # Build the name from the extension only. A plain str.replace(".cwa", ...)
    # also rewrites ".cwa" found elsewhere in the path and, when the
    # extension is spelled differently (e.g. ".CWA"), leaves the name
    # unchanged so that the header would overwrite the data file itself.
    root, _extension = os.path.splitext(fullFname)
    headerFname = root + "_header.txt"

    with open(headerFname, "w") as f:
        f.writelines(f"{key}: {value}\n" for key, value in cwa_header.items())

    return headerFname


def read_cwa_header(fullFname):
    """
    Load a cwa file with openmovement's CwaData and return its header.
    """
    from openmovement.load import CwaData

    loadOptions = dict(include_gyro=False, include_temperature=True, verbose=False)

    # the with statement ensures the file is closed after use;
    # nothing else than the header is needed from the loaded object
    with CwaData(fullFname, **loadOptions) as cwa_data:
        pass

    header = cwa_data.header
    return header


def check_cwa_file(fullFname, msg=""):
    """
    Run the date, session ID and device ID checks on a cwa file.

    The header is read once and handed to each check in turn. The value
    received in msg is discarded: the returned message only contains what the
    checks reported, and is empty when there is no issue.
    """
    cwa_header = read_cwa_header(fullFname)

    # start with nothing to report, then let every check append its findings
    msg = ""
    for runCheck in (check_cwa_date, check_cwa_sessionId, ckeck_cwa_deviceId):
        _, msg = runCheck(cwa_header, fullFname, msg)

    return msg


if doRunTests:
    # Manual check of the cwa helpers on one cwa file located under ../dat:
    # run the checks, print the header, then save the header next to the file.
    dataPath = "../dat/ReArm.lnk/ReArm_C1P02/ReArm_C1P02_20210306_V1/ReArm_C1P02_20210306_V1_Accelerometry/"
    fname = "ReArm_C1P02_20210306_V1_ac_np_101_1742991.cwa"
    # alternative file to test:
    # fname = "ReArm_C1P02_20210306_V1_ac_p_201_1743302.cwa"
    fnameToTest = os.path.join(dataPath, fname)
    print(check_cwa_file(fnameToTest))
    print("\n - header -")
    print_cwa_header(read_cwa_header(fnameToTest))
    # note: this writes a "_header.txt" file next to the cwa file
    cwa_header_fname = create_cwa_header_file(fnameToTest)
    print(" - - saved in: " + cwa_header_fname)

# Check the content of ALL data files

For all files, check that the date in the name of the file corresponds to that inside the file.

Here, I use structured code, inspired by the previous attempt, but with functions.

In [None]:
import logging
import re


def check_file_content(fullFname):
    """
    Check the content of one data file and log any issue found.

    The check depends on the extension of the file:
    - .csv whose 6th "_"-separated token is "k" or "l": date check
      (check_csv_date);
    - .xdf: date check (check_xdf_date);
    - .cwa: the header is saved next to the file, then the date, session ID
      and device ID are checked;
    - .oxy3, .oxy4, .easy, .pdf: the date cannot be read inside the file, which
      is reported.
    When a date was obtained, it is compared (as YYYYMMDD) with the third
    token of the file name. The resulting message, if any, is logged with
    log_message. Nothing is returned.
    """
    # split the file name (without extension) into tokens
    fname, extension = os.path.splitext(os.path.basename(fullFname))
    tokens = fname.split("_")

    # date announced by the file name (third token), None for too short names
    dateInName = tokens[2] if len(tokens) > 2 else None

    # Initialize a message for this file
    msg = ""

    # if the date is not found, it stays None
    date = None

    # tokens[5] needs at least 6 tokens: shorter csv names have no date check
    if extension == ".csv" and len(tokens) > 5 and tokens[5] in ["k", "l"]:
        date, msg = check_csv_date(fullFname, msg)

    elif extension == ".xdf":
        date, msg = check_xdf_date(fullFname, msg)

    elif extension == ".cwa":
        cwa_header = read_cwa_header(fullFname)
        # save a readable copy of the header next to the cwa file
        cwa_header_fname = create_cwa_header_file(fullFname)
        msg += f" Header saved in: {os.path.basename(cwa_header_fname)}. "
        # check the content of the cwa file
        date, msg = check_cwa_date(cwa_header, fullFname, msg)
        ____, msg = check_cwa_sessionId(cwa_header, fullFname, msg)
        ____, msg = ckeck_cwa_deviceId(cwa_header, fullFname, msg)

    elif extension in [".oxy3", ".oxy4", ".easy", ".pdf"]:
        # no date can be read inside these files: use the one of the file name
        if dateInName is not None:
            date = datetime.strptime(dateInName, "%Y%m%d")
        msg += "Cannot check date inside {} files. ".format(extension)

    if date is not None and dateInName is not None:
        date = date.strftime("%Y%m%d")
        # if the date is not good, we send a date message
        if date != dateInName:
            msg += f"({dateInName}) does not match date inside the file ({date})."

    # finally, we log the message if not empty
    if msg != "":
        log_message(f"{fname}{extension}:", msg)


def log_message(file_name, msg):
    """
    Log, at INFO level, the file name followed by msg on a second, indented line.
    """
    # shift_spaces is for nice output formatting in the log file
    shift_spaces = "                                   "
    first_line = f"{file_name} "
    second_line = f"{shift_spaces}{msg}"
    logging.info("\n".join((first_line, second_line)))


def check_file_content_in_all_files(absVisitPath):
    """
    Check the content of all the files listed in the goodFiles.log of a visit.

    goodFiles.log is read from absVisitPath; each non-blank line is taken as a
    path relative to absVisitPath. The number of files is logged, then
    check_file_content is called on each of them.

    Parameters
    ----------
    absVisitPath : str
        Directory of the visit, containing goodFiles.log.
    """
    # read goodFiles.log
    goodFiles_fname = "goodFiles.log"
    full_goodFiles_fname = os.path.join(absVisitPath, goodFiles_fname)
    with open(full_goodFiles_fname, "r") as f:
        relativePaths = [line.strip() for line in f]

    # Blank lines are skipped: joining an empty relative path would yield the
    # visit directory itself and inflate the number of files reported below.
    goodFiles = [
        os.path.join(absVisitPath, relativePath)
        for relativePath in relativePaths
        if relativePath
    ]

    # log the number of files to check
    logging.info(f"{len(goodFiles)} files to check found in {goodFiles_fname}. ")
    # check the content of each file
    for file_path in goodFiles:
        check_file_content(file_path)


def check_all_files_content(visitPath):
    """
    Check the content of all the files of a visit, once.

    The findings are written to "file_content.log" inside the visit directory.
    When that log file already exists, the visit is considered as already
    checked and nothing is done besides printing a notice.

    NOTE: the log file is created before the checks run, so a run that was
    interrupted is also reported as already done on the next call.
    """
    absVisitPath = os.path.abspath(visitPath)
    checkLog_fname = "file_content.log"
    full_checkLog_fname = os.path.join(absVisitPath, checkLog_fname)

    # The visit was already checked (log file exists): nothing more to do
    if os.path.isfile(full_checkLog_fname):
        alreadyDone = "    File content verification process already done: '%s' found"
        print(alreadyDone % checkLog_fname)
        return

    # Create the log file: force=True removes the previous handlers of the
    # root logger and installs the new one
    logOptions = {
        "filename": full_checkLog_fname,
        "level": logging.INFO,
        "format": "%(asctime)s - %(levelname)s - %(message)s",
        "force": True,
    }
    logging.basicConfig(**logOptions)

    logging.info("Starting file content check process ")
    check_file_content_in_all_files(absVisitPath)
    logging.info("File content check completed")

    print("    File content check completed: see '%s' for details" % checkLog_fname)


if doRunTests:
    # Candidate visits to test. Each assignment overwrites the previous one,
    # so only the LAST visitDirectory below is actually checked; reorder the
    # lines to test another visit.
    visitDirectory = "../dat/ReArm.lnk/ReArm_C1P02/ReArm_C1P02_20210306_V1"
    visitDirectory = "../dat/ReArm.lnk/ReArm_C1P02/ReArm_C1P02_20210419_V2"
    visitDirectory = "../dat/ReArm.lnk/ReArm_C1P02/ReArm_C1P02_20210715_V3"

    visitDirectory = "../dat/ReArm.lnk/ReArm_C1P07/ReArm_C1P07_20210716_V1"
    visitDirectory = "../dat/ReArm.lnk/ReArm_C1P07/ReArm_C1P07_20210820_V2"
    visitDirectory = "../dat/ReArm.lnk/ReArm_C1P07/ReArm_C1P07_20211116_V3"
    check_all_files_content(visitDirectory)