In [4]:
base_dir = 'project_sonoramente' #@param {type: "string"}
nome_cartella = "test3" #@param {type: "string"}

# Setup


In [5]:
#@title ▶️ Base dir setup
import os, sys
import json
import numpy as np
import matplotlib.pyplot as plt
import cv2
from typing import List
import plotly.graph_objects as go


# check if hosted (Google VM) or running on local server
if 'google.colab' in sys.modules:
  from google.colab import drive
  drive.mount('/content/drive', force_remount=False)
  base_dir  = os.path.join('/content/drive/MyDrive/', base_dir)

# dirs
data_dir = os.path.join(base_dir, 'data')
# move to base_dir
os.chdir(data_dir)
print("Current dir:", os.getcwd())

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Current dir: /content/drive/MyDrive/project_sonoramente/data


In [6]:
#@title Funzioni base
def show_data(data):
  """
  mostra in formato testuale il contenuto dei file
  """
  for chiave, valore in data.items():
    print(chiave, valore)



def get_num_samples(data):
  """
  Restituisce il numero di campioni nel file
  """
  num_samples = -1

  #controlla il numero di campioni
  for id, value in data.items():
    for bodypart, motion_values in value["motion_data"].items():
      if num_samples == -1:
        num_samples = len(motion_values)
      else:
        if num_samples != len(motion_values):
          print("errore nel numero di campioni")
  return num_samples

def get_person_id(label, data):
  labels = []
  for id, value in data.items():
    if "matching" in value:
      matching, _ = value["matching"]
      if label == matching:
        labels.append(id)

  if len(labels) == 1:
    return labels[0]
  else:
    return None

def plot_dict_arrays_base(dictionary, num_samples, samp_freq, y_lim: tuple = (-1, 100), zoom: tuple = None):
  """
  Mostra i dati in un grafico in relazione al tempo
  """
  fig, ax = plt.subplots(figsize=(20, 6))

  time = [(i * 1/samp_freq) for i in range(num_samples)]
  for key, array in dictionary.items():
      array = array
      ax.plot(time, array, label=key)
  ax.set_ylim(y_lim[0], y_lim[1])
  if zoom is not None:
    ax.set_xlim(zoom[0], zoom[1])
  ax.legend()
  ax.set_xlabel('Time (s)')
  plt.show()

def plot_dict_arrays(dictionary, num_samples, samp_freq, title="", y_lim: tuple = (-1, 100)):
    """
    Mostra i dati in un grafico in relazione al tempo
    """
    fig = go.Figure()

    time = [(i * 1 / samp_freq) for i in range(num_samples)]
    for key, array in dictionary.items():
        fig.add_trace(go.Scatter(x=time, y=array, mode='lines', name=key))

    fig.update_layout(
        title=title,
        xaxis_title='Time (s)',
        yaxis_title='Valore',
        yaxis=dict(range=y_lim),
        showlegend=True,
        autosize=True,
        width=1000,
        height=400
    )

    fig.show()

def plot_histogram(labels: List[str], values, max = 40):

  # Crea l'istogramma
  values = np.array(values)
  plt.bar(labels, values)
  plt.ylim(0, max)
  # Mostra l'istogramma
  plt.show()



In [7]:
from matplotlib.cm import ma
#@title Funzioni analisi

def movimento_totale_gruppo(data, filter: List[str] = None):
  """
  Restituisce i valori di movimento medio di un insieme di persone
  :param data: i dati di movimento
  :param filter: la lista dei nomi delle persone da analizzare, se è None vengono restituiti i dati di tutte le persone
  :return: un dizionario che ha come chiavi gli id e come valori un array di valori medi
  """

  result = {}
  for id, value in data.items():
    if "matching" in value:
        label,_ = value["matching"]
    else:
        label = id

    if filter is None or label in filter:
      for bodypart, values in value["motion_data"].items():
        motion_values = [t[0] for t in values]
        if label not in result:
          result[label] = np.array(motion_values)
        else:
          result[label] = np.array(result[label]) + np.array(motion_values)
      result[label] = result[label] / len(value["motion_data"])

  return result


