In [29]:
import json
import os
import subprocess
import wave
from pydub import AudioSegment
from google.cloud import speech
from pythainlp.util import Trie
from spleeter.separator import Separator
from pythainlp.tokenize import word_tokenize
import gc

In [30]:
# ฟังก์ชั่นสำหรับแปลงไฟล์ .mp3 เป็น .wav
def convert_mp3_to_wav(file_path):
    if file_path.endswith('.mp3'):
        new_file_path = file_path[:-4] + '.wav'  # เปลี่ยนนามสกุลไฟล์

        # ตรวจสอบว่าไฟล์ .wav มีอยู่แล้วหรือไม่
        if not os.path.exists(new_file_path):
            # ใช้ ffmpeg เพื่อแปลงไฟล์
            command = f'ffmpeg -i "{file_path}" -acodec pcm_s16le -ar 44100 "{new_file_path}"' # 16000 44100 22050 48000
            result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            
            if result.returncode != 0:
                print(f"Error converting {file_path} to {new_file_path}:\n{result.stderr.decode('utf-8')}")
                return None
            
            print(f'Converted: {file_path} to {new_file_path}')
        else:
            print(f'File already exists: {new_file_path}')
        return new_file_path
    return file_path

In [31]:
# ฟังก์ชั่นสำหรับแยกเสียงร้องออกจากเพลงโดยใช้ Spleeter
def separate_vocals(input_file):
    output_dir = os.path.dirname(input_file)
    base_name = os.path.splitext(os.path.basename(input_file))[0]
    
    separator = Separator('spleeter:2stems')
    separator.separate_to_file(input_file, output_dir)
    
    vocals_path = os.path.join(output_dir, base_name, 'vocals.wav')
    new_vocals_path = os.path.join(output_dir, f'{base_name}_vocals.wav')
    
    if not os.path.exists(vocals_path):
        print(f"Failed to separate vocals for file: {input_file}")
        return None

    os.rename(vocals_path, new_vocals_path)

    # ลบไฟล์ต้นฉบับและไฟล์ที่ไม่ต้องการ
    accompaniment_path = os.path.join(output_dir, base_name, 'accompaniment.wav')
    os.remove(accompaniment_path)
    os.rmdir(os.path.join(output_dir, base_name))

    print(f"Separated vocals to {new_vocals_path}")
    return new_vocals_path

