In [None]:
import datetime
import hashlib
import json
import multiprocessing 
import matplotlib.pyplot as plt
import os
import pandas as pd
import psutil
import shutil
import subprocess
import time

from dask.distributed import Client, LocalCluster
from datetime import timedelta
from dotenv import load_dotenv
from os.path import exists as file_exists

from yuntu.collection.methods import collection
from yuntu.soundscape.hashers.crono import DEFAULT_HASHER_CONFIG
from yuntu.soundscape.processors.indices.direct import ICOMPLEXITY, TAIL
from yuntu.soundscape.pipelines.build_soundscape import CronoSoundscape, HASHER_CONFIG

In [None]:
# Alfresco query parameters.
# CUMULO is interpolated into the query string as the cumulus name and is also
# used as the first sub-folder of RESULTS_DIR; SAMPLERATE is the sample-rate
# filter of the same query; PAGESIZE is the 'page_size' of the DB config.
CUMULO = 92 # INT
SAMPLERATE = 48000.0
PAGESIZE = 1000

# Soundscape plots.
# Names of the acoustic indices mapped to the red, green and blue channels
# (they are passed to the plotting functions in this RGB order).
RED_IDX = "EXAG"
GREEN_IDX = "INFORMATION"
BLUE_IDX = "CORE"
# Passed as `min_freq` to plot_soundscape, which keeps only the rows whose
# 'min_freq' column is <= this value (presumably Hz -- TODO confirm).
MIN_FREQ_SC = 10000

# Soundscape computing (values copied into the pipeline 'slice_config').
WORK_DIR_PIPELINE = "."  # work_dir handed to the CronoSoundscape pipeline
TIME_UNIT = 30  # 'time_unit' of the slice config (presumably seconds -- TODO confirm)
FREQUENCY_BINS = 96 # (24000 - 0) / 96 = 250 Hz per bin
FREQUENCY_LIMITS_LB = 0  # lower bound of 'frequency_limits'
FREQUENCY_LIMITS_UB = 24000  # upper bound of 'frequency_limits'
SPECTRUM = "Audible"  # stored as 'product_spectrum' in the metadata files
# Hasher (keyword arguments of the CronoHasher).
# NOTE(review): HASH_NAME suggests HASHER_TIME_UNIT is 1800 s = 30 min, so that
# HASHER_TIME_MODULE = 48 units would span 24 h -- confirm against yuntu.
HASHER_TIME_UNIT =  1800
HASHER_TIME_MODULE = 48
HASH_NAME = "crono_hash_30m"

# Dask local cluster: threads started per worker process.
THREADS_PER_WORKER = 2

# Root directory under which all products and execution_info.json are written.
RESULTS_DIR = '/shared_volume/audio/soundscapes'

In [None]:
def create_results_folder_str(results_dir, cumulo, nodes_list, rec_list, dep_list):
    """Create the ``results_dir/cumulo/node/recorder/deployment`` folder tree.

    One directory is created for every (node, recorder, deployment)
    combination of the three lists. Directories that already exist are left
    untouched. The upper levels are created even when the lists below them
    are empty.

    Args:
        results_dir: Root directory of the results tree.
        cumulo: Cumulus identifier; converted with ``str()``.
        nodes_list: Iterable of node identifiers.
        rec_list: Iterable of recorder identifiers.
        dep_list: Iterable of deployment identifiers.

    Every identifier is converted with ``str()`` before being used as a path
    component, so numeric identifiers are accepted just like ``cumulo``.
    """
    # Materialise the inner iterables once: they are re-used for every node,
    # which would silently yield nothing after the first pass for one-shot
    # iterators.
    recorder_names = [str(rec) for rec in rec_list]
    deployment_names = [str(dep) for dep in dep_list]

    # results directory
    os.makedirs(results_dir, exist_ok=True)
    # cumulus subdir
    cum_subdir = os.path.join(results_dir, str(cumulo))
    os.makedirs(cum_subdir, exist_ok=True)

    for node in nodes_list:
        node_subdir = os.path.join(cum_subdir, str(node))
        os.makedirs(node_subdir, exist_ok=True)
        for rec_name in recorder_names:
            rec_subdir = os.path.join(node_subdir, rec_name)
            os.makedirs(rec_subdir, exist_ok=True)
            for dep_name in deployment_names:
                os.makedirs(os.path.join(rec_subdir, dep_name), exist_ok=True)
                