def movimento_parziale_gruppo(data, bodypart: str, filter: List[int] = None):
  """
  Restituisce i valori di movimento di una parte del corpo per un insieme di persone
  :param data: i dati di movimento
  :param bodypart: la parte del corpo da analizzare: (head, left_arm, right_arm, left_leg, right_leg, left_feet)
  :param filter: la lista dei nomi delle persone da analizzare, se è None vengono restituiti i dati di tutte le persone
  """

  result = {}
  for id, value in data.items():
    if "matching" in value:
        label,_ = value["matching"]
    else:
        label = id

    if filter is None or label in filter:
      result[label] = np.array([t[0] for t in value["motion_data"][bodypart]])

  return result


def movimenti_parziali_singolo(data, label, filter: List[str] = None):
  """
  Restituisce i valori di movimento di una lista di parti del corpo per una persona
  :param data: i dati di movimento
  :param label: il nome della persona da analizzare
  :param filter: la lista delle parti del corpo da analizzare: (head, left_arm, right_arm, left_leg, right_leg, left_feet)
  """
  id = get_person_id(label, data)

  if id not in data:
    print("Persona non trovato")
    return

  result = {}
  for bodypart, values in data[id]["motion_data"].items():
    motion_values = [t[0] for t in values]
    if filter is None or bodypart in filter:
      result[bodypart] = np.array(motion_values)

  return result

def confronto_canzoni(label, data_dict: dict, filter: str = "mean"):
  """
  Restituisce una lista di tuple per una persona indicata, contenenti il valore di movimento medio per ogni canzone
  :param label: persona dalla quale si vogliono avere i dati
  :param data_dict: dizionario che ha come chiavi il nome della canzone e come valore i dati da analizzare
  :param filter: la lista delle parti del corpo da analizzare: (mean, head, left_arm, right_arm, left_leg, right_leg, left_feet)
  """
  output_labels = []
  output_values = []
  for name, data in data_dict.items():
    id = get_person_id(label, data)
    if id not in data:
      print("Persona non trovata")
      return

    if id is not None:
      if filter != "mean" and filter != None:
        value = np.mean([t[0] for t in value["motion_data"][filter]])
      else:
        value = np.mean(movimento_totale_gruppo(data, [label])[label])

      output_labels.append(name)
      output_values.append(value)

  return output_labels, output_values


def get_person_bodyparts_rhythm(data, orientation,  label, filter: List[str] = None):
  """
  Restituisce dei valori che rappresentano la direzione di movimento per una lista di parti del corpo in una persona
  :param data: i dati di movimento
  :param label: il nome della persona da analizzare
  :param filter: la lista delle parti del corpo da analizzare: (head, left_arm, right_arm, left_leg, right_leg, left_feet)
  """
  id = get_person_id(label, data)

  if id not in data:
    print("Persona non trovato")
    return

  result = {}
  for bodypart, values in data[id]["motion_data"].items():
    motion_values = np.array([t[0] for t in values])

    if filter is None or bodypart in filter:
      if orientation == "h":
        direction_values = np.array([t[1] for t in values])
      elif orientation == "v":
        direction_values = np.array([t[2] for t in values])
      else:
        return None

      #direction_values = np.where(motion_values < 0, 0, direction_values) #* motion_values
      #direction_values = np.where(motion_values > 100, 100, direction_values)
      result[bodypart] = np.array(direction_values)
  return result


def get_head_centroids(data, id):
    """
    restituisce una lista contenente dei punti che indicano la posizione media di quella persona nel tempo
    """

    #mean_posizion contiene i punti medio della persona da confrontare
    mean_position = []
    num_bodyparts = len(data[id]["motion_data"])

    for i in range(len(data[id]["motion_data"]["head"])):
      sum_x, sum_y = 0, 0
      for bodypart, values in data[id]["motion_data"].items():

        sum_x += values[i][1]
        sum_y += values[i][2]
      mean_position.append((int(sum_x/num_bodyparts), int(sum_y/num_bodyparts)))
    return mean_position

