In [1]:
import time
import logging
import traceback
import watchdog
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
import multiprocessing as mp
import numpy as np
from sklearn.cluster import KMeans
import re
import os
from os.path import join, exists
from enum import Enum, auto

import wolff
import wolff_cross

In [9]:
path = '/Users/s3182541/STSP/Decoding/data/final/exp2'
device_i = 0

def load_file(file, mmap=None):
    while True:
        try:
            arr = np.load(file, mmap_mode=mmap)
            break
        except (OSError, ValueError) as e:
            print(str(e))
            print("Error reading file, trying again...")
            time.sleep(1)
            continue
    
    return arr

def decode_part(dat_range, path, module, path_sigma, save_file, device_i):
    start_t = time.time()
    
    if not exists(save_file):
        print("Loading data...")
        data = load_file(join(path, "data" + module + ".npy"), 'r')
        data = data[dat_range].copy()
        print("Loading angles...")
        angles = load_file(join(path, "angles.npy"), 'r')
        angles = angles[dat_range].copy()
        print("Loading sigma...")
        sigma = load_file(path_sigma)
        print("All files loaded")
        
        bin_width = np.pi / 6

        print("Decoding...")
        cross_cos_amp = wolff_cross.cross_decode(data, angles, bin_width, sigma, device_i)
#         cross_cos_amp = wolff_cross.cross_decode(data, angles, bin_width, sigma, 2)

        c = np.mean(cross_cos_amp, 0)

        np.save(save_file, c)
    
    print("Done with " + save_file)
    end_t = time.time()
    
    with open(join(path, "diagnostics.txt"), 'a') as f:
        f.write(str(end_t - start_t) + "\n")

def calc_sigmas(len_i, split_i, path, module):
    data = load_file(join(path, "data" + module + ".npy"), 'r')
    data0 = data[range(split_i)].copy()
    data1 = data[range(split_i, len_i)].copy()
    
    path0 = join(path, "sigma" + module + "_0.npy")
    path1 = join(path, "sigma" + module + "_1.npy")
    
    if not exists(path0):
        sigma0 = wolff_cross.prepare_sigma(data0)
        np.save(path0, sigma0)
    
    if not exists(path1):
        sigma1 = wolff_cross.prepare_sigma(data1)
        np.save(path1, sigma1)
    
    return (path0, path1)
    
def err_handler(e):
    print(str(e))
    traceback.print_tb(e.__traceback__)
    
def decode_file(pool, path, module):
    save_file = "c" + module
    save_file = join(path, save_file)
    
    print("Starting with " + path + ", module " + module)

    # Split up all data evenly
    angles = load_file(join(path, "angles.npy"), 'r') # We have to load this to check its length
    len_i = len(angles)
    split_i = int(len_i / 2)
    
    path_sigma0, path_sigma1 = calc_sigmas(len_i, split_i, path, module)
    
    global device_i

    pool.apply_async(decode_part, 
                     (range(split_i), path, module, path_sigma0, save_file+"_0.npy", device_i), 
                     error_callback=err_handler)
    device_i = (device_i + 1) % 3
    
    pool.apply_async(decode_part, 
                     (range(split_i, len_i), path, module, path_sigma1, save_file+"_1.npy", device_i), 
                     error_callback=err_handler)
    device_i = (device_i + 1) % 3
    
def get_present_files():
    files = []
    pat = re.compile(r"(data)|(sigma)|(angles)")
    
    for (dirpath, dirnames, filenames) in os.walk(path):
        files += ([join(dirpath, f) for f in filenames if pat.search(f)])
        
    return files

class PrepDataHandler(watchdog.events.FileSystemEventHandler):
    def __init__(self):
        super(PrepDataHandler, self).__init__()
        self.files = []
    
    def on_created(self, event):
        if not event.is_directory:
            self.files.append(event.src_path)

if __name__ == "__main__":
    processed_files = set()
    new_files = get_present_files()
    
    event_handler = PrepDataHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        with mp.Pool(3) as pool:
            while True:
                # if there is a new file, get its name, check whether it's a data file
                # and check whether the angles file is present; then you can begin decoding
                time.sleep(1)

                new_files = new_files + [path for path in event_handler.files if path not in processed_files]
                
                if new_files:
                    print("new_files:", new_files)
                    print("processed_files:", processed_files)
                    
                    for file in new_files:
                        f_dir, f_name = os.path.split(file)
                        
                        if re.search(r"angles.npy", f_name):
                            if join(f_dir, "data1.npy") in processed_files \
                              and join(f_dir, "sigma1.npy") in processed_files:
                                decode_file(pool, f_dir, "1")
                                
                            if join(f_dir, "data2.npy") in processed_files \
                              and join(f_dir, "sigma2.npy") in processed_files:
                                decode_file(pool, f_dir, "2")
                        
                        if re.search(r"data", f_name) or re.search(r"sigma", f_name):
                            module = re.sub(r"(data)|(sigma)|(\.npy)", "", f_name)
                            dat_file = "data" + module + ".npy"
                            sig_file = "sigma" + module + ".npy"
                            
                            deps = join(f_dir, "angles.npy") in processed_files
                            dat_dep = f_name == dat_file or join(f_dir, dat_file) in processed_files
                            sig_dep = f_name == sig_file or join(f_dir, sig_file) in processed_files
                            deps = deps and dat_dep and sig_dep
                            
                            if deps:
                                decode_file(pool, f_dir, module)
                            
                        processed_files.add(file)
                
                new_files = []
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