def remove_empty_folders(path_abs):
    """Remove every empty directory found under ``path_abs``.

    Directories are visited children-first, so a directory that only
    contained empty directories is removed as well. ``path_abs`` itself is
    removed when nothing is left inside it.
    """
    for current_dir, _subdirs, _files in os.walk(path_abs, topdown=False):
        # Re-list the directory: its children may just have been removed.
        if not os.listdir(current_dir):
            os.rmdir(current_dir)
            
def save_metadata(product_id, product_type, product_spectrum, sc_config,
                  path, cumulus, node, recorder, deployment, parent="Null"):
    """Write the JSON metadata file that describes one soundscape product.

    The metadata file is written inside ``path``; its name and the product
    file it refers to depend on ``product_type``.

    Args:
        product_id: Identifier stored as ``product_id``.
        product_type: One of ``"soundscape"``, ``"sequence"``,
            ``"standard_deviation"`` or ``"mean"``.
        product_spectrum: Value stored as ``product_spectrum``.
        sc_config: JSON-serialisable configuration stored as
            ``product_configs``.
        path: Directory that holds the product and receives the metadata.
        cumulus: Value stored as ``CumulusName``.
        node: Node nomenclature. Its third ``"_"``-separated field must be
            ``0`` (stored as ``"Degradado"``) or ``1`` (``"Integro"``).
        recorder: Value stored as ``SerialNumber``.
        deployment: Value stored as ``DateDeployment``.
        parent: Identifier of the parent product (``"Null"`` by default).

    Raises:
        ValueError: If ``product_type`` is not supported, or the node
            category is neither 0 nor 1. Nothing is written in that case.
    """
    # product type -> (product name, product file name, metadata file name)
    products = {
        "soundscape": ("Soundscape",
                       "hashed_soundscape.parquet",
                       "soundscape_metadata.json"),
        "sequence": ("Soundscape sequential plot",
                     "soundscape_seq.png",
                     "soundscape_seq_metadata.json"),
        "standard_deviation": ("Soundscape standard deviation plot",
                               "std_soundscape.png",
                               "std_soundscape_metadata.json"),
        "mean": ("Soundscape mean plot",
                 "mean_soundscape.png",
                 "mean_soundscape_metadata.json"),
    }
    if product_type not in products:
        raise ValueError(
            f"Unknown product_type {product_type!r}; "
            f"expected one of {sorted(products)}")
    product_name, product_file, metadata_file = products[product_type]
    file_path = os.path.join(path, product_file)
    metadata_filename = os.path.join(path, metadata_file)

    # The third underscore-separated field of the node name is the integrity flag.
    node_categories = {0: "Degradado", 1: "Integro"}
    category_code = int(node.split("_")[2])
    if category_code not in node_categories:
        raise ValueError(
            f"Unknown node category {category_code!r} in node {node!r}; "
            "expected 0 or 1")
    node_category = node_categories[category_code]

    metadata = {
        "product_id": product_id,
        "product_parent": parent,
        "product_name": product_name,
        "product_configs": sc_config,
        "product_path": file_path,
        "product_spectrum": product_spectrum,
        "CumulusName": cumulus,
        "NodeCategoryIntegrity": node_category,
        "NomenclatureNode": node,
        "SerialNumber": recorder,
        "DateDeployment": deployment
    }

    with open(metadata_filename, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, ensure_ascii=False, indent=4)

