# Imports

*notes:*
- everything is defined sequentially in this single file because strymread can only be run from within Jupyter notebooks

In [4]:
import asyncio
import json
import subprocess
import numpy as np
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import ast

import strym
from strym import strymread
from strym import strymmap
print(strym.__version__)

0.4.3


# Function definitions

## iRODS command wrappers

In [5]:
def ils():
    '''
    Run the iRODS ``ils`` command and parse what it prints.

    The standard output is split on newlines; the first line and the last
    element of that split are dropped, and every remaining line is stripped
    of its surrounding whitespace.
    :return: list of the entries (files and folders) listed by ``ils``
    '''
    completed = subprocess.run(['ils'],
                               universal_newlines=True,
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    raw_lines = completed.stdout.split('\n')
    entries = []
    for line in raw_lines[1:-1]:
        entries.append(line.strip())
    return entries


def icd(destination):
    '''
    Run the iRODS ``icd`` command towards the given destination.

    Both output streams are captured (as text) rather than printed.
    :param destination: collection to move to
    :return: the ``subprocess.CompletedProcess`` of the ``icd`` call
    '''
    command = ['icd', destination]
    completed = subprocess.run(command,
                               universal_newlines=True,
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    return completed


def ipwd():
    '''
    Run the iRODS ``ipwd`` command, print its result and return it.

    :return: the whitespace-stripped standard output of ``ipwd``
             (the current directory on CyVerse)
    '''
    completed = subprocess.run(['ipwd'],
                               universal_newlines=True,
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    # strip() already removes newlines along with any other surrounding whitespace
    current_collection = completed.stdout.strip()
    print('pwd output is:', current_collection)
    return current_collection


## Cache handling

In [6]:
async def async_command_shell(command, verbose: bool = False):
    """Run a shell command in an asyncio subprocess and return its output.

    source: https://fredrikaverpil.github.io/2017/06/20/async-and-await-with-subprocesses/

    :param command: command line handed to the shell
    :param verbose: set to True to print start / done / failed status lines
    :return: decoded and stripped standard output when the exit code is 0
    :raises Exception: carrying the decoded, stripped standard error when the
                       exit code is not 0
    """
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    pid_note = "(pid = " + str(proc.pid) + ")"
    if verbose:
        print("Started:", command, pid_note, flush=True)

    # Wait for completion while collecting both streams
    out_bytes, err_bytes = await proc.communicate()

    succeeded = proc.returncode == 0
    if verbose:
        print("Done:" if succeeded else "Failed:", command, pid_note, flush=True)
    if not succeeded:
        raise Exception(err_bytes.decode().strip())
    return out_bytes.decode().strip()


async def iget(file_adress, destination, verbose: bool = False):
    '''
    wrapper for iRODS iget command
    async command using asyncio library

    Both paths are shell-quoted before being placed in the command line, so
    that addresses containing spaces or shell metacharacters are passed to
    ``iget`` as single arguments.
    :param file_adress: address on CyVerse fileshare
    :param destination: address to download to on the local computer
    :param verbose: forwarded to async_command_shell to print its status lines
    :return: local address of the file (``destination`` + '/' + remote file name)
    :raises Exception: if building or running the command fails; the original
                       error is chained as the cause
    '''
    # Local import: shlex is not part of this file's top-level imports.
    import shlex

    try:
        # str() mirrors what the f-string interpolation did before quoting was added
        remote = shlex.quote(str(file_adress))
        target = shlex.quote(str(destination))
        await async_command_shell(f'iget -T {remote} {target}', verbose=verbose)
        local_address = destination + '/' + file_adress.split('/')[-1]
        return local_address
    except Exception as e:
        raise Exception(f'Error while downloading file at:'
                        f'\n\tremote: {file_adress}'
                        f'\n\tto local address: {destination}'
                        f'\n\tFailing on {e}') from e


def init_cache(local_folder):
    '''
    clears the cache if exists and initialise it

    The cache is the ``temp_cache`` directory located directly inside
    ``local_folder``. The previous implementation ran ``cd`` in a child
    process, which cannot change the working directory of this process, so
    ``local_folder`` was ignored; the folder is now resolved explicitly.
    :param local_folder: root folder for the analysis; an empty value means
                         the current working directory
    :return: temporary cache address (absolute path of the empty ``temp_cache``)
    '''
    # Local imports: neither module is part of this file's top-level imports.
    import os
    import shutil

    root = os.path.abspath(local_folder) if local_folder else os.getcwd()
    temp_cache_address = os.path.join(root, 'temp_cache')

    if os.path.isdir(temp_cache_address) and not os.path.islink(temp_cache_address):
        # same best-effort semantics as the former `rm -r -f temp_cache`
        shutil.rmtree(temp_cache_address, ignore_errors=True)
    elif os.path.lexists(temp_cache_address):
        # a plain file or a symlink is in the way of the cache directory
        os.remove(temp_cache_address)

    os.makedirs(temp_cache_address, exist_ok=True)
    return temp_cache_address


## Fileshare exploration

In [7]:
def findall_files(root, verbose: bool = False):
    '''
    Walk the fileshare from ``root`` downwards and collect every CSV file.

    Directories still to visit are kept on a stack; each one is entered with
    ``icd`` and listed with ``ils``.
    :param root: str, directory from which the search starts
    :param verbose: bool, set to True to see fuller logs
    :return: List<str>, one '<current folder>/<listing entry>' per CSV found
    '''
    pending_dirs = [root]
    found = []

    while pending_dirs:
        directory = pending_dirs.pop()
        icd(directory)
        listing = ils()
        if verbose:
            print('---------')
            print('current queue dir: ', pending_dirs)
            print('current directory is: ', directory)
            print('current file queue is: ', listing)

        for entry in listing:
            if verbose:
                print('current file tests on: ', entry, ' and test gives f[0:2]: ', entry[0:2], ' and f[-4:] is: ', entry[-4:])
            # 'C-' entries are descended into, except the bagfiles and dashcams
            # folders: skipping them reduces the number of files to scan
            is_collection = entry.startswith('C-')
            is_skipped = 'bagfiles' in entry or 'dashcams' in entry
            if is_collection and not is_skipped:
                # drop the 3-character 'C- ' prefix to keep only the path
                pending_dirs.append(entry[3:])
                if verbose:
                    print('appending dir queue; ', entry)
            elif entry.endswith('.csv'):
                # prefix with the current folder to get the entire path to the file
                here = ipwd()
                found.append(f'{here}/{entry}')
                if verbose:
                    print('appending file; ', entry)

        if verbose:
            print('found ', len(found), ' files')

    return found


def can_gps_coupling(files):
    '''
    links the CAN and GPS from same acquisitions

    Every address containing '_CAN_Messages.csv' yields one entry. Its GPS
    counterpart is the same address with the trailing '_CAN_Messages.csv'
    replaced by '_GPS_Messages.csv'; it is only recorded when that address is
    itself present in ``files``.
    :param files: iterable of file adresses
    :return: List<{'can': str, 'gps': str || None}>, in the order of ``files``
    '''
    can_suffix = '_CAN_Messages.csv'
    gps_suffix = '_GPS_Messages.csv'

    # Materialise once (the input may be a one-shot iterable) and index it in a
    # set so that each GPS lookup is O(1) instead of a scan of the whole list.
    files = list(files)
    known_files = set(files)

    file_list = []
    for file in files:
        if can_suffix not in file:
            continue
        gps = None
        # Only derive the GPS name when the marker really is the suffix;
        # otherwise cutting a fixed number of characters yields a bogus name.
        if file.endswith(can_suffix):
            candidate = file[:-len(can_suffix)] + gps_suffix
            if candidate in known_files:
                gps = candidate
        file_list.append({'can': file, 'gps': gps})

    return file_list


## Car crossing detection

In [8]:
def read_metadata(canfile, gpsfile, ignore_gps_file: bool = False, verbose: bool = False):
    """
    Open the acquisition files and derive the metadata held in the CAN file name.

    :param canfile: string, csv file
    :param gpsfile: string, csv file
    :param ignore_gps_file: set to True to skip reading the GPS file
    :param verbose: set to True to print the outcome of each read
    :return: strymread of the CAN file, strymmap of the GPS file (None when
             ignored), and a dictionary {'date_time', 'vin'} cut out of the CAN
             file name (characters 0-18 and 20-36 respectively)
    TODO: find how to extract the metadata here (maybe from strymread)?
      Find also the desired metadata to be logged into the DB
      -> what could be the useful thing to sort on? car model? day? other things?
    """
    # CAN acquisition
    can_reader = strymread(csvfile=canfile)
    if verbose:
        print(f'reading of {canfile} was succesful? {can_reader.success}')

    # GPS acquisition, unless the caller asked to skip it
    gps_reader = None
    if ignore_gps_file:
        if verbose:
            print('reading of GPS file was ignored')
    else:
        gps_reader = strymmap(csvfile=gpsfile)
        if verbose:
            print(f'reading of {gpsfile} was succesful? {gps_reader.success}')

    # Metadata comes from fixed positions in the file name
    file_name = canfile.split('/')[-1]
    metadata = {'date_time': file_name[0:19], 'vin': file_name[20:37]}

    return can_reader, gps_reader, metadata


def read_data(can, gps):
    """
    Pull the three time series used by the crossing detection out of the CAN reader.

    :param can: strymread object
    :param gps: strymmap object (not used by this function)
    :return: speed, lead_distance, cruise_control time series
    :raises Exception: wrapping whatever error one of the three reads raised
                       (the error is printed first)
    """
    try:
        series = (can.speed(), can.lead_distance(), can.acc_state())
    except Exception as err:
        print(f"Error while trying to read the time series.\nFailed on: {err}")
        raise Exception(err)
    return series


def find_ts_state_at_given_time(ts, ts_time, event_time):
    """
    Return the sample of a time series whose timestamp is closest to an event.

    :param ts: Time Series messages list to search within
    :param ts_time: Time Series time list (same ordering as ``ts``)
    :param event_time: Time at which we want the value
    :return: the element of ``ts`` at the position where
             ``|ts_time - event_time|`` is smallest (first one on ties)
    """
    gaps = [np.abs(stamp - event_time) for stamp in ts_time]
    closest = np.argmin(gaps)
    return ts[closest]


def find_crossing(speed, lead_distance, cruise_control_state, speed_treshold = 20,
                  prev_treshold = 10, next_treshold = 5, verbose: bool = False):
    """
    finds the time where car crossing events happens, from ts associated to 1 specific acquisition
    this functions find the acceptable intervals where the constraint on speed is valid,
    then finds the places where crossings happens, filtering them by the acceptable times (this allows to handle
    different sampling frequencies over the different time series)

    Each time series is accessed through its 'Time' and 'Message' entries, which are converted
    to NumPy arrays so that all indexing below is positional whatever index the input carries.

    :param speed: Time Series of the speed
    :param lead_distance: Time Series of the lead distance
    :param cruise_control_state: Time Series of the Controller state (=6 if activated)
    :param speed_treshold: minimum speed to consider a dangerous time crossing event, in the unit of
        the speed series (NOTE(review): this docstring used to say km/h while the verbose log below
        says m/s - confirm the unit)
    :param prev_treshold: minimum lead distance before the crossing to consider the event as a car crossing
    :param next_treshold: maximum lead distance after the crossing to consider the car crossing as dangerous
    :param verbose: Set to true to get more logs

    :return: three lists aligned by position: the event times of the valid car crossings, the
        controller state closest in time to each event, and the speed closest in time to each event

    TODO: refine the cruise control state to also encompass other semi-activated states as if controller on?
    """
    event_times = []
    controller_states = []
    speeds = []
    all_crossings = []

    # Plain arrays: `series[i]` / `series[-1]` on a pandas Series is label-based when the
    # index holds integers (so `[-1]` raises KeyError on a default index).
    lead_values = np.asarray(lead_distance['Message'])
    lead_times = np.asarray(lead_distance['Time'])
    speed_values = np.asarray(speed['Message'])
    speed_times = np.asarray(speed['Time'])
    cc_values = np.asarray(cruise_control_state['Message'])
    cc_times = np.asarray(cruise_control_state['Time'])

    # Acceptable times for the speed:
    # composed of time interval objects {"beg": time_begining, "end": time_ending}
    acceptable_range_speed = []
    interval_start = None
    for stamp, value in zip(speed_times, speed_values):
        if interval_start is None:
            if value >= speed_treshold:
                interval_start = stamp
        elif value < speed_treshold:
            acceptable_range_speed.append({"beg": interval_start, "end": stamp})
            interval_start = None

    # case if the speed is still acceptable at the end of the file
    # (compared against None so that an interval starting at time 0 is kept)
    if interval_start is not None:
        acceptable_range_speed.append({"beg": interval_start, "end": speed_times[-1]})

    for i in range(1, len(lead_times)):
        is_crossing = (lead_values[i - 1] >= prev_treshold) and (lead_values[i] <= next_treshold)
        if not is_crossing:
            continue
        # a car crossing occurs here: keep it only if the speed constraint holds at that time
        time_event = lead_times[i]
        all_crossings.append(time_event)
        if any(interval['beg'] <= time_event <= interval['end'] for interval in acceptable_range_speed):
            event_times.append(time_event)
            controller_states.append(find_ts_state_at_given_time(cc_values, cc_times, time_event))
            speeds.append(find_ts_state_at_given_time(speed_values, speed_times, time_event))

    if verbose:
        print(f'acceptable range for speed > {speed_treshold} m/s: {acceptable_range_speed}')
        print(f'number of crossings detected: {len(all_crossings)}')
        print(f'number of valid crossings detected: {len(event_times)}')
        print(f'event times of valid crossings: {event_times}')

    return event_times, controller_states, speeds


def plot_events_over_lead(name, lead, times, event_times):
    """
    Plot the lead distance over time and overlay a marker line for the events.

    The lead samples are drawn as blue dots. The events are drawn as a red line
    that is 0 everywhere except at the samples whose time is one of
    ``event_times``, where it jumps to 252.
    TODO use plotly instead of pyplot
    :param name: title of the figure
    :param lead: lead distance values, aligned with ``times``
    :param times: time of each lead distance sample
    :param event_times: times of the events to highlight (hashable values)
    :return: None, the figure is shown in interactive mode
    """
    # Set lookup: one hash test per sample instead of a scan of the event list
    marked_times = set(event_times)
    # 252 is the spike height - presumably the top of the lead-distance scale, TODO confirm
    event_times_fake = [252 if time in marked_times else 0 for time in times]

    # plot the figure
    _, ax = plt.subplots()
    ax.plot(times, lead, 'b.')
    ax.plot(times, event_times_fake, 'r-')
    plt.title(name)
    plt.ion()
    plt.show()

## File handler & cache

In [9]:
class FileHandler:
    """
    Class handling download and delete of files to be analyzed

    Typical use: build the handler, call ``explore`` once to fill the list of
    CAN/GPS couples, then ``await next()`` repeatedly to download the couples
    one at a time into a freshly emptied cache.
    """
    # attributes
    all_files = None          # every file address found by the exploration
    coupled_files = None      # list (or pandas Series) of {'can', 'gps'} couples
    local_root_folder = None  # local root under which the cache is created
    remote_addresses = None   # couple currently being handled
    can_local_address = None  # local copy of the current CAN file
    gps_local_address = None  # local copy of the current GPS file, if any
    index = None              # position of the next couple to download
    max_index = None          # number of couples; None until explore() succeeded

    # methods
    def __init__(self, local_root_folder, start_index: int = 0):
        """
        :param local_root_folder: Local root for the download folder
        :param start_index: position of the first couple to download
        """
        self.local_root_folder = local_root_folder
        self.index = start_index
        print('File Handler ready for file exploration')

    def explore(self, analyze: bool = True, root: str = '', exploration_name = None,
                 previous_exploration_path = None, verbose: bool = False):
        """
        Initialises the path objects, then file handler attributes

        Failures are printed, not raised: after a failed call ``max_index`` may still be None.

        :param analyze: True if you want to explore files from CyVerse,
        False if you want to use a file giving the coupled files from a previous FileShare exploration
        :param root: root of the search for exploring on CyVerse
        :param exploration_name: name for the coupled file local copy
        :param previous_exploration_path: local address towards the file giving the coupled files from a previous
        FileShare exploration
        :param verbose: set to True to see fuller logs

        TODO: include a call to an iinit irods function
        """
        # case of file share exploration
        if analyze:
            try:
                self.all_files = findall_files(root, verbose)
                self.coupled_files = can_gps_coupling(self.all_files)
                self.max_index = len(self.coupled_files)
                # save the csv file
                output_filename = coupled_files_file_namer(exploration_name, root)
                df = pd.DataFrame(data={'Files': self.coupled_files})
                # to_csv does not create missing folders
                create_results_folder()
                df.to_csv(path_or_buf=f'results/{output_filename}')
                if verbose:
                    print('exploration logged as: ', output_filename)
            except Exception as e:
                print(f'CyVerse FileShare exploration failed on: {e}')

        # case of using a file to get the coupled addresses
        else:
            try:
                df = pd.read_csv(previous_exploration_path)
                self.coupled_files = df['Files']
                self.max_index = len(self.coupled_files)
            except Exception as e:
                print(f'retrieving from file at {previous_exploration_path} failed on: {e}')


    def __str__(self):
        if self.max_index is None:
            return 'FileShare exploration is not finished'
        return f'file handler with {self.max_index} couples, current index is: {self.index}'


    async def next(self, ignore_gps_file: bool = False):
        """
        clears cache & downloads the next couple of files
        :param: ignore_gps_file: set to True to avoid downloading the GPS file
        :return: - object with paths to the downloaded CAN and GPS file
        {'can': str, 'gps': str, 'remote_addresses': {'can': str, 'gps': str}}
        ('gps' is None when the GPS file is ignored or when the couple has no GPS file)
        :raises Exception: - Exception('max_index') when the maximum index is reached
                           - any other failure is wrapped in an Exception whose message starts with
                             'Downloading next file failed on'
        """
        if self.max_index is None:
            raise Exception('Downloading next file failed on the exploration not being finished, call explore() first')
        # Checked outside the try block so that this sentinel is not re-wrapped
        # by the generic handler below
        if self.index >= self.max_index:
            raise Exception('max_index')

        try:
            cache = init_cache(self.local_root_folder)
            next_file = self.coupled_files[self.index]
            # couples read back from a CSV are the string form of the dictionary
            if isinstance(next_file, str):
                self.remote_addresses = ast.literal_eval(next_file)
            else:
                self.remote_addresses = next_file

            self.can_local_address = await iget(self.remote_addresses['can'], cache)
            if ignore_gps_file:
                self.gps_local_address = None
            else:
                gps_remote = self.remote_addresses['gps']
                # a CAN file without GPS counterpart is stored with gps = None
                if gps_remote is None:
                    self.gps_local_address = None
                else:
                    self.gps_local_address = await iget(gps_remote, cache)

            self.index += 1

            return {
                'can': self.can_local_address,
                'gps': self.gps_local_address,
                'remote_addresses': self.remote_addresses
            }

        except Exception as e:
            raise Exception(f'Downloading next file failed on {e}') from e

    def clear(self):
        """Empty the download cache by re-initialising it."""
        init_cache(self.local_root_folder)
        print('Cache cleared')

def coupled_files_file_namer(name, root):
    """
    Build the CSV file name under which a fileshare exploration is saved.

    :param name: short description of the exploration
    :param root: root of the exploration; its '/' are replaced by '_'
    :return: 'file_exploration&<name>&create_on=<now>&root=<root>.csv', where
             <now> is the current date and time with its space replaced by '_'
    """
    created_on = str(datetime.now()).replace(" ", "_")
    flat_root = root.replace("/", "_")
    return f'file_exploration&{name}&create_on={created_on}&root={flat_root}.csv'


## Multi-file analysis

In [None]:
def csv_file_namer(name, speed_t, prev_t, next_t):
    """
    Build the CSV file name of an analysis from its name and thresholds.

    :param name: short description of the analysis
    :param speed_t: speed threshold used for the analysis
    :param prev_t: previous-distance threshold used for the analysis
    :param next_t: next-distance threshold used for the analysis
    :return: '<name>&create_on=<now>&s=<speed_t>&p=<prev_t>&n=<next_t>.csv'
    """
    created_on = datetime.now()
    fields = [
        f'{name}',
        f'create_on={created_on}',
        f's={speed_t}',
        f'p={prev_t}',
        f'n={next_t}',
    ]
    return '&'.join(fields) + '.csv'


def create_results_folder():
    """Run the shell command "mkdir results" with its output captured. Returns nothing."""
    captured = {'stdout': subprocess.PIPE, 'stderr': subprocess.PIPE}
    subprocess.run(['mkdir', 'results'], universal_newlines=True, **captured)


def analyse_events(canfile, gpsfile, prev_treshold, next_treshold, speed_treshold,
                   verbose: bool = False, plot: bool = False, plot_name: str = ''):
    """
    Run the car crossing detection on one acquisition.

    The files are opened with read_metadata (the GPS file is skipped when ``gpsfile`` is None),
    the time series are extracted with read_data and the events are searched with find_crossing.
    :param canfile: local path to the CSV of the CAN acquisition
    :param gpsfile: local path to the CSV of the GPS acquisition, or None
    :param prev_treshold: forwarded to find_crossing as ``prev_treshold``
    :param next_treshold: forwarded to find_crossing as ``next_treshold``
    :param speed_treshold: forwarded to find_crossing as ``speed_treshold``
    :param verbose: set to True to have more extensive logs from find_crossing
    :param plot: set to True to plot the lead distance as well as the event times
    :param plot_name: name to give to the plot
    :return: tuple of:
        - the event times
        - the cc states at the time of the events
        - the speeds at the time of the events
        - metadata about the acquisition
    the three event lists describe the same event at the same position
    """
    can_reader, gps_reader, metadata = read_metadata(canfile, gpsfile,
                                                     ignore_gps_file=(gpsfile is None))
    speed, lead_dist, cc_state = read_data(can_reader, gps_reader)

    crossing_options = {
        'prev_treshold': prev_treshold,
        'next_treshold': next_treshold,
        'speed_treshold': speed_treshold,
        'verbose': verbose,
    }
    crossings = find_crossing(speed, lead_dist, cc_state, **crossing_options)
    event_times, event_cc_states, event_speeds = crossings

    if plot:
        strymread.plt_ts(lead_dist)
        plot_events_over_lead(plot_name, lead_dist['Message'], lead_dist['Time'], event_times)

    return event_times, event_cc_states, event_speeds, metadata


async def explore_and_analyse_bdd(config_path, local_folder: str = '', start_index: int = 0):
    """
    Launches the creation of the exploration and final analysis CSV (fake SQL databases)

    :param config_path: path to a JSON giving the analysis' configuration
    the format is as follows:
    {
        verbose: <bool>, set to True to have extensive logs
        exploration:
                    {
                        remote_root: <str||None>, root of the fileshare exploration on CyVerse. Set to None
                                    if you want to use the CSV file of a previous analysis
                        db_exploration_name: <str||None>, small description of this exploration (an automated
                                            logging of useful information as the date and the parameters is
                                            already implemented). Set to None if you want to use the CSV file of a
                                            previous analysis
                        use_previous_exploration: <str||None>, if None (or absent), a new exploration is made
                                                  using the two parameters above. If a value is given, put the
                                                  absolute location of the path for using a previous file
                                                  exploration
                    }
        analysis:
                    {
                        db_analysis_name: <str>, small description of the full analysis. (automated
                                        information are added)
                        car_crossing_parameters:
                            {
                                enable: <bool>, set to True to look for this kind of events
                                        (NOTE(review): this flag is not read by this function yet)
                                speed_threshold: <float>, minimum speed, in m/s
                                previous_distance_threshold: <float>, minimum distance before the crossing
                                next_distance_threshold: <float>, maximum distance after the crossing
                                                        note: the crossing is detected as a discontinuity in
                                                        lead_distance time series
                            }
                    }
    }
    note: the JSON object is intended to hold different analysis parameters, for different situations than car crossings
    :param local_folder: local root folder handed to the FileHandler for its cache
    :param start_index: position, in the list of explored couples, of the first one to analyse
    :return: a message giving the name of the analysis CSV on success. If writing the CSV fails, the
             error is printed and an Exception instance carrying the message is RETURNED (not raised),
             as before.
    :raises Exception: if the configuration cannot be read, or if the exploration gave no file list
    """
    # Configuration reading
    verbose = False
    try:
        print('Opening configuration file')
        # context manager: the file handle is closed even if the JSON is invalid
        with open(config_path) as f:
            config = json.load(f)
        print('Configuration opened successfully')
        if config['verbose']:
            verbose = True
            print('verbose is set to True, extensive logs will be displayed')
        else:
            print('verbose is set to False, logs will be scarce')
    except Exception as e:
        raise Exception(f'There was an issue trying to open the configuration file. \nIt failed on: {e}') from e

    exploration_config = config['exploration']
    crossing_parameters = config['analysis']['car_crossing_parameters']

    # Exploration of the file share
    if verbose:
        print('opening the file handler')
    file_handler = FileHandler(local_root_folder=local_folder, start_index=start_index)
    previous_exploration = exploration_config.get('use_previous_exploration')
    if previous_exploration:
        if verbose:
            print(f'Using the previous exploration, located at {previous_exploration}')
        file_handler.explore(analyze=False,
                             previous_exploration_path=previous_exploration,
                             verbose=verbose)
        if verbose:
            print('Previous exploration has been red successfully')
    else:
        if verbose:
            print('starting to explore...')
        file_handler.explore(analyze=True,
                             root=exploration_config['remote_root'],
                             exploration_name=exploration_config['db_exploration_name'],
                             verbose=verbose)
        if verbose:
            print(f'...exploration finished. Find it at ./results/{exploration_config["db_exploration_name"]}')

    # explore() only prints its failures; without a file list there is nothing to iterate on
    if file_handler.max_index is None:
        raise Exception('The file exploration did not produce a list of files, the analysis cannot start')

    # Analysis of the events
    full_analysis_csv_filename = csv_file_namer(config['analysis']['db_analysis_name'],
                                                crossing_parameters['speed_threshold'],
                                                crossing_parameters['previous_distance_threshold'],
                                                crossing_parameters['next_distance_threshold'])

    output_data = {'remote_addresses': [], 'event_time': [], 'event_speeds': [], 'event_cc_state': [], 'event_type': [], 'date_time': [], 'vin': []}
    if verbose:
        print('Starting the analysis of the events in the explored files')
    for index in range(file_handler.index, file_handler.max_index):
        print('\nINDEX is ', index, ' out of ', file_handler.max_index)
        # The handler only advances its own cursor after a successful download. Re-align it
        # with the loop so that a failing file is skipped instead of being retried at every
        # remaining iteration.
        file_handler.index = index
        try:
            # download files
            current_files = await file_handler.next(ignore_gps_file=True)
        except Exception as err:
            print(f'there was an issue trying to download file at index {index}. \nFailed on: {err}')
            continue

        if verbose:
            print('starting analysis of the file: ', current_files['can'])
        try:
            # event analysis
            event_times, event_cc_states, event_speeds, metadata = analyse_events(
                current_files['can'], current_files['gps'],
                prev_treshold=crossing_parameters['previous_distance_threshold'],
                next_treshold=crossing_parameters['next_distance_threshold'],
                speed_treshold=crossing_parameters['speed_threshold'],
                verbose=verbose)
            # add the metadata to the events
            event_type = 'car_crossing'
            number_of_events = len(event_times)
            if number_of_events > 0:
                output_data['event_time'].extend(event_times)
                output_data['event_speeds'].extend(event_speeds)
                output_data['event_cc_state'].extend(event_cc_states)
                output_data['remote_addresses'].extend([current_files['remote_addresses']] * number_of_events)
                output_data['event_type'].extend([event_type] * number_of_events)
                output_data['date_time'].extend([metadata['date_time']] * number_of_events)
                output_data['vin'].extend([metadata['vin']] * number_of_events)
        except Exception as err:
            print(f'there was an issue trying to scan {current_files["can"]}. \nFailed on: {err}')

    # CSV logging
    if verbose:
        print('starting to write the data to the CSV')
    df = None
    try:
        df = pd.DataFrame(data=output_data)
        # to_csv does not create missing folders
        create_results_folder()
        df.to_csv(path_or_buf=f'results/analysis&{full_analysis_csv_filename}')
        file_handler.clear()
        out_message = f'analysis finished, find it at: {full_analysis_csv_filename}'
        if verbose:
            print(out_message)
        return out_message
    except Exception as e:
        error_msg = f'Error trying to write the CSV file. \nFailed on : {e}'
        # df is still None when building the DataFrame itself failed
        print(df if df is not None else output_data)
        print(error_msg)
        # NOTE(review): kept as a returned value for backward compatibility; it now
        # carries the real message instead of the literal text 'error_msg'.
        return Exception(error_msg)
    



# TESTS

In [None]:
# full analysis, to do remotely on CyVerse
config_path_full = './config/full_analysis_config_after_exploration.json'
await explore_and_analyse_bdd(config_path_full, '', start_index = 30)

# How to perform a REMOTE LAUNCH on CyVerse

1. Go to the CyVerse Discovery Environment and launch a Jupyter-strym application

2. Grab the repo via git clone in the terminal

3. Log in to Cyverse

Before running the `iinit` command, first log out of the current iRODS user with:
```
     ichmod -M
```

Then run `iinit` as usual and answer its prompts with:
```
     Enter the host name (DNS) of the server to connect to: data.cyverse.org
     Enter the port number: 1247
     Enter your irods user name: <your_cyverse_user_name>
     Enter your irods zone: iplant
     Enter your current iRODS password: <your_cyverse_password>
```

4. Create the JSON configuration file
5. Launch the analysis