In [2]:
import moviepy.editor as mp
from moviepy.video.io.VideoFileClip import VideoFileClip
import speech_recognition as sr
import string
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from nltk.tokenize import word_tokenize
import json
import os
import math
from google.cloud import speech
import pandas as pd
from openpyxl import load_workbook

In [26]:
# Shared identifiers so that every cell below addresses the same recording:
# all per-recording files are named "<name_file>_<n_file>.<extension>".
name_file, n_file = "Caco", 8

In [None]:
# Extract the audio track of the recording's video into a WAV file
# (kept from when the input was a video instead of an audio file).

video_path = os.path.join(".", "videos", "{}_{}.mp4".format(name_file, n_file))
audio_path = os.path.join(".", "audios", "{}_{}.wav".format(name_file, n_file))

video = mp.VideoFileClip(video_path)
try:
    if video.audio is None:
        # Fail with an explicit message instead of an AttributeError on None
        raise ValueError("El vídeo {} no contiene pista de audio".format(video_path))
    # Create the destination folder only once the input video has been opened
    os.makedirs(os.path.dirname(audio_path), exist_ok=True)
    # The return value of write_audiofile() is not needed, so it is not stored
    video.audio.write_audiofile(audio_path)
finally:
    # Always release the resources held by the clip, even if the export fails
    video.close()

In [42]:
# GOOGLE CLOUD RECOGNIZER - SPEECH TO TEXT API
# Transcribes the recording's WAV file and stores every kept word together
# with its start time in two identical JSON files (raw copy + working copy).

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'key.json'
speech_client = speech.SpeechClient()

audio_filename = os.path.join(".", "audios", "{}_{}.wav".format(name_file, n_file))
with open(audio_filename, 'rb') as f:
    byte_data_wav = f.read()

audio_wav = speech.RecognitionAudio(content=byte_data_wav)
config_wav = speech.RecognitionConfig(
    sample_rate_hertz = 44100, # sample rate (Hz) of the WAV files
    language_code = 'es-ES', # language code for Spanish (Spain)
    enable_word_time_offsets = True, # request the timestamp of every word
    audio_channel_count = 2
)

# Synchronous recognition. Per the original author's note this is meant for
# audio shorter than 1 minute and smaller than 10 MB.
response_wav = speech_client.recognize(
    config = config_wav,
    audio = audio_wav
)

# Alternative for audio above those limits (disabled; the file must already be
# uploaded to the bucket referenced by media_uri):
#
# media_uri = 'gs://tfg-cdr/Javi_5.wav'
# long_audio_wav = speech.RecognitionAudio(uri=media_uri)
# long_config_wav = speech.RecognitionConfig(
#     sample_rate_hertz = 44100,
#     language_code = 'es-ES',
#     enable_word_time_offsets = True,
#     audio_channel_count = 2
# )
#
# long_response_wav = speech_client.long_running_recognize(
#     config = long_config_wav,
#     audio = long_audio_wav
# )
#
# response_wav = long_response_wav.result(timeout=90)

# Output files: the raw transcription and the copy that later cells process
json_filename = os.path.join(".", "t_raw", "{}_{}.json".format(name_file, n_file))
json_filename_rev = os.path.join(".", "t_processed", "{}_{}.json".format(name_file, n_file))
for out_path in (json_filename, json_filename_rev):
    os.makedirs(os.path.dirname(out_path), exist_ok=True)

# Spanish stopword list
sw = set(stopwords.words("spanish"))

# Walk the transcription, using the first alternative of every result
data = []
for result in response_wav.results:
    alternative = result.alternatives[0]
    for word_info in alternative.words:
        word = word_info.word.lower()
        # Drop stopwords, except "no", which is part of some instructions
        if word in sw and word != 'no':
            continue
        # Small record holding the word and the second at which it starts
        data.append({
            'word': word,
            'start': word_info.start_time.total_seconds()
        })

# Persist the same content to both files so the raw one can be reviewed later
for out_path in (json_filename, json_filename_rev):
    with open(out_path, "w", encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False)

print("Se han generado correctamente los siguientes archivos:\n{}\n{}".format(json_filename, json_filename_rev))