def plot_soundscape(soundscape, product_type, product_spectrum, sc_config, path, 
                    cumulus, node, recorder, deployment, parent, indices, min_freq=None,
                  figsize=(20,15), plt_style='ggplot'):
    """Render one soundscape plot, save it as PNG and write its metadata.

    Args:
        soundscape: Soundscape table exposing the ``sndscape`` accessor
            (``plot_sequence`` / ``plot_cycle``) and a ``'min_freq'`` column.
        product_type: ``"sequence"``, ``"standard_deviation"`` or ``"mean"``.
        product_spectrum, sc_config, cumulus, node, recorder, deployment,
        parent: Forwarded unchanged to ``save_metadata``.
        path: Directory in which the image and its metadata are written.
        indices: Index names passed to the plotting method as ``rgb``.
        min_freq: When truthy, only rows with ``'min_freq' <= min_freq`` are
            plotted; ``None`` (or ``0``) disables the filter.
        figsize: Figure size handed to ``plt.subplots``.
        plt_style: Matplotlib style name applied before plotting.

    Raises:
        ValueError: If ``product_type`` is not supported (raised before
            anything is drawn or written).
    """
    # product type -> (image file name, accessor method, extra plotting kwargs).
    # The file names match the ones save_metadata records in 'product_path'.
    plot_specs = {
        "sequence": ("soundscape_seq.png", "plot_sequence",
                     {"time_format": '%Y-%m %H:%M'}),
        "standard_deviation": ("std_soundscape.png", "plot_cycle",
                               {"aggr": "std", "time_format": '%H:%M', "xticks": 24}),
        "mean": ("mean_soundscape.png", "plot_cycle",
                 {"aggr": "mean", "time_format": '%H:%M', "xticks": 24}),
    }
    if product_type not in plot_specs:
        raise ValueError(
            f"Unknown product_type {product_type!r}; "
            f"expected one of {sorted(plot_specs)}")
    file_name, method_name, plot_kwargs = plot_specs[product_type]

    if min_freq:
        soundscape = soundscape[soundscape['min_freq']<=min_freq]

    file_path = os.path.join(path, file_name)
    # The product id is the MD5 hex digest of the image path.
    product_id = hashlib.md5(file_path.encode('utf-8')).hexdigest()

    plt.style.use(plt_style)
    fig, ax = plt.subplots(figsize=figsize)
    try:
        plot_method = getattr(soundscape.sndscape, method_name)
        plot_method(rgb=indices, ax=ax, **plot_kwargs)
        plt.xticks(rotation = 90)
        ax.grid(False)
        plt.tight_layout()
        plt.savefig(file_path)
        plt.show()
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    # save metadata
    save_metadata(product_id, product_type, product_spectrum, sc_config,
                  path, cumulus, node, recorder, deployment, parent=parent)

    print(f"File saved at {file_path}")

In [None]:
# Read X_API_KEY (and any other settings) from a local .env file.
load_dotenv()
DB_CONFIG = {
    'provider': 'alfresco',
    'config': {
        'api_url': 'https://api.conabio.gob.mx',
        'page_size': PAGESIZE,
        'api_key': os.getenv("X_API_KEY"),
        'base_filter': "+TYPE: \"sipecamAudio:audiofileSipecam\"",
        'recording_parser': {"path": "/shared_volume/audio/utils.py",
                             "object_name": "parser_prev"}
    }
}

COL_CONFIG = {
    "col_type": "alfresco",
    "db_config": DB_CONFIG
}

col = collection(**COL_CONFIG)
query = f"(sipecam:CumulusName:\"{CUMULO}\") AND (sipecamAudio:SampleRate:{SAMPLERATE})"
# Pass the query so that only the recordings of the selected cumulus and
# sample rate are fetched (it was previously built but never used).
recs = col.get_recording_dataframe(query, with_metadata = True, with_geometry = False)

