In [1]:
import os
import re
import json
import nltk
import numpy as np
import tensorflow as tf
import speech_recognition as sr

from tqdm import tqdm
from langdetect import detect
from nltk import FreqDist
from nltk.corpus import stopwords
from nltk.tokenize import TreebankWordTokenizer
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import Model, Input
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import tokenizer_from_json, Tokenizer
from tensorflow.python.keras.layers import Dense, RNN, Embedding, GlobalAveragePooling1D
# from tensorflow.python.keras.preprocessing.sequence import pad_sequences  - from tf version 2.6.0
# from tensorflow.python.keras.preprocessing.text import tokenizer_from_json, Tokenizer

In [2]:
# nltk.download('all')

In [3]:
class dataset:
    def __init__(self) -> None:
        pass

    # 데이터셋 정보 json 탐색 함수
    def search_json(self, dir):
        paths = []
        count = 0
        for data in os.listdir(dir):
            file_path = os.path.join(dir, data)
            if os.path.isdir(file_path):
                count += self.search_json(file_path)
            elif data.endswith('.json'):
                count += 1
                paths.append(file_path)
        print(paths)
        return count
    
    # 데이터셋 발화자 id 확인 함수 - Use Text
    def id_checker(self, json_path):
        id_sector = []
        with open(json_path, encoding='utf-8-sig') as file:
            json_data = json.load(file) # load json
            for i in range(0, 2):   # id search loop
                id = json_data['dataSet']['dialogs'][i]['speaker']  # id 0,1 indexing
                category = json_data['dataSet']['typeInfo']['category'] # category indexing
                if id and category is not None:
                    id_sector.append(id)
            v1, v2 = id_sector[0], id_sector[1]
        print(f"Server : {v1}\nClient : {v2}")
        print(f"Category : {category}")
        return v1, v2, category

    # audio to text - stt module
    def stt_from_array(audio_data):
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_data) as source:
            audio = recognizer.record(source)
        try:
            text = recognizer.recognize_google(audio)
            print(f"stt : {text}")
        except sr.UnknownValueError:
            print("Exception : UnknownValueError")
        except sr.RequestError as e:
            print("Exception : STT engine RequestError")\
            
    
    
    # def integer_indexer():///
    
    # def data_generator():

    # def data_loader():


In [61]:
class TokenProcessor:
    def __init__(self, path):
        self.path = path
        self.word_set = []

    def tokenize_index(self, input_texts):
        tokenizer = TreebankWordTokenizer()
        tokenized_texts = tokenizer.tokenize(' '.join(input_texts))

        word_list = [word.lower() for word in tokenized_texts if word.isalpha()]
        word_freq = FreqDist(word_list)

        sorted_words = sorted(word_freq, key=word_freq.get, reverse=True)
        word_to_index = {word: index + 1 for index, word in enumerate(sorted_words)}
        indexed_texts = [word_to_index[word.lower()] for word in tokenized_texts if word.isalpha()]

        print(f"Detect Language: {detect(' '.join(input_texts))}")
        print(f"Input Texts: {input_texts}")
        print(f"Tokenized Sequences: {tokenized_texts}")
        print(f"Indexed Sequences: {indexed_texts}")
        print(f"Word to Index : {word_to_index}")

        return indexed_texts
    
    def stopwords_slicer(self, input_texts):
        pattern = r'\s*n/|\(.*\)'
        sliced_text = re.sub(pattern, ' ', input_texts)
        return sliced_text
    
    def read_talk_data(self):   # 데이터셋 text 파일 (read, return)
        contents = []
        for filename in os.listdir(self.path):
            if filename.endswith('.txt'):
                filePath = os.path.join(self.path, filename)
                with open(filePath, 'r') as text:
                    file_contents = text.read()
                    cleaned = self.stopwords_slicer(file_contents)
                    contents.append(cleaned)
        return contents
    
    def pad_seqs(self, sequences): 
        padded_seq = pad_sequences(sequences)
        return padded_seq
            
    def run(self):  # main worker
        contents = self.read_talk_data()
        print(contents)
        # indexed_texts = self.tokenize_index(contents)
        # return indexed_texts
        # padded_seqs = self.pad_seqs([indexed_texts])
        # print(f"Padded_seqs shape : {padded_seqs.shape}")
        # return padded_seqs

In [62]:
path = './local/sample/label/S00007727/'

In [63]:
tokenizer = TokenProcessor(path)
indexed_texts = tokenizer.run()
# padded_seqs = tokenizer.run()
# print(padded_seqs[1])

['  안녕하십니까, 고객님.', '  네, 우리 아이 학습지 좀 새로 등록해볼까  .', '  네, 아이 나이가 어떻게 되는지 여쭤봐도 될까요?', '   .', '  네, 미취학 아동을 대상으로 한 프로그램이 준비되어있습니다.', '  그런데 우리 아이가 말을 늦게 배운 편이라서, 단계가 여러 개면 좀 낮은 걸로 먼저 들어+ 들어야 될 것 같은데요. 홈페이지 보니까 프로그램이 여러 개고 그 안에서 단계도 여러 개 나누어져 있던데 미취학 아동 프로그램은 단계가 총 몇 개로 나누어져 있죠?', '  저희가 신규 등록 시에 무료로 레벨 테스트를 하고 있습니다.', '  아/ 그래요? 레벨 테스트를 어떻게 하는데요? 레벨 테스트 후에 단계를 정해주시는  ?', '  방문 교사가 직접 방문드린 뒤에 저희 쪽 교재를 가지고 아이의 발달 사항을 체크한 후에 단계를 정해드립니다.', '  애가 낯을 많이 가려서 그런데 제가 옆에서 같이 있어도 되나요? 애가 낯을 가리긴 하는데 좀 산만한 편이에요.', '  네, 가능하십니다.', '  그러면 레벨 테스트를 우선 받아볼게요. 레벨 테스트는 무료인거면 레벨테스트만 받고 등록 안 해도 무료인 거죠?', '  네, 그렇습니다.', '  네, 그럼 일단 등록은 안 하고 레벨테스트만 들어볼게요. 홈페이지에 프로그램 내용은 정확하게 안 나와있던데 미취학 아동 프로그램 안에서 또 프로그램이 나눠져 있는 거 있는 거죠? 아니면 미취학 아동 프로그램은 딱  ?', '  저희 프로그램이 크게 한글 프로그램이랑 영어 프로그램으로 나누어져 있는데 어떤 프로그램을 찾으세요?', '  음/ 원래는 한글을 가르치려고 했는데 영어도 같이 하면 좀 저렴한가요? 저렴하면 영어랑   받을 수 있어요?', '  프로그램  까지는 따로 할인되는 게 없으세요.', '    할인된다고 적혀있던데. 이건 그럼 뭔데요?', '  카드사 할인이십니다. 고객님.', '  그럼 카드사 할인이라고 미리 적어놨어야죠. 홈페이지만 보면 카드사 할인인지 그냥 할인인지 모르잖아요.