In [27]:
# Stemming of the words transcribed by the GOOGLE CLOUD SPEECH TO TEXT API.
# Reads the list of {'word', 'start'} records and writes a "_rev" file where
# each record also carries the Spanish Snowball stem of the word.

json_filename = os.path.join(".", "t_processed", "{}_{}.json".format(name_file, n_file))
with open(json_filename, 'r', encoding='utf-8') as f:
    data = json.load(f)

json_filename_rev = os.path.join(".", "t_processed", "{}_{}_rev.json".format(name_file, n_file))

# Build the stemmer once instead of once per word
stemmer = SnowballStemmer('spanish')

data_rev = [
    {
        'word': t['word'],
        'stem': stemmer.stem(t['word']),
        'start': t['start']
    }
    for t in data
]

with open(json_filename_rev, "w", encoding='utf-8') as f:
    json.dump(data_rev, f, ensure_ascii=False)

print("Se ha procesado por medio de stemming el archivo {}_{}.json y se ha generado el siguiente archivo:\n{}".format(name_file, n_file, json_filename_rev))

In [28]:
# Tag the stemmed transcription and generate the complete think-aloud (PVA) JSON.
# Each key of `instrucciones` is a sequence of 1 to 3 stems (space separated)
# and each value is the tag assigned to that instruction.

instrucciones = {
    'aceler': 'T-ON',
    'acerc rotond': 'APROX',
    'rotond medi': 'RND-MD',
    'manteng aceler': 'T-HOLD',
    'rotond cerc': 'RND-NR',
    'izquierd libr': 'L-FREE',
    'izquierd ocup': 'L-BUSY',
    'fren ced': 'B-ON',
    'fren stop': 'B-ON',
    'fren': 'B-ON',
    'stop': 'B-ON',
    'ced': 'RND-IN',
    'entro rotond': 'RND-IN',
    'gir derech': 'TURN-R',
    'intermitent izquierd': 'LB-ON',
    'pong intermitent izquierd': 'LB-ON',
    'intermitent izquierd rotond': 'LB-ON',
    'rotond gir izquierd': 'TURN-L',
    'rotond gir derech': 'TURN-R',
    'gir izquierd': 'TURN-L',
    'quit intermitent': 'BLK-OFF',
    'intermitent derech': 'RB-ON',
    'pong intermitent derech': 'RB-ON',
    'intermitent derech rotond': 'RB-ON',
    'salg rotond': 'RND-EXIT',
    'aproxim rotond': 'APROX',
    'levant aceler': 'T-OFF',
    'levant pie': 'T-OFF',
    'levant pie aceler': 'T-OFF',
    'no vien nadi': 'RND-CLEAR',
    'gir izquierd rotond': 'TURN-L',
    'gir derech rotond': 'TURN-R',
    'atencion ciclist': 'CAR-NR',
    'atencion coch': 'CAR-NR',
    'centr libr': 'F-FREE',
    'centr ocup': 'F-BUSY',
    'enderez': 'TURN-STR',
    'sig rect': 'STR',
    'rect': 'STR',
    'primer sal': 'EXIT-1',
    'segund sal': 'EXIT-2',
    'tercer sal': 'EXIT-3',
    'cuart sal': 'EXIT-4'
}

# Use this json_filename if the basic recognizer produced the transcription
#json_filename = ".\\t_processed\\{}_{}.json".format(name_file, n_file)

# Use this json_filename if the GOOGLE CLOUD SPEECH TO TEXT API produced it
json_filename = os.path.join(".", "t_processed", "{}_{}_rev.json".format(name_file, n_file))

with open(json_filename, "r", encoding='utf-8') as f:
    data = json.load(f)

dataset = {}
count = 0

# Greedy longest-match scan: at every position try 3 stems, then 2, then 1.
# A match consumes its words, so they cannot start another instruction.
# The position is advanced explicitly instead of deleting items from `data`
# while iterating over it.
i = 0
while i < len(data):
    for n in (3, 2, 1):
        window = data[i:i + n]
        if len(window) < n:
            # Not enough words left for an n-gram of this size
            continue
        sentence_t = ' '.join(t['stem'] for t in window)
        if sentence_t in instrucciones:
            sentence = ' '.join(t['word'] for t in window)
            # Matched word records followed by one record with tag and sentence
            dataset["inst_{}".format(count)] = window + [{"tag": instrucciones[sentence_t], "sentence": sentence}]
            count += 1
            i += n
            break
    else:
        # No instruction starts at this word
        i += 1