# include filtering columns for processing units
recs["node"] = recs.metadata.apply(lambda x: x["entry"]["properties"]["sipecam:NomenclatureNode"])
recs["recorder"] = recs.metadata.apply(lambda x: x["entry"]["properties"]["sipecamAudio:SerialNumber"])
# The deployment is the path component that precedes "/audio" in the entry path.
recs["deployment"] = recs.metadata.apply(lambda x: x["entry"]["path"]["name"].split("/audio")[0].split("/")[-1])
# A processing unit is the (node, recorder, deployment) tuple of each recording.
# zip() also works when the query returns no rows, unlike a row-wise apply.
recs["proc_unit"] = list(zip(recs["node"], recs["recorder"], recs["deployment"]))

In [None]:
# Distinct nodes, recorders and deployments present in the recordings table.
nodes_list = recs["node"].unique()
recorders_list = recs["recorder"].unique()
deployments_list = recs["deployment"].unique()

# Start from a clean results tree: any previous output is deleted first.
# NOTE(review): this removes the whole RESULTS_DIR, not only the sub-folder of
# the current cumulus -- confirm that this is intended.
if os.path.isdir(RESULTS_DIR):
    shutil.rmtree(RESULTS_DIR)

# One folder per cumulus / node / recorder / deployment combination.
create_results_folder_str(
    RESULTS_DIR,
    CUMULO,
    nodes_list,
    recorders_list,
    deployments_list,
)

In [None]:
# Collects run statistics; dumped to execution_info.json at the end of the run.
execution_info = {}
start_time_compute_soundscapes = time.monotonic()

# hasher config
hasher_config = {
    'module': {'object_name': 'yuntu.soundscape.hashers.crono.CronoHasher'},
    'kwargs': {
        'time_utc_column': 'abs_start_time',
        'time_unit': HASHER_TIME_UNIT,
        'time_module': HASHER_TIME_MODULE,
        'start_tzone': "America/Mexico_City",
        'start_time': DEFAULT_HASHER_CONFIG["start_time"],
        'start_format': DEFAULT_HASHER_CONFIG["start_format"],
        'aware_start': None,
    },
}

# soundscape config: start from the pipeline defaults and override the slicing
slice_config  = dict(CronoSoundscape()["slice_config"].data)
slice_config["time_unit"] = TIME_UNIT
slice_config["frequency_bins"] = FREQUENCY_BINS
slice_config["frequency_limits"] = (FREQUENCY_LIMITS_LB, FREQUENCY_LIMITS_UB)

# Indices to compute: the pipeline defaults plus ICOMPLEXITY and TAIL.
indices = CronoSoundscape()["indices"].data + [ICOMPLEXITY()]  + [TAIL()]

# dask local cluster
# Use ~95% of the CPUs, but never fewer than one worker: int(0.95 * 1) is 0,
# which would start a cluster without any worker on a single-CPU host.
n_workers = max(1, int(0.95 * multiprocessing.cpu_count()))
cluster = LocalCluster(n_workers = n_workers, 
                       threads_per_worker = THREADS_PER_WORKER)
client = Client(cluster)
npartitions = len(client.ncores())

# FEED: values handed to the pipeline when the soundscape is computed
FEED = {
    "slice_config": slice_config,
    "indices": indices,
    "hash_name": HASH_NAME,
    "hasher_config": hasher_config,
    "npartitions": npartitions
}

# adjust for metadata: the index objects are replaced by their names
# (presumably because the objects are not JSON-serialisable -- TODO confirm)
indexes_computed = ["TOTAL", "CORE", "TAIL", "INFORMATION", "ICOMPLEXITY", "EXAG"]
FEED_metadata = FEED.copy()
FEED_metadata["indices"] = indexes_computed

plot_indices = [RED_IDX, GREEN_IDX, BLUE_IDX] # rgb order

# soundscape per unit (cumulus-node-recorder-deployment_date)
proc_units = recs.proc_unit.unique()

# Directory where the pipeline leaves its persisted output, and the soundscape
# file inside it that is moved to the results tree after each unit.
pipeline_output_dir = '/shared_volume/audio/soundscape'
soundscape_orig_path = '/shared_volume/audio/soundscape/persist/hashed_soundscape.parquet'

# Units that could not be processed, reported in execution_info.
failed_proc_units = []

