#**Automatic Audio Recognition**
This script includes 3 different models which analyze different parts of audio in super bowl ads.
1. *Gender specific speaking time* (and durations of speaking parts)
2. *Emotion recognition from Transcription* (uses only transcription from WhisperAI for analysis)
3. *Acoustic Indizes* (many different indicators like min/max_energy, db and tempo)



# *1. Gender specific speaking time*

Initially, the audio file will be segmented before gender recognition can take place

Audio segmentation: https://github.com/pyannote/pyannote-audio

Gender recognition: https://github.com/x4nth055/gender-recognition-by-voice

### Installation

In [256]:
from pydub import AudioSegment
from tqdm import tqdm
from pyannote.audio import Pipeline
import torch
import pyaudio
import os
import wave
import librosa
import numpy as np
import pandas as pd
import tqdm
import locale
import tensorflow as tf
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # Force CPU usage


# sed -i 's/np\.NaN/np\.nan/g' ~/Development/Commercial-Brand-Differentiating-Message-Analysis/venv/lib/python3.12/site-packages/pyannote/audio/core/inference.py
def getpreferredencoding(do_setlocale = True):
    return "UTF-8"
locale.getpreferredencoding = getpreferredencoding

In [257]:
# #Optional
# from google.colab import drive
# drive.mount('/content/drive')

### Mandatory Login

To load the speaker diarization pipeline,

* accept the user conditions on [hf.co/pyannote/speaker-diarization-3.1](https://hf.co/pyannote/speaker-diarization-3.1)
* accept the user conditions on [hf.co/pyannote/segmentation-3.0](https://hf.co/pyannote/segmentation-3.0)
* login using `notebook_login` below

In [258]:
# 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

In [259]:
import os
from dotenv import load_dotenv
from huggingface_hub import login

# Load environment variables from the .env file
load_dotenv()

# Retrieve the Hugging Face API token from the environment
hf_token = os.getenv("HF_API_KEY")
# Log in to Hugging Face
login(token=hf_token)

## Audio Splitter Method Definition



### Utils


In [260]:
from sys import byteorder
from array import array
from struct import pack
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout
from sklearn.model_selection import train_test_split


label2int = {
    "male": 1,
    "female": 0
}


def load_data(vector_length=128):
    """A function to load gender recognition dataset from `data` folder
    After the second run, this will load from results/features.npy and results/labels.npy files
    as it is much faster!"""
    # make sure results folder exists
    if not os.path.isdir("results"):
        os.mkdir("results")
    # if features & labels already loaded individually and bundled, load them from there instead
    if os.path.isfile(os.path.join(os.getenv("TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR"), "results/features.npy")) and os.path.isfile("results/labels.npy"):
        X = np.load(os.path.join(os.getenv("TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR"), "results/features.npy"))
        y = np.load(os.path.join(os.getenv("TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR"), "results/labels.npy"))
        return X, y
    # read dataframe
    df = pd.read_csv(os.path.join(os.getenv("TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR"), "balanced-all.csv"))
    # get total samples
    n_samples = len(df)
    # get total male samples
    n_male_samples = len(df[df['gender'] == 'male'])
    # get total female samples
    n_female_samples = len(df[df['gender'] == 'female'])
    print("Total samples:", n_samples)
    print("Total male samples:", n_male_samples)
    print("Total female samples:", n_female_samples)
    # initialize an empty array for all audio features
    X = np.zeros((n_samples, vector_length))
    # initialize an empty array for all audio labels (1 for male and 0 for female)
    y = np.zeros((n_samples, 1))
    for i, (filename, gender) in tqdm.tqdm(enumerate(zip(df['filename'], df['gender'])), "Loading data", total=n_samples):
        features = np.load(filename)
        X[i] = features
        y[i] = label2int[gender]
    # save the audio features and labels into files
    # so we won't load each one of them next run
    np.save(os.path.join(os.getenv("TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR"), "results/features"), X)
    np.save(os.path.join(os.getenv("TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR"), "results/"), y)
    return X, y


def split_data(X, y, test_size=0.1, valid_size=0.1):
    # split training set and testing set
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=7)
    # split training set and validation set
    X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=valid_size, random_state=7)
    # return a dictionary of values
    return {
        "X_train": X_train,
        "X_valid": X_valid,
        "X_test": X_test,
        "y_train": y_train,
        "y_valid": y_valid,
        "y_test": y_test
    }

import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
  except RuntimeError as e:
    print(e)
def create_model(vector_length=128):
    print("creating model ...")
    """5 hidden dense layers from 256 units to 64, not the best model, but not bad."""
    model = Sequential()
    model.add(Dense(256, input_shape=(vector_length,)))
    model.add(Dropout(0.3))
    model.add(Dense(256, activation="relu"))
    model.add(Dropout(0.3))
    model.add(Dense(128, activation="relu"))
    model.add(Dropout(0.3))
    model.add(Dense(128, activation="relu"))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation="relu"))
    model.add(Dropout(0.3))
    # one output neuron with sigmoid activation function, 0 means female, 1 means male
    model.add(Dense(1, activation="sigmoid"))
    # using binary crossentropy as it's male/female classification (binary)
    model.compile(loss="binary_crossentropy", metrics=["accuracy"], optimizer="adam")
    # print summary of the model
    model.summary()
    return model




### Test Definition

In [261]:
THRESHOLD = 500
CHUNK_SIZE = 1024
FORMAT = pyaudio.paInt16
RATE = 16000

SILENCE = 30

def is_silent(snd_data):
    "Returns 'True' if below the 'silent' threshold"
    return max(snd_data) < THRESHOLD

def normalize(snd_data):
    "Average the volume out"
    MAXIMUM = 16384
    times = float(MAXIMUM)/max(abs(i) for i in snd_data)

    r = array('h')
    for i in snd_data:
        r.append(int(i*times))
    return r

def trim(snd_data):
    "Trim the blank spots at the start and end"
    def _trim(snd_data):
        snd_started = False
        r = array('h')

        for i in snd_data:
            if not snd_started and abs(i)>THRESHOLD:
                snd_started = True
                r.append(i)

            elif snd_started:
                r.append(i)
        return r

    # Trim to the left
    snd_data = _trim(snd_data)

    # Trim to the right
    snd_data.reverse()
    snd_data = _trim(snd_data)
    snd_data.reverse()
    return snd_data

def add_silence(snd_data, seconds):
    "Add silence to the start and end of 'snd_data' of length 'seconds' (float)"
    r = array('h', [0 for i in range(int(seconds*RATE))])
    r.extend(snd_data)
    r.extend([0 for i in range(int(seconds*RATE))])
    return r


def extract_feature(file_name, **kwargs):
    """
    Extract feature from audio file `file_name`
        Features supported:
            - MFCC (mfcc)
            - Chroma (chroma)
            - MEL Spectrogram Frequency (mel)
            - Contrast (contrast)
            - Tonnetz (tonnetz)
        e.g:
        `features = extract_feature(path, mel=True, mfcc=True)`
    """
    mfcc = kwargs.get("mfcc")
    chroma = kwargs.get("chroma")
    mel = kwargs.get("mel")
    contrast = kwargs.get("contrast")
    tonnetz = kwargs.get("tonnetz")
    X, sample_rate = librosa.core.load(file_name)
    if chroma or contrast:
        stft = np.abs(librosa.stft(X))
    result = np.array([])
    if mfcc:
        mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T, axis=0)
        result = np.hstack((result, mfccs))
    if chroma:
        chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T,axis=0)
        result = np.hstack((result, chroma))
    if mel:
        mel = np.mean(librosa.feature.melspectrogram(y=X, sr=sample_rate).T,axis=0)
        result = np.hstack((result, mel))
    if contrast:
        contrast = np.mean(librosa.feature.spectral_contrast(S=stft, sr=sample_rate).T,axis=0)
        result = np.hstack((result, contrast))
    if tonnetz:
        tonnetz = np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(X), sr=sample_rate).T,axis=0)
        result = np.hstack((result, tonnetz))
    return result