json_out = os.path.join(".", "t_tags", "{}_{}.json".format(name_file, n_file))
os.makedirs(os.path.dirname(json_out), exist_ok=True)
with open(json_out, "w", encoding='utf-8') as f:
    json.dump(dataset, f, ensure_ascii=False)

print("Se ha etiquetado correctamente el archivo y se ha generado el siguiente fichero:\n{}".format(json_out))

In [29]:
# Merge the think-aloud (PVA) JSON with the Carla JSON of the same recording.
# Every instruction gets, under the 'carla' key of its last record, a copy of
# the simulator tick during which its first word starts.

pva_json_filename = os.path.join(".", "t_tags", "{}_{}.json".format(name_file, n_file))
with open(pva_json_filename, 'r', encoding='utf-8') as f:
    pva_data = json.load(f)

carla_json_filename = os.path.join(".", "dataset", "parse", "{}_{}".format(name_file, n_file), "SIM_DATA.json")
with open(carla_json_filename, 'r', encoding='utf-8') as f:
    carla_data = json.load(f)

# Instructions for which no pair of consecutive ticks brackets the start time
unmatched = []

for inst in pva_data:
    start = pva_data[inst][0]['start']
    last = pva_data[inst][-1]
    # Look for the first tick with tick.timestamp <= start < next_tick.timestamp
    for tick, tick_next in zip(carla_data, carla_data[1:]):
        if tick['timestamp'] <= start < tick_next['timestamp']:
            carla_dic = dict(tick)
            # Copy the nested 'control' dict too: it is edited below and must
            # not alter carla_data, otherwise another instruction mapped to the
            # same tick would read the already converted value.
            carla_dic['control'] = dict(tick['control'])
            light_state = carla_dic['control']['light_state']
            # NOTE(review): exact comparison; a value combining several lights
            # would end up as "NONE" - confirm this is intended.
            light_state = "Left_Blinker" if light_state == 32 else "Right_Blinker" if light_state == 16 else "NONE"
            carla_dic['control']['light_state'] = light_state
            del carla_dic['timestamp']
            last['carla'] = carla_dic
            break
    else:
        unmatched.append(inst)

if unmatched:
    # These instructions are written without a 'carla' key
    print("Aviso: no se ha encontrado ningún tick de Carla para las siguientes instrucciones: {}".format(", ".join(unmatched)))

final_json_filename = os.path.join(".", "t_merged", "{}_{}.json".format(name_file, n_file))
os.makedirs(os.path.dirname(final_json_filename), exist_ok=True)
with open(final_json_filename, 'w', encoding='utf-8') as f:
    json.dump(pva_data, f, ensure_ascii=False)

print("Se han fusionado correctamente los datos de verbalización con lo datos de Carla en el siguiente archivo:\n{}".format(final_json_filename))

In [7]:
# Export the ROUNDABOUT APPROACH section of every merged JSON to Excel.
# - train workbook: all files appended to the single sheet 'Aprox_Rotonda'
#   (re-running the cell appends the same rows again).
# - test workbook: one sheet per JSON file, named after the file.

dir_path = os.path.join(".", "t_merged")
# Only JSON files, processed in a stable order
file_list = sorted(name for name in os.listdir(dir_path) if name.lower().endswith(".json"))

excel_dir = os.path.join(".", "dataset_excel")
os.makedirs(excel_dir, exist_ok=True)
excel_path_train = os.path.join(excel_dir, "aprox_rotonda_train.xlsx")
excel_path_test = os.path.join(excel_dir, "aprox_rotonda_test.xlsx")
sheet_name = 'Aprox_Rotonda'

# Sentences announcing the exit to take and the exit number they stand for
salida_por_frase = {
    'primera salida': 1,
    'segunda salida': 2,
    'tercera salida': 3,
    'cuarta salida': 4
}