In [32]:
# ฟังก์ชั่นสำหรับตัดไฟล์ .wav เป็นส่วนๆ
def split_wav_file(file_path, segment_length_ms, output_format, max_segments):
    if not file_path or not os.path.exists(file_path):
        print(f"File {file_path} does not exist.")
        return []
    
    # โหลดไฟล์เสียง
    audio = AudioSegment.from_file(file_path)
    if audio is None:
        print(f"Failed to load audio file: {file_path}")
        return []

    # คำนวณจำนวนไฟล์ที่จะได้รับจากการตัด
    num_segments = min(len(audio) // segment_length_ms, max_segments)
    segment_files = []

    # กำหนดความยาวของหมายเลข segment ตามจำนวน segments ที่มีสูงสุด
    num_length = len(str(num_segments))

    # ตัดและบันทึกแต่ละส่วน
    for i in range(num_segments):
        start_ms = i * segment_length_ms
        end_ms = start_ms + segment_length_ms
        segment = audio[start_ms:end_ms]

        # สร้างชื่อไฟล์สำหรับแต่ละส่วน
        segment_file_name = f"{file_path[:-4]}_segment_{str(i+1).zfill(num_length)}.{output_format}"

        # บันทึกส่วนของไฟล์เสียง
        segment.export(segment_file_name, format=output_format)
        print(f"Exported {segment_file_name}")
        segment_files.append(segment_file_name)

    return segment_files

In [33]:
# ฟังก์ชั่นสำหรับแปลงไฟล์ .wav จาก stereo เป็น mono
def convert_stereo_to_mono(file_path):
    if not file_path or not os.path.exists(file_path):
        print(f"File {file_path} does not exist.")
        return

    if file_path.endswith(".wav"):
        temp_file_path = file_path[:-4] + "_mono.wav"

        # ใช้ ffmpeg เพื่อแปลงไฟล์จาก stereo เป็น mono
        command = ["ffmpeg", "-i", file_path, "-ac", "1", temp_file_path]
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        
        if result.returncode != 0:
            print(f"Error converting {file_path} to mono:\n{result.stderr.decode('utf-8')}")

        # แทนที่ไฟล์เดิมด้วยไฟล์ที่แปลงแล้ว
        os.remove(file_path)
        os.rename(temp_file_path, file_path)

        print(f"Converted {file_path} to mono.")


In [34]:
# ฟังก์ชั่นสำหรับตรวจสอบว่าไฟล์ .wav เป็น mono หรือไม่
def check_mono_conversion(file_path):
    if not file_path or not os.path.exists(file_path):
        print(f"File {file_path} does not exist.")
        return False

    if file_path.endswith(".wav"):
        with wave.open(file_path, 'r') as wav_file:
            channels = wav_file.getnchannels()
            if channels != 1:
                print(f"File {file_path} has {channels} channels. Conversion failed.")
                return False
            else:
                print(f"File {file_path} is mono.")
                return True

In [35]:
# ฟังก์ชั่นหลักสำหรับการประมวลผลไฟล์เดียว
# def process_single_file(file_path):
#     # Step 1: Convert .mp3 to .wav if necessary
#     new_file_path = convert_mp3_to_wav(file_path)

#     if not new_file_path:
#         print("Conversion from mp3 to wav failed.")
#         return

#     # Step 2: Separate vocals using Spleeter
#     vocal_file_path = separate_vocals(new_file_path)

#     # Step 3: Split .wav file
#     segment_length_ms = 30 * 1000  # 30 วินาทีเป็นมิลลิวินาที
#     output_format = 'wav'  # หรือ 'mp3'
#     max_segments = 20  # สร้างไฟล์ตัดแยกได้สูงสุด 20 ไฟล์ต่อไฟล์ต้นฉบับ
#     segment_files = split_wav_file(vocal_file_path, segment_length_ms, output_format, max_segments)

#     # Step 4: Convert stereo .wav files to mono and check conversion
#     for segment_file in segment_files:
#         convert_stereo_to_mono(segment_file)
#         check_mono_conversion(segment_file)

#     # Step 5: Remove original files
#     if file_path.endswith('.mp3') and os.path.exists(file_path):
#         os.remove(file_path)
#         print(f"Original file {file_path} has been removed.")

#     if os.path.exists(new_file_path):
#         os.remove(new_file_path)
#         print(f"Original file {new_file_path} has been removed.")

#     if os.path.exists(vocal_file_path):
#         os.remove(vocal_file_path)
#         print(f"Original file {vocal_file_path} has been removed.") 
def process_single_file(file_path):
    # Step 1: Convert .mp3 to .wav if necessary
    new_file_path = convert_mp3_to_wav(file_path)

    if not new_file_path:
        print("Conversion from mp3 to wav failed.")
        return

    # Step 2: ไม่ต้องแยกเสียงร้องแล้ว

    # Step 3: Split .wav file
    segment_length_ms = 30 * 1000  # 30 วินาทีเป็นมิลลิวินาที
    output_format = 'wav'  # หรือ 'mp3'
    max_segments = 20  # สร้างไฟล์ตัดแยกได้สูงสุด 20 ไฟล์ต่อไฟล์ต้นฉบับ
    segment_files = split_wav_file(new_file_path, segment_length_ms, output_format, max_segments)

    # Step 4: Convert stereo .wav files to mono and check conversion
    for segment_file in segment_files:
        convert_stereo_to_mono(segment_file)
        check_mono_conversion(segment_file)

    # Step 5: Remove original files (ลบเฉพาะไฟล์ mp3 ต้นฉบับ)
    if file_path.endswith('.mp3') and os.path.exists(file_path):
        os.remove(file_path)
        print(f"Original file {file_path} has been removed.")

In [36]:
# ฟังก์ชั่นสำหรับถอดเสียงจากไฟล์เสียงโดยใช้ Google Cloud Speech-to-Text
def transcribe_audio(audio_file_path):
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../you_path_Key.json"
    client = speech.SpeechClient()

    with open(audio_file_path, "rb") as audio_file:
        content = audio_file.read()

    # ตรวจสอบว่าไฟล์เสียงไม่ว่างเปล่า
    if not content:
        print(f"Audio file is empty: {os.path.basename(audio_file_path)}")
        return None

    audio = speech.RecognitionAudio(content=content)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=44100,  # 16000 44100 22050 48000
        language_code="th-TH",
    )

    try:
        response = client.recognize(config=config, audio=audio)
    except Exception as e:
        print(f"Error transcribing audio file: {os.path.basename(audio_file_path)}\n{str(e)}")
        return None

    transcript = ""
    for result in response.results:
        transcript += result.alternatives[0].transcript

    if not transcript:
        print(f"Failed to transcribe audio file: {os.path.basename(audio_file_path)}")
        return None

    return transcript

In [37]:
# ฟังก์ชั่นสำหรับโหลดคำจากไฟล์ JSON
def load_words(json_file_path):
    if not os.path.exists(json_file_path):
        raise FileNotFoundError(f"The file {json_file_path} does not exist.")
    
    with open(json_file_path, mode='r', encoding='utf-8-sig') as file: #utf-8-sig utf-8
        words = json.load(file)
    return words

In [38]:
# ตรวจสอบการมีอยู่ของไฟล์ JSON
negative_words_path = '../you_path'
positive_words_path = '../you_path'

if not os.path.exists(negative_words_path):
    raise FileNotFoundError(f"{negative_words_path} not found.")

if not os.path.exists(positive_words_path):
    raise FileNotFoundError(f"{positive_words_path} not found.")

