In [6]:
import re
import os
import zipfile
from scipy.io import wavfile
import tempfile
import pandas as pd
import noisereduce as nr                   
from pydub import AudioSegment
from pydub import AudioSegment
import numpy as np
import zipfile

**Final**

In [4]:
class AudioProcessor:

    def __init__(self, input_directory='../Data', output_directory='../Results/Final/noise_kept_all/'):
        """
        Initialize the AudioProcessor class.

        Args:
            input_directory (str): Directory containing the ZIP folders.
            output_directory (str): Output directory for saving denoised audio files.
        """
        self.input_directory = input_directory
        self.output_directory = output_directory
        os.makedirs(self.output_directory, exist_ok=True)

        self.output_audio_directory = output_directory + 'audio/'
        self.output_csv_directory = output_directory + 'csv/'

        os.makedirs(self.output_audio_directory, exist_ok=True)
        os.makedirs(self.output_csv_directory, exist_ok=True)
    
    def only_participant(self, file_path, transcript_path, file_number, return_silence_csv=True):
        audio  = AudioSegment.from_wav(file_path)
        rate, _ = wavfile.read(file_path)
        df = pd.read_csv(transcript_path, delimiter='\t')
        # Filter the participant's intervals
        participant_intervals = df[df["speaker"] == "Participant"]

        # Create an audio segment with the participant's intervals
        participant_audio = AudioSegment.silent(duration=0)  # Initialize an empty segment

        for _, row in participant_intervals.iterrows():
            start_time = int(row["start_time"] * 1000)  # Convert to milliseconds
            stop_time = int(row["stop_time"] * 1000)
            participant_audio += audio[start_time:stop_time]

        if return_silence_csv:
            silence_data = []
            # Iterate through each row in the DataFrame except the last one
            for i in range(len(df) - 1):
                if df['speaker'][i] == 'Ellie' and df['speaker'][i+1] == 'Participant':
                    silence = {
                        'start_time': df['stop_time'][i],
                        'end_time': df['start_time'][i+1],
                        'duration': df['start_time'][i+1] - df['stop_time'][i],
                        'to_respond': 1   # between Ellie and Participant
                    }
                    silence_data.append(silence)
                elif df['speaker'][i] == 'Participant' and df['speaker'][i+1] == 'Participant':
                    silence = {
                        'start_time': df['stop_time'][i],
                        'end_time': df['start_time'][i+1],
                        'duration': df['start_time'][i+1] - df['stop_time'][i],
                        'to_respond': 0   # between Participant and Participant
                    }
                    silence_data.append(silence)

            # Create DataFrame from the list of dictionaries
            df_silence = pd.DataFrame(silence_data)

            # Optionally, sort by start_time
            df_silence = df_silence.sort_values(by='start_time').reset_index(drop=True)

            # Save the DataFrame to a CSV file
            csv_path = os.path.join(self.output_csv_directory, f"{file_number}_silence.csv")       
            df_silence.to_csv(csv_path, index=False)

        return participant_audio, rate

    def remove_noise(self, data, rate):
        data = np.array(data.get_array_of_samples())
        if len(data.shape) > 1:
            data = data[:, 0]
        reduced_noise = nr.reduce_noise(y=data, sr=rate)
        return reduced_noise
    
    def save_audio(self, audio, file_number):
        # Create the path for the audio file
        audio_path = os.path.join(self.output_audio_directory, f"{file_number}.wav")
        audio.export(audio_path, format="wav")
        print(f"Audio saved as {file_number}.wav")
        print('---' * 30)
        return None
    

    def process_audio(self, remove_intro=True, remove_noise=False):
        for zip_file in os.listdir(self.input_directory):
            if zip_file.endswith('.zip'):
                zip_file_path = os.path.join(self.input_directory, zip_file)

                with tempfile.TemporaryDirectory() as temp_dir:
                    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                        zip_ref.extractall(temp_dir)

                    for root, dirs, files in os.walk(temp_dir):
                        for filename in files:
                            # Extract the number from the file name
                               
                            if filename.endswith('.wav'):
                                file_number = re.search(r'\d+', filename).group() if re.search(r'\d+', filename) else None
                                file_path = os.path.join(root, filename)
         
                                if remove_intro:
                                    for root, dirs, files in os.walk(temp_dir):
                                        # Find TRANSCRIPT.csv and set time_interview_starts if found
                                        for filename in files:
                                            if filename.endswith('TRANSCRIPT.csv'):
                                                transcript_path = os.path.join(root, filename)
                                                break  # Stop searching for TRANSCRIPT.csv once found

                                    data, rate = self.only_participant(file_path, transcript_path, file_number, return_silence_csv=True)

                                if remove_noise:
                                    data = self.remove_noise(data, rate)
                                else:
                                    data = np.array(data.get_array_of_samples())
                                    if len(data.shape) > 1:
                                        data = data[:, 0]
                                audio = AudioSegment(data.tobytes(), frame_rate=rate, sample_width=data.dtype.itemsize, channels=1)
                                self.save_audio(audio, file_number)