# Column order of the generated sheets
columns = [
    'instruction', 'tag', 'salida', 'ceda_md', 'ceda_nr', 'ceda_in',
    'zi_ocupada', 'zc_ocupada', 'metros_ceda', 'throttle', 'steer', 'brake',
    'light_state', 'hand_brake', 'reverse', 'speed', 'loc_x', 'loc_y', 'loc_z'
]

# Reference point for 'metros_ceda'
# NOTE(review): presumably the position of the yield line - confirm.
ceda_x, ceda_y = -6.22, -28.37

for file in file_list:
    json_path = os.path.join(dir_path, file)
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # State flags carried from one instruction to the next
    ceda_md = 0
    ceda_nr = 0
    ceda_in = 0

    zi = 0
    zc = 0

    # Exit announced in the recording (3 when none is announced; last one wins)
    salida = 3
    for d in data:
        sentence = data[d][-1]['sentence']
        if sentence in salida_por_frase:
            salida = salida_por_frase[sentence]

    index_list = []
    data_list = []

    for d in data:
        inst = data[d][-1]
        sentence = inst['sentence']
        carla = inst['carla']
        control = carla['control']
        loc = carla['location']
        # Planar distance from the vehicle to the reference point
        dist = round(math.sqrt((ceda_x - loc['x'])**2 + (ceda_y - loc['y'])**2), 2)

        # Exit announcements only set 'salida'; they produce no row
        if sentence in salida_por_frase:
            continue

        if sentence == 'izquierda ocupada':
            zi = 1
        elif sentence == 'centro ocupado':
            zc = 1
        elif sentence in ('izquierda libre', 'centro libre'):
            zi = 0
            zc = 0

        # 'rotonda media' / 'rotonda cerca' only update the flags
        if sentence == 'rotonda media':
            ceda_md = 1
            continue
        if sentence == 'rotonda cerca':
            ceda_nr = 1
            ceda_md = 0
            continue
        if sentence in ('izquierda libre', 'izquierda ocupada'):
            ceda_in = 1
            ceda_nr = 0
        elif sentence == 'entro rotonda':
            ceda_in = 0

        entry = {
            'instruction': sentence,
            'tag': inst['tag'],
            'salida': salida,
            'ceda_md': ceda_md,
            'ceda_nr': ceda_nr,
            'ceda_in': ceda_in,
            'zi_ocupada': zi,
            'zc_ocupada': zc,
            'metros_ceda': dist,
            'throttle': control['throttle'],
            'steer': control['steer'],
            'brake': control['brake'],
            'light_state': control['light_state'],
            'hand_brake': control['hand_brake'],
            'reverse': control['reverse'],
            'speed': carla['speed (km/h)'],
            'loc_x': loc['x'],
            'loc_y': loc['y'],
            'loc_z': loc['z']
        }
        index_list.append(d)
        data_list.append(entry)

        # Entering the roundabout is the last row of this section
        if sentence == 'entro rotonda':
            break

    # Fixed columns so that the header is written even when there are no rows
    df = pd.DataFrame(data=data_list, index=index_list, columns=columns)
    # Strip only the extension to obtain the sheet name
    sheet_name_test = os.path.splitext(file)[0]

    # Train workbook: append below the existing rows with openpyxl, or create
    # the workbook (with header) the first time.
    try:
        book = load_workbook(excel_path_train)
    except FileNotFoundError:
        with pd.ExcelWriter(excel_path_train, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)
    else:
        ws = book[sheet_name]
        for entry in data_list:
            ws.append([entry[col] for col in columns])
        book.save(excel_path_train)

    # Test workbook: one sheet per file.
    # NOTE(review): what happens when the sheet already exists depends on the
    # installed pandas version - confirm before re-running on an old workbook.
    test_mode = 'a' if os.path.exists(excel_path_test) else 'w'
    with pd.ExcelWriter(excel_path_test, engine='openpyxl', mode=test_mode) as writer:
        df.to_excel(writer, sheet_name=sheet_name_test, index=False)

print("Se han generado correctamente los archivos aprox_rotonda_train.xlsx y aprox_rotonda_test.xlsx")