new_files: ['/Users/s3182541/STSP/Decoding/data/final/exp2/1/sigma1_1.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/1/data1.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/1/sigma1.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/1/sigma1_0.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/1/angles.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/1/data2.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/2/data1.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/2/angles.npy', '/Users/s3182541/STSP/Decoding/data/final/exp2/2/data2.npy']
processed_files: set()
Starting with /Users/s3182541/STSP/Decoding/data/final/exp2/1, module 1
Loading data...
Loading data...
Loading angles...
Loading sigma...
Loading angles...
Loading sigma...
All files loaded
Decoding...
(864, 17, 2300)
All files loaded
Decoding...
(864, 17, 2300)
[LogicalDevice(name='/device:GPU:0', device_type='GPU')]
[LogicalDevice(name='/device:GPU:0', device_type='GPU')]
Trial 1/864
Trial 1/864


Process ForkPoolWorker-5:
Process ForkPoolWorker-4:
Traceback (most recent call last):


in converted code:

    /Users/s3182541/STSP/Decoding/wolff_cross.py:100 parallel  *
        return tf.map_fn(calc_dist_part,
    /home/s3182541/.conda/envs/stsp/lib/python3.6/site-packages/tensorflow_core/python/ops/map_fn.py:275 map_fn
        elem.get_shape().with_rank_at_least(1)[0])))
    /home/s3182541/.conda/envs/stsp/lib/python3.6/site-packages/tensorflow_core/python/framework/tensor_shape.py:309 merge_with
        self.assert_is_compatible_with(other)
    /home/s3182541/.conda/envs/stsp/lib/python3.6/site-packages/tensorflow_core/python/framework/tensor_shape.py:276 assert_is_compatible_with
        (self, other))

    ValueError: Dimensions 2300 and 600 are not compatible

in converted code:

    /Users/s3182541/STSP/Decoding/wolff_cross.py:100 parallel  *
        return tf.map_fn(calc_dist_part,
    /home/s3182541/.conda/envs/stsp/lib/python3.6/site-packages/tensorflow_core/python/ops/map_fn.py:275 map_fn
        elem.get_shape().with_rank_at_least(1)[0])))
    /home/s318254

In [3]:
with open(join('/Users/s3182541/temp', "diagnostics.txt"), 'a') as f:
    f.write("hi" + "\n")

In [23]:
get_present_files()

['/Users/s3182541/STSP/Decoding/data/final/exp1/2/angles.npy',
 '/Users/s3182541/STSP/Decoding/data/final/exp1/2/sigma2.npy',
 '/Users/s3182541/STSP/Decoding/data/final/exp1/2/data1.npy',
 '/Users/s3182541/STSP/Decoding/data/final/exp1/1/sigma2.npy',
 '/Users/s3182541/STSP/Decoding/data/final/exp1/1/angles.npy',
 '/Users/s3182541/STSP/Decoding/data/final/exp1/1/data1.npy']

In [25]:
arr = np.array([[1], [2], [3], [4]])
arr[range(2, 4)]

array([[3],
       [4]])

In [4]:
mylist = ['/Users/share/Chiel4Loran/Decoding/exp2/subj_1_initial_angles_3.npy', 
          '/Users/share/Chiel4Loran/Decoding/exp2/subj_1_initial_angles_2.npy']
sorted(mylist)

['/Users/share/Chiel4Loran/Decoding/exp2/subj_1_initial_angles_2.npy',
 '/Users/share/Chiel4Loran/Decoding/exp2/subj_1_initial_angles_3.npy']

In [7]:
try:
    raise ValueError("Some error")
except ValueError as e:
    traceback.print_tb(e.__traceback__)

  File "<ipython-input-7-acedb7a363e3>", line 2, in <module>
    raise ValueError("Some error")


In [8]:
np.load('/Users/s3182541/STSP/Decoding/data/final/exp2/1/data1.npy').shape

(1728, 17, 2300)