def calcola_avvicinamenti2(data, label, thresh):
  id_t = get_person_id(label, data)

  if id_t not in data:
    print("Persona non trovato")
    return
  #target_positions = get_centroid_list(data, id_t)
  target_positions = [[x, y] for _, x, y in data[id_t]["motion_data"]["head"]]

  #per ogni id indica la distanza minima dalla persona target
  min_distances = {}
  for id, value in data.items():
    if id_t == id:
      continue

    min_distances[id] = []

    #controlla la parte del corpo che si avvicina di più
    for i in range(len(data[id]["motion_data"]["head"])):
      best_distance = thresh
      x1, y1 = target_positions[i]

      for bodypart, values in value["motion_data"].items():
        x2, y2 = values[i][1], values[i][2]


        distance = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
        if distance < best_distance:
          best_distance = distance


      min_distances[id].append(best_distance)

  return min_distances

def calcola_avvicinamenti(data, label, thresh):
  id_t = get_person_id(label, data)

  if id_t not in data:
    print("Persona non trovato")
    return
  #target_positions = get_centroid_list(data, id_t)
  target_positions = [[x, y] for _, x, y in data[id_t]["motion_data"]["head"]]

  #per ogni id indica la distanza minima dalla persona target
  min_distances = {}
  for id, value in data.items():
    if id_t == id:
      continue

    min_distances[id] = []

    #controlla la parte del corpo che si avvicina di più
    for i in range(len(data[id]["motion_data"]["head"])):
      best_distance = thresh
      x1, y1 = target_positions[i]

      x2, y2 = value["motion_data"]["head"][i][1], value["motion_data"]["head"][i][2]


      distance = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
      if distance < best_distance:
        best_distance = distance


      min_distances[id].append(best_distance)

  return min_distances


In [8]:
def read_folder():
  num_images = len(output_folders)
  #fig, axs = plt.subplots(num_images, 1, figsize=(10, 10))
  #axs = axs.flatten()
  data = []

  for file_path in output_folders:
    #file_path = os.path.join(path, "results.json")
    with open(file_path, 'r') as file:
        data.append( json.load(file) )

    print(f"Persone in {file_path}:")
    for id, value in data[len(data)-1].items():
      if "matching" in value:
        label, _ = value["matching"]

  return data

# Analisi


In [9]:
session_dir =  os.path.join(data_dir, nome_cartella)
output_folders = []
for root, _, files in os.walk(session_dir):
  for file in files:
    path = os.path.join(session_dir, file)
    #if os.path.isdir(path):
    output_folders.append(path)
print(output_folders)
data = read_folder()




['/content/drive/MyDrive/project_sonoramente/data/test3/results.json']
Persone in /content/drive/MyDrive/project_sonoramente/data/test3/results.json:


In [10]:
result = movimento_totale_gruppo(data[0], ["persona8"])
num_samples = get_num_samples(data[0])
plot_dict_arrays(result, num_samples, 10, "Valori di movimento totale")


In [None]:
result = movimento_parziale_gruppo(data[0], "right_arm", ["persona4"])
num_samples = get_num_samples(data[0])
plot_dict_arrays(result, num_samples, 10)

In [None]:
result = movimenti_parziali_singolo(data[0], "persona8", ["right_arm","left_arm"])
num_samples = get_num_samples(data[0])
plot_dict_arrays(result, num_samples, 10)

In [None]:
songs = {
    "canzone1" : data[0],
    "canzone2" : data[1]
}
lab, val = confronto_canzoni("persona9", songs)
plot_histogram(lab, val, max = 10)

IndexError: ignored

In [None]:
result = calcola_avvicinamenti(data[0], "persona9", 1000)
num_samples = get_num_samples(data[0])
plot_dict_arrays(result, num_samples, 6, y_lim=(-1, 1000))