In [34]:
# Export the INSIDE THE ROUNDABOUT section of every merged JSON to Excel.
# - train workbook: all files appended to the single sheet 'Dentro_Rotonda'
#   (re-running the cell appends the same rows again).
# - test workbook: one sheet per JSON file, named after the file.

dir_path = os.path.join(".", "t_merged")
# Only JSON files, processed in a stable order
file_list = sorted(name for name in os.listdir(dir_path) if name.lower().endswith(".json"))

excel_dir = os.path.join(".", "dataset_excel")
os.makedirs(excel_dir, exist_ok=True)
excel_path_train = os.path.join(excel_dir, "dentro_rotonda_train.xlsx")
excel_path_test = os.path.join(excel_dir, "dentro_rotonda_test.xlsx")
sheet_name = 'Dentro_Rotonda'

# Sentences announcing the exit to take and the exit number they stand for
salida_por_frase = {
    'primera salida': 1,
    'segunda salida': 2,
    'tercera salida': 3,
    'cuarta salida': 4
}

# Column order of the generated sheets
columns = [
    'instruction', 'tag', 'salida', 'coche_cerca', 'throttle', 'steer', 'brake',
    'light_state', 'hand_brake', 'reverse', 'speed', 'loc_x', 'loc_y', 'loc_z'
]

for file in file_list:
    json_path = os.path.join(dir_path, file)
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    index_list = []
    data_list = []
    get_data = False  # True between 'entro rotonda' and 'salgo rotonda'
    car_nr = 0        # 1 after a 'CAR-NR' tag, reset to 0 by 'freno'

    # Exit announced in the recording (3 when none is announced; last one wins)
    salida = 3
    for d in data:
        sentence = data[d][-1]['sentence']
        if sentence in salida_por_frase:
            salida = salida_por_frase[sentence]

    for d in data:
        inst = data[d][-1]
        sentence = inst['sentence']
        carla = inst['carla']
        control = carla['control']
        loc = carla['location']

        if get_data:
            # Exit announcements only set 'salida'; they produce no row
            if sentence in salida_por_frase:
                continue

            if inst['tag'] == 'CAR-NR':
                car_nr = 1

            entry = {
                'instruction': sentence,
                'tag': inst['tag'],
                'salida': salida,
                'coche_cerca': car_nr,
                'throttle': control['throttle'],
                'steer': control['steer'],
                'brake': control['brake'],
                'light_state': control['light_state'],
                'hand_brake': control['hand_brake'],
                'reverse': control['reverse'],
                'speed': carla['speed (km/h)'],
                'loc_x': loc['x'],
                'loc_y': loc['y'],
                'loc_z': loc['z']
            }
            index_list.append(d)
            data_list.append(entry)

        # The window is updated after the row is stored, so 'entro rotonda' is
        # left out of the section while 'salgo rotonda' is its last row.
        if sentence == "entro rotonda":
            get_data = True
        elif sentence == 'salgo rotonda':
            get_data = False

        if sentence == 'freno':
            car_nr = 0

    # Fixed columns so that the header is written even when there are no rows
    df = pd.DataFrame(data=data_list, index=index_list, columns=columns)
    # Strip only the extension to obtain the sheet name
    sheet_name_test = os.path.splitext(file)[0]

    # Train workbook: append below the existing rows with openpyxl, or create
    # the workbook (with header) the first time.
    try:
        book = load_workbook(excel_path_train)
    except FileNotFoundError:
        with pd.ExcelWriter(excel_path_train, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)
    else:
        ws = book[sheet_name]
        for entry in data_list:
            ws.append([entry[col] for col in columns])
        book.save(excel_path_train)

    # Test workbook: one sheet per file.
    # NOTE(review): what happens when the sheet already exists depends on the
    # installed pandas version - confirm before re-running on an old workbook.
    test_mode = 'a' if os.path.exists(excel_path_test) else 'w'
    with pd.ExcelWriter(excel_path_test, engine='openpyxl', mode=test_mode) as writer:
        df.to_excel(writer, sheet_name=sheet_name_test, index=False)