# โหลดข้อมูลคำลบและคำบวกจากไฟล์ JSON
negative_words = load_words(negative_words_path)["Negative_word"]
positive_words = load_words(positive_words_path)["Positive_word"]

In [39]:
# ประเมิณความรู้สึก
def evaluate_sentiment(transcript, negative_words, positive_words):
    transcript_words = word_tokenize(transcript, engine="newmm")
    
    # สร้าง bigrams และ trigrams
    bigrams = [''.join(transcript_words[i:i+2]) for i in range(len(transcript_words)-1)]
    trigrams = [''.join(transcript_words[i:i+3]) for i in range(len(transcript_words)-2)]
    
    # รวม unigram, bigram, และ trigram
    combined_words = transcript_words + bigrams + trigrams
    
    negative_count = 0
    positive_count = 0

    # ลบการคำนวณคะแนน และเก็บเพียงจำนวนคำ
    for word in combined_words:
        if word in negative_words:
            negative_count += 1
        elif word in positive_words:
            positive_count += 1

    # ไม่คำนวณ total_score, negative_score, positive_score
    return negative_count, positive_count, combined_words


In [40]:
# ฟังก์ชั่นสำหรับลบไฟล์เสียงทั้งหมดในไดเรกทอรีที่กำหนด
def delete_all_audio_files(directory):
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.wav') or file.endswith('.mp3'):  # ลบทั้ง .wav และ .mp3
                file_path = os.path.join(root, file)
                try:
                    os.remove(file_path)
                    print(f"Deleted file: {file_path}")
                except Exception as e:
                    print(f"Failed to delete file: {file_path}. Reason: {str(e)}")

In [41]:
# ฟังก์ชั่นสำหรับประมวลผลไฟล์ .wav ทั้งหมดในไดเรกทอรี
def process_all_wav_files(directory):
    total_negative_count = 0
    total_positive_count = 0
    total_transcripts = []
    total_words_count = 0
    total_transcript = ""  # เพิ่มตัวแปร total_transcript

    for root, _, files in os.walk(directory):  # ไม่ใช้ตัวแปร dirs ดังนั้นใช้ _ แทน
        for file in files:
            if file.endswith('.wav'):
                audio_file_path = os.path.join(root, file)
                print(f"Processing file: {file}")  # พิมพ์ชื่อไฟล์เท่านั้น

                transcript = transcribe_audio(audio_file_path)
                if not transcript:
                    print(f"Skipping file due to transcription failure: {file}")
                    continue

                negative_count, positive_count, combined_words = evaluate_sentiment(
                    transcript, negative_words, positive_words)
                
                total_transcripts.append(transcript)
                total_transcript += transcript + " "  # รวม transcript ของแต่ละไฟล์
                total_negative_count += negative_count
                total_positive_count += positive_count
                total_words_count += len(combined_words)

                print(f"Tokenized Words: {combined_words}")
                print(f"Transcript: {transcript}")
                print(f"Negative Words Count: {negative_count}")
                print(f"Positive Words Count: {positive_count}")
                print()
                # # ลบไฟล์ชั่วคราวหลังการประมวลผล
                # os.remove(audio_file_path)

    total_transcripts_combined = ' '.join(total_transcripts)
    negative_percentage_total_words = (total_negative_count / total_words_count) * 100 if total_words_count > 0 else 0
    negative_percentage_positive_words = (total_negative_count / total_positive_count) * 100 if total_positive_count > 0 else 0

    is_negative_overall = (
        negative_percentage_total_words >= 30 or
        negative_percentage_positive_words >= 10
    )

    print("\n--- Summary ---")
    print(f"Total Negative Words Count: {total_negative_count}")
    print(f"Total Positive Words Count: {total_positive_count}")
    print(f"Total Negative Words Compared To Positive Words: {negative_percentage_positive_words:.2f}%")
    print(f"Total Negative Words Compared To Total Words: {negative_percentage_total_words:.2f}%")
    print(f"Is the overall sentiment negative? {'Yes' if is_negative_overall else 'No'}")
    print(f"Total Words Count: {total_words_count}")
    print(f"Total Transcript: {total_transcript}")
    print()

    # ลบไฟล์เสียงทั้งหมดในไดเรกทอรีหลังจากการประมวลผลทั้งหมดเสร็จสิ้น
    delete_all_audio_files(directory)


In [42]:
if __name__ == "__main__":
    # ตัวอย่างการใช้งาน
    file_path = r"../you_path"  # เปลี่ยนเป็นพาธของไฟล์ที่ต้องการ
    process_single_file(file_path)

    # ตัวอย่างการใช้งาน
    directory = '../you_directory'
    process_all_wav_files(directory)

    # ล้างหน่วยความจำเพื่อป้องกันปัญหาหน่วยความจำเกิน
    gc.collect()

Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_01.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_02.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_03.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_04.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_05.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_06.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_07.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทสอีกรอบ\อภิสิทธิ์ชน - COCKTAIL(edited)_segment_08.wav
Exported D:\Project\DataSet\Data_แค่ที่จำเป็น\ทดสอบ\For_Test\เทส