**Transcript and AUs files extraction**

In [5]:
def extract_files(zip_dir, output_dir_transcript, output_dir_clnf_aus, extensions):
    """
    Extracts files with specified extensions from zip files in a directory to specified output directories.
    
    Parameters:
    zip_dir (str): Directory containing zip files.
    output_dir_transcript (str): Directory to save the extracted TRANSCRIPT.csv files.
    output_dir_clnf_aus (str): Directory to save the extracted CLNF_AUs.txt files.
    extensions (dict): Dictionary with extensions as keys and output directories as values.
    """
    for output_dir in extensions.values():
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

    for item in os.listdir(zip_dir):
        if item.endswith('.zip'):
            zip_path = os.path.join(zip_dir, item)
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                for file_name in zip_ref.namelist():
                    for ext, out_dir in extensions.items():
                        if file_name.endswith(ext):
                            zip_ref.extract(file_name, out_dir)
                            print(f"Extracted {file_name} to {out_dir}")

if __name__ == "__main__":
    zip_directory = "../Data"
    output_directory_transcript = "../Results/Final/Transcripts"
    output_directory_clnf_aus = "../Results/Final/AUs"
    file_extensions = {
        "TRANSCRIPT.csv": output_directory_transcript,
        "CLNF_AUs.txt": output_directory_clnf_aus
    }

    extract_files(zip_directory, output_directory_transcript, output_directory_clnf_aus, file_extensions)


Extracted 300_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 300_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 301_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 301_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 302_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 302_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 303_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 303_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 304_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 304_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 305_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 305_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 306_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 306_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 307_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 307_TRANSCRIPT.csv to ../Results/Final/Transcripts
Extracted 308_CLNF_AUs.txt to ../Results/Final/AUs
Extracted 308_TRANSCRIPT.csv to ../Results/Final/Tran

In [None]:
def cargar_audio(file_path):
        rate, data = wavfile.read(file_path)
        return rate, data

_ ,audio_data = cargar_audio('../Data/300_P/300_AUDIO.wav')

audio_file = '../Data/300_P/300_AUDIO.wav'
sound = AudioSegment.from_wav(audio_file)

# Comparar si sound y audio_data son iguales
print(sound == audio_data)

False


In [None]:
# Read the transcript file
df = pd.read_csv('../Data/300_P/300_TRANSCRIPT.csv',delimiter='\t')

# Filter the participant's intervals
participant_intervals = df[df["speaker"] == "Participant"]

# Load the audio file
audio_file = '../Data/300_P/300_AUDIO.wav'
sound = AudioSegment.from_wav(audio_file)

# Create an audio segment with the participant's intervals
participant_audio = AudioSegment.silent(duration=0)  # Initialize an empty segment

for _, row in participant_intervals.iterrows():
    start_time = int(row["start_time"] * 1000)  # Convert to milliseconds
    stop_time = int(row["stop_time"] * 1000)
    participant_audio += sound[start_time:stop_time]

# Save the participant's audio as a WAV file
participant_audio.export("participant_audio.wav", format="wav")

<_io.BufferedRandom name='participant_audio.wav'>

In [None]:
# Read the CSV file
df = pd.read_csv('../Data/300_P/300_TRANSCRIPT.csv', delimiter="\t")

In [None]:
# Initialize an empty list to store dictionaries representing periods of silence
silence_data = []

# Iterate through each row in the DataFrame except the last one
for i in range(len(df) - 1):
    if df['speaker'][i] == 'Ellie' and df['speaker'][i+1] == 'Participant':
        silence = {
            'start_time': df['stop_time'][i],
            'end_time': df['start_time'][i+1],
            'duration': df['start_time'][i+1] - df['stop_time'][i],
            'to_respond': 1   # between Ellie and Participant
        }
        silence_data.append(silence)
    elif df['speaker'][i] == 'Participant' and df['speaker'][i+1] == 'Participant':
        silence = {
            'start_time': df['stop_time'][i],
            'end_time': df['start_time'][i+1],
            'duration': df['start_time'][i+1] - df['stop_time'][i],
            'to_respond': 0   # between Participant and Participant
        }
        silence_data.append(silence)

# Create DataFrame from the list of dictionaries
df_silence = pd.DataFrame(silence_data)


# Optionally, sort by start_time
df_silence = df_silence.sort_values(by='start_time').reset_index(drop=True)

# df_silence now contains the periods of silence as a DataFrame
print(df_silence)

    start_time  end_time  duration  to_respond
0       61.378    62.328      0.95           1
1       67.528    68.978      1.45           1
2       74.198    75.028      0.83           1
3       83.578    83.808      0.23           1
4       87.898    88.458      0.56           1
..         ...       ...       ...         ...
82     590.918   598.238      7.32           1
83     599.708   600.608      0.90           0
84     602.688   602.738      0.05           1
85     616.878   618.308      1.43           1
86     620.348   620.538      0.19           1

[87 rows x 4 columns]