print("Se han generado correctamente los archivos dentro_rotonda_train.xlsx y dentro_rotonda_test.xlsx")

In [4]:
# Export the ROUNDABOUT EXIT section of every merged JSON to Excel.
# - train workbook: all files appended to the single sheet 'Salida_Rotonda'
#   (re-running the cell appends the same rows again).
# - test workbook: one sheet per JSON file, named after the file.

dir_path = os.path.join(".", "t_merged")
# Only JSON files, processed in a stable order
file_list = sorted(name for name in os.listdir(dir_path) if name.lower().endswith(".json"))

excel_dir = os.path.join(".", "dataset_excel")
os.makedirs(excel_dir, exist_ok=True)
excel_path_train = os.path.join(excel_dir, "salida_rotonda_train.xlsx")
excel_path_test = os.path.join(excel_dir, "salida_rotonda_test.xlsx")
sheet_name = 'Salida_Rotonda'

# Sentences announcing the exit to take and the exit number they stand for
salida_por_frase = {
    'primera salida': 1,
    'segunda salida': 2,
    'tercera salida': 3,
    'cuarta salida': 4
}

# Column order of the generated sheets
columns = [
    'instruction', 'tag', 'salida', 'coche_cerca', 'throttle', 'steer', 'brake',
    'light_state', 'hand_brake', 'reverse', 'speed', 'loc_x', 'loc_y', 'loc_z'
]

for file in file_list:
    json_path = os.path.join(dir_path, file)
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    index_list = []
    data_list = []
    get_data = False  # becomes True after 'salgo rotonda' and stays True
    car_nr = 0        # 1 after a 'CAR-NR' tag, reset to 0 by 'freno'

    # Exit announced in the recording (3 when none is announced; last one wins)
    salida = 3
    for d in data:
        sentence = data[d][-1]['sentence']
        if sentence in salida_por_frase:
            salida = salida_por_frase[sentence]

    for d in data:
        inst = data[d][-1]
        sentence = inst['sentence']
        carla = inst['carla']
        control = carla['control']
        loc = carla['location']

        if get_data:
            # Exit announcements only set 'salida'; they produce no row
            if sentence in salida_por_frase:
                continue

            if inst['tag'] == 'CAR-NR':
                car_nr = 1

            entry = {
                'instruction': sentence,
                'tag': inst['tag'],
                'salida': salida,
                'coche_cerca': car_nr,
                'throttle': control['throttle'],
                'steer': control['steer'],
                'brake': control['brake'],
                'light_state': control['light_state'],
                'hand_brake': control['hand_brake'],
                'reverse': control['reverse'],
                'speed': carla['speed (km/h)'],
                'loc_x': loc['x'],
                'loc_y': loc['y'],
                'loc_z': loc['z']
            }
            index_list.append(d)
            data_list.append(entry)

        if sentence == 'freno':
            car_nr = 0
        # Checked after the row is stored, so 'salgo rotonda' itself is not
        # part of the section.
        if sentence == "salgo rotonda":
            get_data = True

    # Fixed columns so that the header is written even when there are no rows
    df = pd.DataFrame(data=data_list, index=index_list, columns=columns)
    # Strip only the extension to obtain the sheet name
    sheet_name_test = os.path.splitext(file)[0]

    # Train workbook: append below the existing rows with openpyxl, or create
    # the workbook (with header) the first time.
    try:
        book = load_workbook(excel_path_train)
    except FileNotFoundError:
        with pd.ExcelWriter(excel_path_train, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)
    else:
        ws = book[sheet_name]
        for entry in data_list:
            ws.append([entry[col] for col in columns])
        book.save(excel_path_train)

    # Test workbook: one sheet per file.
    # NOTE(review): what happens when the sheet already exists depends on the
    # installed pandas version - confirm before re-running on an old workbook.
    test_mode = 'a' if os.path.exists(excel_path_test) else 'w'
    with pd.ExcelWriter(excel_path_test, engine='openpyxl', mode=test_mode) as writer:
        df.to_excel(writer, sheet_name=sheet_name_test, index=False)

print("Se han generado correctamente los archivos salida_rotonda_train.xlsx y salida_rotonda_test.xlsx")