In [262]:
from openpyxl import load_workbook


def audio_splitter(input_folder, output_base_folder, output_base_excel_folder):
    print("Starting audio splitting...")
    # Construct the model
    model = create_model()
    model.load_weights(f"{os.getenv('TONANALYSE_AUDIO_GENDER_NOTEBOOKS_DIR')}/results/model.h5")
    # Iterate through each folder from ADs_IG_2013_wav to ADs_IG_2022_wav
    years = os.getenv("YEARS").split(" ")

    for folder_name in years:
     
    # for folder_name in range(2013, 2023):
        input_path = f"{input_folder}/ADs_IG_{folder_name}"
        output_folder = f"{output_base_folder}/ADs_IG_{folder_name}"
        excel_output_folder = f"{output_base_excel_folder}/ADs_IG_{folder_name}"
        
        os.makedirs(output_folder, exist_ok=True)
        files = [f for f in os.listdir(input_path) if f.endswith('.wav')]
        print("files", files)
        files.sort()
        print("files", files)

        # Iterate through each file in the current folder
        for file_name in files:
                audio_path = f"{input_path}/{file_name}"  # Path to the audio file
                print(f"P {audio_path}")
                audio = AudioSegment.from_wav(audio_path)

                result_per_audio = []

                # Initialize an empty list to store the segmented audio
                voice_timestamp = []
                segment_index = 0

                # Load audio for diarization
                own_file = {'audio': audio_path}  # Provide the audio file path
                pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization-3.1', use_auth_token=True)
                if torch.cuda.is_available():
                    pipeline.to(torch.device('cuda'))


                from pyannote.audio.pipelines.utils.hook import ProgressHook
                with ProgressHook() as hook:
                    diarization = pipeline(own_file, hook=hook)


                # Segmentation logic
                for turn, _, speaker in diarization.itertracks(yield_label=True):
                    segment_id = f"segment_{segment_index}"
                    voice_timestamp.append((segment_id, turn.start, turn.end, speaker))
                    segment_index += 1


                # Loop through the voice_timestamp list and split the audio
                for idx, (_, start, stop, speaker) in enumerate(voice_timestamp):
                    segment = audio[start * 1000: stop * 1000]  # Extract the segment in milliseconds
                    output_subfolder = f"{output_folder}/{os.path.splitext(file_name)[0]}"  # Create subfolder based on file name
                    os.makedirs(output_subfolder, exist_ok=True)  # Create subfolder if it doesn't exist

                    if((stop - start) < 0.05):
                      print(f"segment_{idx}.wav is too short!")
                      continue
                    else:
                      segment.export(f"{output_subfolder}/segment_{idx}.wav", format="wav")

                    # Extract features and reshape it (assuming you have an extract_feature function)
                    features = extract_feature(f"{output_subfolder}/segment_{idx}.wav", format="wav", mel=True).reshape(1, -1)

                    # Predict the gender
                    male_prob = model.predict(features)[0][0]
                    female_prob = 1 - male_prob
                    gender = "male" if male_prob > female_prob else "female"


                    # Store segment information in the list
                    duration = stop - start
                    segID = f"segment_{idx}.wav"
                    result_per_audio.append((file_name, segID, start,stop, duration, speaker, gender, male_prob, female_prob))


                for result_tuple in result_per_audio:
                  file_name, segID, turn_start, turn_end, duration, speaker, gender, male_prob, female_prob = result_tuple
                  print(f"file_name: {file_name}, segment_id: {segID}, turn.start: {turn_start}, turn.end: {turn_end}, duration: {duration}, speaker: {speaker}, gender: {gender}, male_prob: {male_prob}, female_prob: {female_prob}")



                # Create DataFrame from result_per_audio
                df = pd.DataFrame(result_per_audio, columns=['file_name', 'segment_id', 'start', 'end', 'duration', 'speaker', 'gender', 'male_prob', 'female_prob'])

                # Create directory for saving Excel file
                excel_output_folder = f"{output_base_excel_folder}/ADs_IG_{folder_name}" #.wav
                os.makedirs(excel_output_folder, exist_ok=True)  # Create output directory if it doesn't exist

                # Save DataFrame to Excel
                excel_file_path = f"{excel_output_folder}/{os.path.splitext(file_name)[0]}.xlsx"  # Excel file path
                print(f"Excel file path: {excel_file_path}")
                # Check if Excel file already exists


                try:
                    # Try to write to existing file
                    with pd.ExcelWriter(excel_file_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
                        df.to_excel(writer, sheet_name='Gender_speaking_time', index=False)
                        print(f"Writing to {excel_file_path} completed!") 
                except:
                    # If file is corrupted or doesn't exist, create new file
                    df.to_excel(excel_file_path, sheet_name='Gender_speaking_time', index=False, engine='openpyxl')
                    print(f"Writing to {excel_file_path} completed!")



### Main

---




In [263]:

input_folder = os.getenv("ADS_DIR")
print("input_folder", input_folder)
output_base_folder = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")
output_base_excel_folder = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")
splitted_audios= audio_splitter(input_folder, output_base_folder, output_base_excel_folder)

input_folder /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs
Starting audio splitting...
creating model ...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


files ['AD0253.wav']
files ['AD0253.wav']
P /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2013/AD0253.wav


Output()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
file_name: AD0253.wav, segment_id: segment_0.wav, turn.start: 22.79534375, turn.end: 28.81971875, duration: 6.024374999999999, speaker: SPEAKER_00, gender: male, male_prob: 0.930648922920227, female_prob: 0.06935107707977295
Excel file path: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2013/AD0253.xlsx
Writing to /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2013/AD0253.xlsx completed!
files ['AD0301.wav']
files ['AD0301.wav']
P /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2014/AD0301.wav


Output()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11m

### Total speaking time (Conclusion)

In [264]:
import os
import pandas as pd
from pydub import AudioSegment
# Iterate through each folder from ADs_IG_2013_wav to ADs_IG_2022_wav
years = os.getenv("YEARS").split(" ")

for folder_name in years:
# for folder_name in range(2013, 2023):
    input_path = f"{input_folder}/ADs_IG_{folder_name}"
    output_folder = f"{output_base_folder}/ADs_IG_{folder_name}"
    files = [f for f in os.listdir(input_path) if f.endswith('.wav')]
    files.sort()
    print("files", files)
    # Iterate through each WAV file in the current folder
    for wav_file_name in files:  # Changed from os.listdir(input_path)
        # Load the WAV file using pydub
            wav_file_path = os.path.join(input_path, wav_file_name)
            audio = AudioSegment.from_wav(wav_file_path)
            # Get the total duration in seconds
            total_duration = len(audio) / 1000.0  # Convert milliseconds to seconds

            print("Processing:", wav_file_path)
            print("Total Duration:", total_duration, "seconds")

            # Read Excel file into a pandas DataFrame
            excel_file_path = os.path.join(output_base_excel_folder,f"ADs_IG_{folder_name}", f"{os.path.splitext(wav_file_name)[0]}.xlsx")
            # df = pd.read_excel(excel_file_path)
            print("Excel file path:", excel_file_path)
            if os.path.isfile(excel_file_path):
                print("Excel file exists!")
                try:
                    # Read the existing data
                    df = pd.read_excel(excel_file_path, sheet_name='Gender_speaking_time')
                    
                    # Check if DataFrame is empty or missing required columns
                    if df.empty or 'duration' not in df.columns or 'gender' not in df.columns:
                        print(f"Warning: Required columns missing in {excel_file_path}")
                        continue
                        
                    # Calculate total speaking time in seconds
                    total_speaking_time = df['duration'].sum()
                    
                    # Calculate the percentage of total speaking time against the total duration
                    percentage_of_total_duration = total_speaking_time / total_duration * 100
                    
                    # Calculate percentages
                    df['Value_Percent'] = df['duration'] / total_speaking_time * 100
                    
                    # Create results DataFrame
                    results_df = pd.DataFrame({
                        'Metric': ['Total Speaking Time', 'Male Speaking Time', 'Female Speaking Time'],
                        'Value_Seconds': [
                            total_speaking_time, 
                            df[df['gender'] == 'male']['duration'].sum(), 
                            df[df['gender'] == 'female']['duration'].sum()
                        ],
                        'Value_Percent': [
                            percentage_of_total_duration, 
                            df[df['gender'] == 'male']['Value_Percent'].sum(), 
                            df[df['gender'] == 'female']['Value_Percent'].sum()
                        ]
                    })
                    
                    # Combine data
                    existing_data = pd.read_excel(excel_file_path, sheet_name='Gender_speaking_time')
                    existing_data[''] = ''
                    combined_data = pd.concat([existing_data, results_df], axis=1)
                    
                    # Write back to Excel
                    with pd.ExcelWriter(excel_file_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
                        combined_data.to_excel(writer, sheet_name='Gender_speaking_time', index=False)
                        
                except Exception as e:
                    print(f"Error processing {excel_file_path}: {str(e)}")
            else:
                print("Excel file does not exist!")


files ['AD0253.wav']
Processing: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2013/AD0253.wav
Total Duration: 28.8 seconds
Excel file path: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2013/AD0253.xlsx
Excel file exists!
files ['AD0301.wav']
Processing: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2014/AD0301.wav
Total Duration: 56.128 seconds
Excel file path: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2014/AD0301.xlsx
Excel file exists!




---



# *2. Emotion Recognition from Transcription*

WhisperAI transcribes all audio files before distilbert analyzes the emotions


WhisperAI: https://github.com/openai/whisper

distilbert-base-uncased-emotion: https://huggingface.co/bhadresh-savani/distilbert-base-uncased-emotion?text=I+feel+a+bit+let+down

## Whisper AI for Transcription








In [265]:
# if not installed
!pip install -U openai-whisper

!pip install git+https://github.com/openai/whisper.git

!pip install --upgrade --no-deps --force-reinstall git+https://github.com/openai/whisper.git

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-lkk85wuq
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-lkk85wuq
  Resolved https://github.com/openai/whisper.git to commit 173ff7dd1d9fb1c4fddea0d41d704cfefeb8908c
  Installing build dependencies ... [?25l^C
[?25hcanceled
[31mERROR: Operation cancelled by user[0m[31m
[0m

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-j7ib_4mk
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-j7ib_4mk
  Resolved https://github.com/openai/whisper.git to commit 173ff7dd1d9fb1c4fddea0d41d704cfefeb8908c
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hBuilding wheels for collected packages: openai-whisper
  Building wheel for openai-whisper (pyproject.toml) ... [?25ldone
[?25h  Created wheel for openai-whisper: filename=openai_whisper-20240930-py3-none-any.whl size=803557 sha256=0b5b6f49af049de278f3e1c201f5b228fee3e347a8edda57b27fdf6050188d63
  Stored in directory: /tmp/pip-ephem-wheel-cache-b0t2m4hb/wheels/c3/03/25/5e0ba78bc27a3a089f137c9f1d92fdfce16d06996c071a016c
Successfully built openai-whisper
Installing collec

In [266]:
# if not installed
!pip install setuptools-rust

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




In [267]:
# Optional
# from google.colab import drive
# drive.mount('/content/drive')

### Main

---

In [268]:
import os
import whisper
import torch

# Input
input_folder_path = os.getenv("ADS_DIR")

# Set model to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = whisper.load_model("large", device=device)

# Iterate through each folder
for folder_name in os.listdir(input_folder_path):
    folder_path = os.path.join(input_folder_path, folder_name)

    # Iterate to find .wav files
    if os.path.isdir(folder_path):
        for file_name in os.listdir(folder_path):
            if file_name.endswith(".wav"):
                audio_file_path = os.path.join(folder_path, file_name)

                # Transcription of audio file
                result = model.transcribe(audio_file_path)

                # Outputs here are only TXT files. WhisperAI can also create SRT, VTT, TSV and JSON files. You can change it here.

                # Create output
                output_txt_path = os.path.join(folder_path, f"{file_name.split('.')[0]}.txt")

                # Save Output in .txt file
                with open(output_txt_path, 'w') as txt_file:
                    txt_file.write(result['text'])

                print(f"Transcription for {file_name}: {result['text']}")
                print(f"Output will be saved in {output_txt_path} .")


  checkpoint = torch.load(fp, map_location=device)


Transcription for AD0253.wav:  New Axe Apollo. Enter before midnight at axeapollo.com for your chance to go to space.
Output will be saved in /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2013/AD0253.txt .




Transcription for AD0301.wav:  There's nothing stronger or more powerful than a dream. Even in darkness, dreams have an infinite capacity to see light. They outlive pain. They outlast heartbreak. They overcome doubt. They forget imperfection. They cannot be beaten, broken, or taken. Dreams fight. They persevere. They go on. Because determination will always trump disappointment. Desire will always triumph over defeat. And dreams, dreams will always prevail. Transcription by ESO. Translation by —
Output will be saved in /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2014/AD0301.txt .



## Emotion Recognition (1)

### Set up

In [269]:
from transformers import pipeline
import pandas as pd
import os

In [270]:
# If not installed:
#!pip install transformers
#!pip install xlsxwriter

### Main

In [271]:
# Define emotionclassifier

from transformers import pipeline

# Create a text classification pipeline object
classifier = pipeline("text-classification", model='bhadresh-savani/distilbert-base-uncased-emotion', top_k=None)

def classify_emotion_from_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    # Split the text into words
    words = content.split()

    def classify_and_check(predictions, text):
        if any(emotion['score'] > 0.8 for emotion in predictions[0]):
            return {'Emotion': predictions[0][0]['label'], 'Probability': predictions[0][0]['score']}
        else:
            return {'Emotion': 'neutral', 'Probability': 1.0}

    # Due to the model's limitation in analyzing coherent texts with more than 657 tokens (approximately 320 words),
    # such lengthy texts will be split in half and analyzed independently. In our collection of Super Bowl ads from 2013 to 2022,
    # only one ad (AD0290) exceeds this limit. Our AI model classified the first part as 'fear' and the second part as 'joy.'
    # After a manual analysis of both parts, we would categorize the entire ad as 'joy.'

    if len(words) > 320:
        # Split the first row into two parts, each containing half of the words
        half_length = len(words) // 2
        first_row_part1 = ' '.join(words[:half_length])
        first_row_part2 = ' '.join(words[half_length:])

        # Classify emotions for the first part of the first row
        first_row_part1_predictions = classifier(first_row_part1)

        # Append information for the first part of the first row
        emotions_and_scores_part1 = {
            'AD-Number': file_path.split('/')[-1].split('.')[0],
            'Transcription': first_row_part1,
            'Word range': f'1-{half_length}',
            **classify_and_check(first_row_part1_predictions, first_row_part1)
        }

        # Classify emotions for the second part of the first row
        first_row_part2_predictions = classifier(first_row_part2)

        # Append information for the second part of the first row
        emotions_and_scores_part2 = {
            'AD-Number': file_path.split('/')[-1].split('.')[0],
            'Transcription': first_row_part2,
            'Word range': f'{half_length + 1}-{len(words)}',
            **classify_and_check(first_row_part2_predictions, first_row_part2)
        }

        # Combine information for the first row
        emotions_and_scores = [emotions_and_scores_part1, emotions_and_scores_part2]
    else:

        # Classify emotions for the entire content
        full_content_predictions = classifier(content)

        # Append information for the first row (full content)
        emotions_and_scores = [{
            'AD-Number': file_path.split('/')[-1].split('.')[0],
            'Transcription': content,
            'Word range': f'1-{len(words)}',
            **classify_and_check(full_content_predictions, content)
        }]

    # Classify emotions for each 20-word segment starting from the 2nd row with a 5-word shift
    for start in range(0, len(words)-20, 5):
        end = min(start + 20, len(words))
        word_range = f'{start+1}-{end}'  # Adjust to avoid index out of range
        text_segment = ' '.join(words[start:end])

        # Classify emotions for the current segment
        segment_predictions = classifier(text_segment)

        # Append information for each segment
        emotions_and_scores.append({
            'AD-Number': file_path.split('/')[-1].split('.')[0],
            'Transcription': text_segment,
            'Word range': word_range,
            **classify_and_check(segment_predictions, text_segment)
        })


    # Check for missing words
    remaining_start = max(len(words) - 20, 0)  # Startpunkt für die letzten 20 Wörter
    if remaining_start < len(words):
        remaining_word_range = f'{remaining_start + 1}-{len(words)}'
        remaining_text_segment = ' '.join(words[remaining_start:])
        remaining_predictions = classifier(remaining_text_segment)

        # Verwendung der classify_and_check-Funktion für die Emotionsklassifikation
        remaining_emotion_info = classify_and_check(remaining_predictions, remaining_text_segment)

        # Append information for remaining words directly
        emotions_and_scores.append({
          'AD-Number': file_path.split('/')[-1].split('.')[0],
          'Transcription': remaining_text_segment,
          'Word range': remaining_word_range,
          'Emotion': remaining_emotion_info['Emotion'],
          'Probability': remaining_emotion_info['Probability']
        })


    return emotions_and_scores, emotions_and_scores[0]['AD-Number']


def extract_emotions_and_scores(text, predictions, ad_number):
    # Extract emotions + probabilities and add "Word range" and "Text segment" information
    emotions_and_scores = []

    # Split text into words
    words = text.split()
    segment_size = 20

    for emotion in predictions[0]:
        if emotion['score'] > 0.8:
            for start in range(0, len(words), segment_size):  # Adjust to 20 words per segment
                end = min(start + segment_size, len(words))
                word_range = f'{start + 1}-{end}'  # Adjust to avoid index out of range
                text_segment = ' '.join(words[start:end])

                # Classify emotions for the current segment using the global classifier
                segment_predictions = classifier(text_segment)

                # Only add relevant information for the first row
                if start == 0:
                    emotions_and_scores.append({
                        'AD-Number': ad_number.split('/')[-1].split('.')[0],
                        'Transcription': text,
                        'Word range': f'1-{len(words)}',
                        'Emotion': emotion['label'],
                        'Probability': emotion['score']
                    })

                # Add information for subsequent rows
                emotions_and_scores.append({
                    'AD-Number': ad_number.split('/')[-1].split('.')[0],
                    'Transcription': text_segment,
                    'Word range': word_range,
                    'Emotion': segment_predictions[0][0]['label'],  # Assuming top emotion from the model
                    'Probability': segment_predictions[0][0]['score']

                     })



    return emotions_and_scores


In [272]:
def process_text_file(file_path, output_base_folder):
    predictions, file_name = classify_emotion_from_file(file_path)

    # Create directory for output
    output_folder = os.path.join(output_base_folder, file_name)
    output_folder_name = os.path.basename(output_folder)
    output_folder_name = output_folder_name.replace(".wav", "")
    output_folder = os.path.join(os.path.dirname(output_folder), output_folder_name)

    # Go through all subfolders
    for root, dirs, files in os.walk(output_base_folder):
        for dir_name in dirs:
            if dir_name not in file_path:
                continue
            # Create Excel directory
            excel_file_path = os.path.join(root, dir_name, f"{file_name}.xlsx")
            print(f"excel_file_path: {excel_file_path}")
            try:
                # Try to write to existing file with if_sheet_exists='replace'
                with pd.ExcelWriter(excel_file_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
                    result_df_new = pd.DataFrame(predictions, columns=['AD-Number', 'Transcription', 'Word range', 'Emotion', 'Probability'])
                    result_df_new.to_excel(writer, sheet_name="Transcription_and_Mood", index=False)
                    print(f"File {excel_file_path} is updated.")
                    return
            except FileNotFoundError:
                # If file doesn't exist, create new one
                with pd.ExcelWriter(excel_file_path, engine='openpyxl', mode='w') as writer:
                    result_df_new = pd.DataFrame(predictions, columns=['AD-Number', 'Transcription', 'Word range', 'Emotion', 'Probability'])
                    result_df_new.to_excel(writer, sheet_name="Transcription_and_Mood", index=False)
                    print(f"File {excel_file_path} is created.")
                    break

### Input and Output

In [273]:
# Input
# Loop through all files in the folder
input_folder_path = os.getenv("ADS_DIR")
output_base_folder = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")
for root, dirs, files in os.walk(input_folder_path):
    for file_name in files:
        if file_name.endswith(".txt"):
            file_path = os.path.join(root, file_name)
            process_text_file(file_path, output_base_folder)


excel_file_path: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2013/AD0253.xlsx
File /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2013/AD0253.xlsx is updated.
excel_file_path: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2014/AD0301.xlsx
File /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2014/AD0301.xlsx is updated.




---



## Combination of emotion from image and audio (2)




## Emotion from WhisperAI Intervalls

### Set Up

In [274]:
import pandas as pd
import json
import os
from transformers import pipeline
from openpyxl import load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows

In [218]:
# Optional
# from google.colab import drive
# drive.mount('/content/drive')

### Functions

In [275]:
# Funktion zum Emotionsklassifikation und Überprüfung
def classify_and_check(predictions):
    if any(emotion['score'] > 0.8 for emotion in predictions[0]):
        return {'Emotion': predictions[0][0]['label'], 'Probability': predictions[0][0]['score']}
    else:
        return {'Emotion': 'neutral', 'Probability': 1.0}

# Funktion zum Emotionsklassifikation
def classify_emotion(text):
    classifier = pipeline("text-classification", model='bhadresh-savani/distilbert-base-uncased-emotion', top_k=None)
    result = classifier(text)
    return result

In [276]:
def process_json_file(json_file_path, excel_file_path):
    # Create empty list
    all_excel_data = []
    print(f"excel_file_path: {excel_file_path}")

    # Existing table
    existing_df = pd.read_excel(excel_file_path, sheet_name='Transcription_and_Mood')

    # load all JSON data
    with open(json_file_path) as f:
        data = json.load(f)

    # extract data name
    ad_number = os.path.splitext(os.path.basename(json_file_path))[0]

    # extract whole text
    full_text = data.get("text", "")

    # extract relevant information
    data_list = data.get("segments", [])

    # loop through all segments
    for segment in data_list:
        row = {
            'AD-Number': ad_number,
            'ID-Number': segment.get("id", ""),
            'Start': segment.get("start", ""),
            'End': segment.get("end", ""),
            'Transcription': segment.get("text", "")
        }

        # classify emotion
        emotion_result = classify_emotion(row['Transcription'])
        row.update(classify_and_check(emotion_result))

        all_excel_data.append(row)

    # add first row
    first_row = {
        'AD-Number': ad_number,
        'ID-Number': '',
        'Start': '',
        'End': '',
        'Transcription': full_text
    }

    if len(full_text) > 320:
      # Copy the emotion from the first table
      first_row.update({'Emotion': existing_df.at[0, 'Emotion'], 'Probability': 1.0})

    else:
      # classify emotion + add results
      emotion_result_first_row = classify_emotion(first_row['Transcription'])
      first_row.update(classify_and_check(emotion_result_first_row))

    all_excel_data.insert(0, first_row)

    # Create a new DataFrame with your new data
    new_data_df = pd.DataFrame(all_excel_data)

    # Insert the new DataFrame
    existing_df[' '] = ''
    existing_df = pd.concat([existing_df, new_data_df], axis=1)

    # Load the existing workbook using openpyxl
    workbook = load_workbook(excel_file_path)

    # get the 'Transcription_and_Mood' sheet
    sheet = workbook['Transcription_and_Mood']

    # Write the updated DataFrame to the Excel sheet
    for r_idx, row in enumerate(dataframe_to_rows(existing_df, index=False, header=True), 1):
        for c_idx, value in enumerate(row, 1):
            sheet.cell(row=r_idx, column=c_idx, value=value)

    # Save the updated workbook
    workbook.save(excel_file_path)

### Run Code

In [277]:
# Path to the folder containing the frames and the excel lists

json_file_folder_path = f'{os.getenv("TONANALYSE_DIR")}/JSON_Dateien'
os.makedirs(json_file_folder_path, exist_ok=True)
excel_file_folder_path = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")


In [278]:
# OPTIONAL: So that the loops starts with the years in an alphabetical order
years = []
for year in os.listdir(excel_file_folder_path):
  years.append(year)
years.sort()

In [279]:
for year in years:
    # Set paths
    json_files_year_path = os.path.join(json_file_folder_path, f'{year}_json')
    excel_file_year_path = os.path.join(excel_file_folder_path, year)
    
    # Create JSON directory if it doesn't exist
    os.makedirs(json_files_year_path, exist_ok=True)
    
    # Create sets of base file names
    json_files_year_set = {os.path.splitext(file)[0] for file in os.listdir(json_files_year_path)}
    excel_files_year_set = {os.path.splitext(file)[0] for file in os.listdir(excel_file_year_path)}
        # Find common base names
    common_base_names = json_files_year_set.intersection(excel_files_year_set)

    # Iterate over common base names
    for base_name in common_base_names:
        json_file_path = os.path.join(json_files_year_path, f'{base_name}.json')
        excel_file_path = os.path.join(excel_file_year_path, f'{base_name}.xlsx')
        try:
          process_json_file(json_file_path, excel_file_path)
        except:
          print(f"fail: {base_name}")

## Analysis Emotion Image & Audio

### Set Up

In [280]:
!pip install XlsxWriter

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




In [281]:
import pandas as pd
import re
from collections import Counter
import os
from openpyxl import load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows
import math
import xlsxwriter

In [226]:
# Optional
# from google.colab import drive
# drive.mount('/content/drive')

In [282]:
# Create the mapping dictionary for frames to seconds
mapping_dict = {}
for i in range(0, 4800, 10):
    new_number = ((i // 10) // 3) + 1
    mapping_dict[i] = new_number

In [283]:
# Create the mapping for the emotions from text to image
emotion_mapping_dict = {
    'joy': 'happy',
    'love': 'happy',
    'neutral': 'neutral',
    'anger': 'angry',
    'surprise': 'surprise',
    'fear': 'fear',
    'sadness': 'sad'
}

### Find corresponding Frame_Nr for the second

In [284]:
def comparison_emotions_image_audio(excel_file_path):
  # Read the Sheets
  emotion_image = pd.read_excel(f'{excel_file_path}', sheet_name='Predictions')
  emotion_audio = pd.read_excel(f'{excel_file_path}', sheet_name='Transcription_and_Mood')

  # Create new columns
  emotion_audio['Frame_Nr'] = ''
  emotion_audio['Emotions'] = ''
  emotion_audio['Dominant_Emotion'] = ''
  emotion_audio['Correct_%'] = ''
  emotion_audio['Equal_Emotions'] = ''

  for index, start_second in emotion_audio['Start'].items():
    # Check if start_second is not NaN
    if pd.notna(start_second):
        # Define the seconds in the interval
        end_second = emotion_audio.at[index, 'End']
        seconds_interval = list(range(int(start_second)+1, int(end_second)+1))

        # Create a list of corresponding frames
        frames = []
        for second in seconds_interval:
            for key, value in mapping_dict.items():
                if value == round(second):
                    frames.append(key)
        # Save the corresponding frames to the dataframe
        emotion_audio.at[index, 'Frame_Nr'] = frames

  # Find corresponding emotion
  for index_audio, frame_audio in emotion_audio['Frame_Nr'].items():
    corresponding_emotions = []
    if str(frame_audio) != '':
      # Iterate for each frame_nr
      for frame_nr_audio in frame_audio:
        for index_image, frame_video in emotion_image['video_frame'].items():
          # Find the corresponding frame_nr from the image analysis
          frame_nr_video = frame_video[13:]
          frame_nr_video = frame_nr_video.split('.')[0]
          if str(frame_nr_audio) == str(frame_nr_video):
            # Find the corresponding emotion
            corresponding_emotion = emotion_image.at[index_image, 'emotion_prediction']
            if corresponding_emotion != '-':
              corresponding_emotions.append(corresponding_emotion)
      # Save the identified emotions
      emotion_audio.at[index_audio, 'Emotions'] = corresponding_emotions

  # Find dominant emotion
  for index_audio, emotions_image in emotion_audio['Emotions'].items():
    if len(emotions_image) > 0:
      dominant_emotion = Counter(emotions_image).most_common(1)[0][0]
      emotion_audio.at[index_audio, 'Dominant_Emotion'] = dominant_emotion

  # Find Correct % and if both Emotions are equal
  for index_audio, total_emotion_audio in emotion_audio['Emotion.1'].items():
    if str(total_emotion_audio) != 'nan':
      transformed_emotion = emotion_mapping_dict[str(total_emotion_audio)]
      emotions_from_image = emotion_audio.at[index_audio, 'Emotions']
      correct_emotion_count = emotions_from_image.count(transformed_emotion)
      total_emotions = len(emotions_from_image)
      if total_emotions != 0:
        emotion_audio.at[index_audio, 'Correct_%'] = (correct_emotion_count/ total_emotions)
      else:
        emotion_audio.at[index_audio, 'Correct_%'] = 0

      dominant_emotion_image = emotion_audio.at[index_audio, 'Dominant_Emotion']
      if  transformed_emotion == dominant_emotion_image:
        emotion_audio.at[index_audio, 'Equal_Emotions'] = 1
      else:
        emotion_audio.at[index_audio, 'Equal_Emotions'] = 0

  # Insert summary information
  emotion_audio['   '] = ''
  emotion_audio['Average_Correct_%'] = ''
  emotion_audio['Average_Equal_Emotions'] = ''
  if len(list(emotion_audio['Emotion.1'].items())) > 0:
      emotion_audio['Correct_%'] = pd.to_numeric(emotion_audio['Correct_%'], errors='coerce')
      emotion_audio['Equal_Emotions'] = pd.to_numeric(emotion_audio['Equal_Emotions'], errors='coerce')
      emotion_audio.at[0, 'Average_Correct_%'] = emotion_audio['Correct_%'].mean()
      emotion_audio.at[0, 'Average_Equal_Emotions'] = emotion_audio['Equal_Emotions'].mean()

  # Read exel file
  excel_sheets = pd.read_excel(excel_file_path, sheet_name=None)

  # Replace "Unnamed" columns with empty strings in all dataframes
  for sheet_name, df in excel_sheets.items():
      df.columns = [col if 'Unnamed' not in str(col) else '' for col in df.columns]

  # Update the 'Transcription_and_Mood' sheet in the dictionary
  excel_sheets['Transcription_and_Mood'] = emotion_audio

  # Save the modified dictionary of dataframes back to the Excel file
  with pd.ExcelWriter(excel_file_path, engine='xlsxwriter') as writer:
      for sheet_name, df in excel_sheets.items():
          df.to_excel(writer, sheet_name=sheet_name, index=False)

### Run

In [285]:
excel_file_folder_path = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")

In [286]:
# OPTIONAL: So that the loops starts with the years in an alphabetical order
years = []
for year in os.listdir(excel_file_folder_path):
  years.append(year)
years.sort()

In [287]:
for year in years:
    # Set path
    excel_file_year_path = os.path.join(excel_file_folder_path, year)

    # Create list of files
    excel_files = os.listdir(excel_file_year_path)

    # Iterate over files
    for excel_file in excel_files:
      excel_file_path = os.path.join(excel_file_year_path, excel_file)

      try:
        comparison_emotions_image_audio(excel_file_path)
      except:
        print(excel_file)

AD0253.xlsx
AD0253
AD0301
AD0301.xlsx


# *3. Acoustic Indices (1.0.1)*




This module aims to extract audio features of the given ads (audio-files)

Acoustic Indices: https://github.com/patriceguyot/Acoustic_Indices

Pydub: https://github.com/jiaaro/pydub

### Set up

In [233]:
#!/usr/bin/env python

In [288]:
!pip install numpy
!pip install scipy
!pip install matplotlib
!pip install pyyaml

!pip install librosa
!pip install python_speech_features

!pip install pydub

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




In [289]:
import yaml
from scipy import signal
from csv import writer
import argparse
import os

import cv2
import librosa
import librosa.display
from python_speech_features import mfcc
import wave
import audioop
import matplotlib.pyplot as plt
import numpy as np

# compression rate
from pydub import AudioSegment

# Excel Export
import pandas as pd
from openpyxl.styles import Font

In [236]:
  # Optional
# from google.colab import drive
# drive.mount('/content/drive')

### Copy & import relevant files

In [290]:
import shutil

# replace window='hanning' with window='hann' in the config_014_butter.yaml file
with open(F"{os.getenv("TONANALYSE_ACOUSTIC_INDICES_QUELLCODE_DIR")}/yaml/config_014_butter.yaml", 'r') as file:
    content = file.read()
content = content.replace('hanning', 'hann')
with open(F"{os.getenv("TONANALYSE_ACOUSTIC_INDICES_QUELLCODE_DIR")}/yaml/config_014_butter.yaml", 'w') as file:
    file.write(content)


shutil.copy(F"{os.getenv("TONANALYSE_ACOUSTIC_INDICES_QUELLCODE_DIR")}/compute_indice.py", ".")
shutil.copy(F"{os.getenv("TONANALYSE_ACOUSTIC_INDICES_QUELLCODE_DIR")}/acoustic_index.py", ".")
shutil.copy(F"{os.getenv("TONANALYSE_ACOUSTIC_INDICES_QUELLCODE_DIR")}/yaml/config_014_butter.yaml", ".")

config_file = os.path.join("./config_014_butter.yaml")

audio_dir = os.getenv("ADS_DIR")
output_dir = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")



In [291]:
# import sys
# sys.path.append('/content/drive/MyDrive/SuperBowl_Project_FUB/Colab_Notebooks/Acoustic_Indices')

from compute_indice import *
from acoustic_index import *

### Run the Code

In [292]:
single_output_file = False
config_file = os.path.join(os.getenv("TONANALYSE_ACOUSTIC_INDICES_QUELLCODE_DIR"), "yaml", "config_014_butter.yaml")
audio_dir = os.getenv("ADS_DIR")
output_dir = os.getenv("OUTPUT_BILD_PLUS_TON_LISTS_DIR")


In [293]:
# Set config file
yml_file = config_file
print("Config file: ", yml_file)
with open(yml_file, 'r') as stream:
    data_config = yaml.load(stream, Loader=yaml.FullLoader)

Config file:  /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/02. Tonanalyse/Acoustic_Indices/00 Quellcode/yaml/config_014_butter.yaml


In [294]:
# Get audio files
all_audio_file_path = []
for path, subdirs, files in os.walk(audio_dir):
    for name in files:
        if name.endswith(".wav") and not name.startswith("."):
            all_audio_file_path.append(os.path.join(path, name))

all_audio_file_path = sorted(all_audio_file_path)

print("-", len(all_audio_file_path), "files found in the directory", audio_dir, ':\n')

- 2 files found in the directory /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs :



### In case the ad list needs to be modified, use the list below.


In [242]:
#all_audio_file_path = []
#print(all_audio_file_path)

In [243]:
# Initialize an empty DataFrame for all ads
all_data = pd.DataFrame()

### additional values

In [295]:
def calculate_additional_values(y):
    duration = librosa.get_duration(y=y)
    tempo, _ = librosa.beat.beat_track(y=y)
    db_values = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
    avg_db = np.mean(db_values)
    min_db = np.min(db_values)
    max_db = np.max(db_values)
    max_db_value = np.max(db_values)
    return duration, tempo, avg_db, min_db, max_db, max_db_value

In [296]:
def compress_wav_to_mp3(input_wav_path, output_mp3_path, bitrate='192k'):
    audio = AudioSegment.from_wav(input_wav_path)
    audio.export(output_mp3_path, format='mp3', bitrate=bitrate)

def measure_compression_ratio(original_size, compressed_size, original_duration):
    compression_ratio = ((original_size - compressed_size) / original_size) * 100
    return compression_ratio

def measure_compression_ratio_per_second(compression_ratio, original_duration):
    compression_ratio_per_second = compression_ratio / original_duration
    return compression_ratio_per_second

def delete_file(file_path):
    """
    Delete a file if it exists.

    Parameters:
    - file_path: Path to the file to be deleted.
    """
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"File {file_path} deleted.")
    else:
        print(f"File {file_path} does not exist.")

### Main

In [297]:
#parser = argparse.ArgumentParser()
#parser.add_argument("config_file", help='yaml config file', nargs='?', const='yaml/config_014_butter.yaml', default='yaml/config_014_butter.yaml', type=str)
#parser.add_argument("audio_dir", help='audio directory', nargs='?', const='audio_files', default='audio_files', type=str)
#parser.add_argument("output_csv_file", help='output csv file', nargs='?', const='dict_all.csv', default='dict_all.csv', type=str)
#args =parser.parse_args()

if single_output_file:

  print("audio directory: ", audio_dir)
  print("output_excel_file: ", output_excel_file)

  for idx_file, filename in enumerate(all_audio_file_path):

      print(f'###### CURRENT AD: {filename} ######')
      print(f'###### - {all_audio_file_path.index(filename)} / {len(all_audio_file_path)} - ######')

      # Read signal -------------------------------------
      file = AudioFile(filename, verbose=True)

      # Pre-processing -----------------------------------------------------------------------------------
      if 'Filtering' in data_config:
          if data_config['Filtering']['type'] == 'butterworth':
              print('- Pre-processing - High-Pass Filtering:', data_config['Filtering'])
              freq_filter = data_config['Filtering']['frequency']
              Wn = freq_filter/float(file.niquist)
              order = data_config['Filtering']['order']
              [b,a] = signal.butter(order, Wn, btype='highpass')
              # to plot the frequency response
              #w, h = signal.freqz(b, a, worN=2000)
              #plt.plot((file.sr * 0.5 / np.pi) * w, abs(h))
              #plt.show()
              file.process_filtering(signal.filtfilt(b, a, file.sig_float))
          elif data_config['Filtering']['type'] == 'windowed_sinc':
              print('- Pre-processing - High-Pass Filtering:', data_config['Filtering'])
              freq_filter = data_config['Filtering']['frequency']
              fc = freq_filter / float(file.sr)
              roll_off = data_config['Filtering']['roll_off']
              b = roll_off / float(file.sr)
              N = int(np.ceil((4 / b)))
              if not N % 2: N += 1  # Make sure that N is odd.
              n = np.arange(N)
              # Compute a low-pass filter.
              h = np.sinc(2 * fc * (n - (N - 1) / 2.))
              w = np.blackman(N)
              h = h * w
              h = h / np.sum(h)
              # Create a high-pass filter from the low-pass filter through spectral inversion.
              h = -h
              h[(N - 1) / 2] += 1
              file.process_filtering(np.convolve(file.sig_float, h))

      # Compute Indices -----------------------------------------------------------------------------------
      print('- Compute Indices')
      ci = data_config['Indices']  # use to simplify the notation
      for index_name in ci:  # iterate over the index names (key of dictionary in the yml file)

          if index_name == 'Acoustic_Complexity_Index':
              print('\tCompute', index_name)
              spectro, _ = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              j_bin = int(ci[index_name]['arguments']['j_bin'] * file.sr / ci[index_name]['spectro']['windowHop'])  # transform j_bin in samples
              main_value, temporal_values = methodToCall(spectro, j_bin)
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values, main_value=main_value)

          elif index_name == 'Acoustic_Diversity_Index':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro, _ = compute_spectrogram(file, windowLength=windowLength, windowHop=windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized=False)
              main_value = methodToCall(spectro, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Acoustic_Evenness_Index':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro, _ = compute_spectrogram(file, windowLength=windowLength, windowHop=windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized=False)
              main_value = methodToCall(spectro, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Bio_acoustic_Index':
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro, frequencies, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Normalized_Difference_Sound_Index':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'RMS_energy':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              temporal_values = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values)

          elif index_name == 'Spectral_centroid':
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              temporal_values = methodToCall(spectro, frequencies)
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values)

          elif index_name == 'Spectral_Entropy':
              print('\tCompute', index_name)
              spectro, _ = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro)
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Temporal_Entropy':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'ZCR':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              temporal_values = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values)

          elif index_name == 'Wave_SNR':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              values = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, values=values)

          elif index_name == 'NB_peaks':
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro, frequencies, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Acoustic_Diversity_Index_NR': # Acoustic_Diversity_Index with Noise Removed spectrograms
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro,_ = compute_spectrogram(file, windowLength=windowLength, windowHop= windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized= False )
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              main_value = methodToCall(spectro_noise_removed, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Acoustic_Evenness_Index_NR': # Acoustic_Evenness_Index with Noise Removed spectrograms
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro,_ = compute_spectrogram(file, windowLength=windowLength, windowHop= windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized= False )
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              main_value = methodToCall(spectro_noise_removed, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Bio_acoustic_Index_NR': # Bio_acoustic_Index with Noise Removed spectrograms
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro_noise_removed, frequencies, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Spectral_Entropy_NR': # Spectral_Entropy with Noise Removed spectrograms
              print('\tCompute', index_name)
              spectro, _ = compute_spectrogram(file, **ci[index_name]['spectro'])
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro_noise_removed)
              file.indices[index_name] = Index(index_name, main_value=main_value)

      # Output Indices -----------------------------------------------------------------------------------
      #if idx_file == 0: # wenn ertse Datei
      #    with open(output_csv_file, 'w') as f_object:
      #        writer_object = writer(f_object)
      #        keys = ['filename']
      #        values = [file.file_name]
      #        for idx, current_index in file.indices.items():
      #            for key, value in current_index.__dict__.items():
      #                if key != 'name':
      #                    keys.append(idx + '__' + key)
      #                    values.append(value)
      #        writer_object.writerow(keys)
      #        writer_object.writerow(values)
      #        f_object.close()
      #else: # alles nach der ersten Datei
      #    with open(output_csv_file, 'a') as f_object:
      #        writer_object = writer(f_object)
      #        values = [file.file_name]
      #        for idx, current_index in file.indices.items():
      #            for key, value in current_index.__dict__.items():
      #                if key != 'name':
      #                    values.append(value)
      #        writer_object.writerow(values)
      #        f_object.close()
      #print("\n")
      # Create a dictionary to store data for the current file

      file_data = {'filename': file.file_name}

      for idx, current_index in file.indices.items():
          for key, value in current_index.__dict__.items():
              if key != 'name':
                  file_data[idx + '__' + key] = value

      # Calculate additional values
      additional_values = calculate_additional_values(file.sig_float)

      # compression rate
      original_size = os.path.getsize(filename)
      audio = AudioSegment.from_wav(filename)
      original_duration = audio.duration_seconds
      mp3_path = os.path.join(filename.replace(".wav", ".mp3"))
      compress_wav_to_mp3(filename, mp3_path)
      compressed_size = os.path.getsize(mp3_path)
      compression_ratio = measure_compression_ratio(original_size, compressed_size, original_duration)
      compression_ratio_per_second = measure_compression_ratio_per_second(compression_ratio, original_duration)
      delete_file(mp3_path)

      # Append additional values to the file_data dictionary
      duration, tempo, avg_db, min_db, max_db, max_db_value = additional_values
      file_data['duration'] = duration
      file_data['tempo'] = tempo
      file_data['avg_db'] = avg_db
      file_data['min_db'] = min_db
      file_data['max_db'] = max_db
      file_data['max_db_value'] = max_db_value
      file_data['compression_ratio'] = compression_ratio
      file_data['compression_ratio_per_second'] = compression_ratio_per_second

      # Append the data for the current file to the DataFrame
      all_data = all_data.append(file_data, ignore_index=True)

  #To Excel
  with pd.ExcelWriter(output_excel_file, engine='openpyxl') as writer:
    all_data.to_excel(writer, sheet_name='Acoustic_Indices', index=False)

In [298]:
if not single_output_file:

  print("audio directory: ", audio_dir)
  print("output_directory: ", output_dir)

  for idx_file, filename in enumerate(all_audio_file_path):

      ad_name = filename.split('/')[-1]
      year_folder_name = filename.split('/')[-2]
      #print(ad_name, year_folder_name)

      print(f'###### CURRENT AD: {filename} ######')
      print(f'###### - {all_audio_file_path.index(filename)} / {len(all_audio_file_path)} - ######')

      # Initialize an empty DataFrame for each individual ad
      all_data = pd.DataFrame()

      # Read signal -------------------------------------
      file = AudioFile(filename, verbose=True)

      # Pre-processing -----------------------------------------------------------------------------------
      if 'Filtering' in data_config:
          if data_config['Filtering']['type'] == 'butterworth':
              print('- Pre-processing - High-Pass Filtering:', data_config['Filtering'])
              freq_filter = data_config['Filtering']['frequency']
              Wn = freq_filter/float(file.niquist)
              order = data_config['Filtering']['order']
              [b,a] = signal.butter(order, Wn, btype='highpass')
              # to plot the frequency response
              #w, h = signal.freqz(b, a, worN=2000)
              #plt.plot((file.sr * 0.5 / np.pi) * w, abs(h))
              #plt.show()
              file.process_filtering(signal.filtfilt(b, a, file.sig_float))
          elif data_config['Filtering']['type'] == 'windowed_sinc':
              print('- Pre-processing - High-Pass Filtering:', data_config['Filtering'])
              freq_filter = data_config['Filtering']['frequency']
              fc = freq_filter / float(file.sr)
              roll_off = data_config['Filtering']['roll_off']
              b = roll_off / float(file.sr)
              N = int(np.ceil((4 / b)))
              if not N % 2: N += 1  # Make sure that N is odd.
              n = np.arange(N)
              # Compute a low-pass filter.
              h = np.sinc(2 * fc * (n - (N - 1) / 2.))
              w = np.blackman(N)
              h = h * w
              h = h / np.sum(h)
              # Create a high-pass filter from the low-pass filter through spectral inversion.
              h = -h
              h[(N - 1) / 2] += 1
              file.process_filtering(np.convolve(file.sig_float, h))

      # Compute Indices -----------------------------------------------------------------------------------
      print('- Compute Indices')
      ci = data_config['Indices']  # use to simplify the notation
      for index_name in ci:  # iterate over the index names (key of dictionary in the yml file)

          if index_name == 'Acoustic_Complexity_Index':
              print('\tCompute', index_name)
              spectro, _ = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              j_bin = int(ci[index_name]['arguments']['j_bin'] * file.sr / ci[index_name]['spectro']['windowHop'])  # transform j_bin in samples
              main_value, temporal_values = methodToCall(spectro, j_bin)
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values, main_value=main_value)

          elif index_name == 'Acoustic_Diversity_Index':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro, _ = compute_spectrogram(file, windowLength=windowLength, windowHop=windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized=False)
              main_value = methodToCall(spectro, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Acoustic_Evenness_Index':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro, _ = compute_spectrogram(file, windowLength=windowLength, windowHop=windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized=False)
              main_value = methodToCall(spectro, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Bio_acoustic_Index':
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro, frequencies, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Normalized_Difference_Sound_Index':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'RMS_energy':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              temporal_values = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values)

          elif index_name == 'Spectral_centroid':
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              temporal_values = methodToCall(spectro, frequencies)
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values)

          elif index_name == 'Spectral_Entropy':
              print('\tCompute', index_name)
              spectro, _ = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro)
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Temporal_Entropy':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'ZCR':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              temporal_values = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, temporal_values=temporal_values)

          elif index_name == 'Wave_SNR':
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              values = methodToCall(file, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, values=values)

          elif index_name == 'NB_peaks':
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro, frequencies, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Acoustic_Diversity_Index_NR': # Acoustic_Diversity_Index with Noise Removed spectrograms
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro,_ = compute_spectrogram(file, windowLength=windowLength, windowHop= windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized= False )
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              main_value = methodToCall(spectro_noise_removed, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Acoustic_Evenness_Index_NR': # Acoustic_Evenness_Index with Noise Removed spectrograms
              print('\tCompute', index_name)
              methodToCall = globals().get(ci[index_name]['function'])
              freq_band_Hz = ci[index_name]['arguments']['max_freq'] / ci[index_name]['arguments']['freq_step']
              windowLength = int(file.sr / freq_band_Hz)
              spectro,_ = compute_spectrogram(file, windowLength=windowLength, windowHop= windowLength, scale_audio=True, square=False, windowType='hamming', centered=False, normalized= False )
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              main_value = methodToCall(spectro_noise_removed, freq_band_Hz, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Bio_acoustic_Index_NR': # Bio_acoustic_Index with Noise Removed spectrograms
              print('\tCompute', index_name)
              spectro, frequencies = compute_spectrogram(file, **ci[index_name]['spectro'])
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro_noise_removed, frequencies, **ci[index_name]['arguments'])
              file.indices[index_name] = Index(index_name, main_value=main_value)

          elif index_name == 'Spectral_Entropy_NR': # Spectral_Entropy with Noise Removed spectrograms
              print('\tCompute', index_name)
              spectro, _ = compute_spectrogram(file, **ci[index_name]['spectro'])
              spectro_noise_removed = remove_noiseInSpectro(spectro, **ci[index_name]['remove_noiseInSpectro'])
              methodToCall = globals().get(ci[index_name]['function'])
              main_value = methodToCall(spectro_noise_removed)
              file.indices[index_name] = Index(index_name, main_value=main_value)

      # Output Indices -----------------------------------------------------------------------------------
      #if idx_file == 0: # wenn ertse Datei
      #    with open(output_csv_file, 'w') as f_object:
      #        writer_object = writer(f_object)
      #        keys = ['filename']
      #        values = [file.file_name]
      #        for idx, current_index in file.indices.items():
      #            for key, value in current_index.__dict__.items():
      #                if key != 'name':
      #                    keys.append(idx + '__' + key)
      #                    values.append(value)
      #        writer_object.writerow(keys)
      #        writer_object.writerow(values)
      #        f_object.close()
      #else: # alles nach der ersten Datei
      #    with open(output_csv_file, 'a') as f_object:
      #        writer_object = writer(f_object)
      #        values = [file.file_name]
      #        for idx, current_index in file.indices.items():
      #            for key, value in current_index.__dict__.items():
      #                if key != 'name':
      #                    values.append(value)
      #        writer_object.writerow(values)
      #        f_object.close()
      #print("\n")
      # Create a dictionary to store data for the current file

      file_data = {'filename': file.file_name}

      for idx, current_index in file.indices.items():
          for key, value in current_index.__dict__.items():
              if key != 'name':
                  file_data[idx + '__' + key] = value

      # Calculate additional values
      additional_values = calculate_additional_values(file.sig_float)

      # compression rate
      original_size = os.path.getsize(filename)
      audio = AudioSegment.from_wav(filename)
      original_duration = audio.duration_seconds
      mp3_path = os.path.join(filename.replace(".wav", ".mp3"))
      compress_wav_to_mp3(filename, mp3_path)
      compressed_size = os.path.getsize(mp3_path)
      compression_ratio = measure_compression_ratio(original_size, compressed_size, original_duration)
      compression_ratio_per_second = measure_compression_ratio_per_second(compression_ratio, original_duration)
      delete_file(mp3_path)

      # Append additional values to the file_data dictionary
      duration, tempo, avg_db, min_db, max_db, max_db_value = additional_values
      file_data['duration'] = duration
      file_data['tempo'] = tempo
      file_data['avg_db'] = avg_db
      file_data['min_db'] = min_db
      file_data['max_db'] = max_db
      file_data['max_db_value'] = max_db_value
      file_data['compression_ratio'] = compression_ratio
      file_data['compression_ratio_per_second'] = compression_ratio_per_second

      # Append the data for the current file to the DataFrame
      all_data = all_data.append(file_data, ignore_index=True)

      output_xlsx_file = os.path.join(output_dir, year_folder_name[0:11] + "/" + ad_name[0:7] + "xlsx")

      print(output_xlsx_file)

      #To Excel
      #with pd.ExcelWriter(output_xlsx_file, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
      #  all_data.to_excel(writer, sheet_name='Acoustic_Indices', index=False)
      try:
          with pd.ExcelWriter(output_xlsx_file, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
              all_data.to_excel(writer, sheet_name='Acoustic_Indices', index=False)

      except Exception as e:
          print(f"Error: {e}")

          # Versuche, die Datei zu löschen (falls sie existiert)
          if os.path.exists(output_xlsx_file):
              os.remove(output_xlsx_file)
              print(f"File '{output_xlsx_file}' deleted.")

          # Erstelle eine neue leere Datei
          with pd.ExcelWriter(output_xlsx_file, engine='openpyxl', mode='w') as writer:
              all_data.to_excel(writer, sheet_name='Acoustic_Indices', index=False)
              print(f"New file '{output_xlsx_file}' created and written.")

audio directory:  /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs
output_directory:  /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists
###### CURRENT AD: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2013/AD0253.wav ######
###### - 0 / 2 - ######
Read the audio file: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2013/AD0253.wav
	Successful read of the audio file: /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2013/AD0253.wav
	The audio file contains more than one channel. Only the channel 0 will be used.
- Pre-processing - High-Pass Filtering: {'type': 'butterworth', 'order': 8, 'frequency': 300}
- Compute Indices
	Compute Acoustic_Complexity_Index
	Compute Acoustic_Diversity_Index
	Compute Acoustic_Evenness_Index
	Compute Bio_acoustic_Index
	Compute Nor

  all_data = all_data.append(file_data, ignore_index=True)


	Compute Normalized_Difference_Sound_Index
	Compute RMS_energy
	Compute Spectral_centroid
	Compute Spectral_Entropy
	Compute Temporal_Entropy
	Compute ZCR
	Compute Wave_SNR
	Compute NB_peaks
	Compute Acoustic_Diversity_Index_NR
	Compute Acoustic_Evenness_Index_NR
	Compute Bio_acoustic_Index_NR
	Compute Spectral_Entropy_NR
File /home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/ADs/ADs_IG_2014/AD0301.mp3 deleted.
/home/arkastor/Development/Commercial-Brand-Differentiating-Message-Analysis/Final_Files/03. Output Bild + Ton/01. output_lists/ADs_IG_2014/AD0301.xlsx


  all_data = all_data.append(file_data, ignore_index=True)