try:
    for proc_unit in proc_units:
        node, recorder, deployment = proc_unit
        print(f"* Processing: node {node} | recorder {recorder} | deployment date {deployment}")
        start_soundscape = time.monotonic()
        try:
            file_path = os.path.join(RESULTS_DIR, str(CUMULO), str(node), str(recorder), str(deployment))
            # The parent id is the MD5 hex digest of the unit's results folder.
            parent_id = hashlib.md5(file_path.encode('utf-8')).hexdigest()
            soundscape_data = recs[recs.proc_unit == proc_unit]
            pipeline = CronoSoundscape(name = "soundscape", work_dir = WORK_DIR_PIPELINE, recordings = soundscape_data)
            soundscape = pipeline["hashed_soundscape"].compute(client=client, feed=FEED)

            # sequence, mean and standard deviation plots (with their metadata)
            for product_type in ("sequence", "mean", "standard_deviation"):
                plot_soundscape(soundscape, product_type, SPECTRUM, FEED_metadata, file_path,
                                CUMULO, node, recorder, deployment, parent_id, plot_indices, MIN_FREQ_SC)

            # save soundscape vector
            soundscape_path = os.path.join(file_path, "hashed_soundscape.parquet")
            shutil.move(soundscape_orig_path, soundscape_path)
            save_metadata(parent_id, "soundscape", SPECTRUM, FEED_metadata, file_path,
                          CUMULO, node, recorder, deployment)
            elapsed = timedelta(seconds=time.monotonic() - start_soundscape)
            print(f"  done in {elapsed}")
        except Exception as exc:
            # One failing unit must not stop the run, but it must not go
            # unnoticed either: report it and keep it for execution_info.
            print(f"! Failed: node {node} | recorder {recorder} | deployment date {deployment}: {exc!r}")
            failed_proc_units.append({
                "node": str(node),
                "recorder": str(recorder),
                "deployment": str(deployment),
                "error": repr(exc),
            })
        finally:
            # Always clear the pipeline output, also after a failure
            # (presumably so the next unit cannot pick up stale persisted
            # results -- TODO confirm).
            shutil.rmtree(pipeline_output_dir, ignore_errors=True)
        # restart client
        client.restart()

    # total time (soundscapes)
    execution_info["time_compute_soundscapes"] = str(timedelta(seconds=time.monotonic() - start_time_compute_soundscapes))
    execution_info["failed_proc_units"] = failed_proc_units
finally:
    # Shut the cluster down even when the run is interrupted.
    client.close()
    cluster.close()

# remove empty subdirectories
remove_empty_folders(RESULTS_DIR)
# remove_empty_folders also deletes RESULTS_DIR itself when nothing was
# produced; recreate it so that execution_info.json can still be written.
os.makedirs(RESULTS_DIR, exist_ok=True)

# execution info
# arch info: one "key: value" entry per line of the `lscpu` output
arch_info_dict = {}
try:
    # The last line of the output is dropped, as before.
    arch_info = subprocess.check_output(["lscpu"]).strip().decode().split("\n")[:-1]
except (OSError, subprocess.CalledProcessError) as exc:
    # The architecture report is auxiliary: keep going without it.
    print(f"! Could not run lscpu: {exc!r}")
    arch_info = []
arch_info = [x.replace(" ", "") for x in arch_info]

for field in arch_info:
    # Split on the first ":" only: values may contain further colons.
    key, separator, value = field.partition(":")
    if not separator:
        # Not a "key: value" line.
        continue
    arch_info_dict[key] = value
# Total RAM in whole GiB (bytes shifted right by 30 bits).
arch_info_dict["RAM Memory (GB)"] = psutil.virtual_memory().total >> 30

execution_info["arch_info_dict"] = arch_info_dict

execution_path = os.path.join(RESULTS_DIR, "execution_info.json")

# Mode 'w' truncates an existing file, so no explicit removal is needed.
with open(execution_path, 'w', encoding='utf-8') as f:
    json.dump(execution_info, f, ensure_ascii=False, indent=4)