# 字符编码转utf-8

In [None]:
%pip install django
%pip install chardet

In [None]:
########################################################################################################
# 语料TXT文本编码转换为UTF8  - 作者：i126@126.com 
# https://github.com/adetion/txtfilemerge
# modify by sayo with chatgpt
########################################################################################################
import chardet
import codecs
import os
from django.utils.encoding import force_str
from django.utils.functional import Promise
from pathlib import Path
import shutil

def allpath_txt_encoding_to_utf8(input_path, output_path, file_ext=('.txt', '.csv')):
    """
    Convert every matching text file under ``input_path`` (sub-directories
    included) to UTF-8, mirroring the directory layout under ``output_path``.

    Args:
        input_path: Root directory to scan; relative or absolute.
        output_path: Root directory that receives the converted files.
        file_ext: Lower-case extensions (leading dot included) to convert.

    Returns:
        None. An error raised while handling one file is printed and the walk
        continues with the next file.
    """
    for dirpath, _, filenames in os.walk(input_path):
        for filename in filenames:
            try:
                if os.path.splitext(filename)[1].lower() not in file_ext:
                    continue

                full_input_path = os.path.join(dirpath, filename)
                # Rebuild the same sub-path below output_path.
                relative_path = os.path.relpath(full_input_path, input_path)
                full_output_path = os.path.join(output_path, relative_path)

                # Forward file_ext: the converter re-checks the extension, and
                # with its own default it would reject any custom extension
                # that was accepted by the filter above.
                file_txt_encoding_to_utf8(full_input_path, full_output_path, file_ext)
            except Exception as ERR:
                print('Error:', ERR)

def path_txt_encoding_to_utf8(input_path, output_path, file_ext=('.txt', '.csv')):
    """
    Convert every matching text file directly inside ``input_path``
    (sub-directories are not visited) to UTF-8, writing to ``output_path``.

    Args:
        input_path: Directory to scan; relative or absolute.
        output_path: Directory that receives the converted files.
        file_ext: Lower-case extensions (leading dot included) to convert.

    Returns:
        None. An error raised while handling one file is printed and the loop
        continues with the next entry.

    Raises:
        OSError: If ``input_path`` cannot be listed.
    """
    for filename in os.listdir(input_path):
        try:
            if os.path.splitext(filename)[1].lower() not in file_ext:
                continue

            full_input_path = os.path.join(input_path, filename)
            # os.listdir also returns directories; a folder whose name happens
            # to end in a matching extension is not a file to convert.
            if not os.path.isfile(full_input_path):
                continue

            full_output_path = os.path.join(output_path, filename)
            # Forward file_ext so the converter accepts the same extensions
            # that were accepted by the filter above.
            file_txt_encoding_to_utf8(full_input_path, full_output_path, file_ext)
        except Exception as ERR:
            print('Error:', ERR)

def _decode_with_fallbacks(raw, detected):
    """Decode ``raw`` bytes to text; return ``(text, route)`` where ``route`` is the label printed on success."""
    candidates = (
        # With no detected encoding, UTF-8 is tried first.
        (detected or 'utf-8', '自动'),
        ('gbk', 'GBK --> UTF-8'),
        ('gb18030', 'gb18030 --> UTF-8'),
        ('big5', 'big5 --> UTF-8'),
    )
    for encoding, route in candidates:
        try:
            # Strict decoding: a wrong guess must fail here so the next
            # candidate is tried, instead of silently dropping bytes.
            return raw.decode(encoding), route
        except (UnicodeError, LookupError):
            # UnicodeError: bytes are not valid in this encoding.
            # LookupError: the detected name is not a codec Python knows.
            continue
    # Lossy last resort, used only when every strict attempt failed.
    return raw.decode('gb18030', errors='ignore'), 'gb18030 --> UTF-8'


def file_txt_encoding_to_utf8(input_file, output_file, file_ext=('.txt', '.csv')):
    """
    Write a UTF-8 copy of ``input_file`` to ``output_file``.

    The charset is taken from ``check_file_charset``. A file detected as
    UTF-8 is copied unchanged; anything else is decoded strictly with the
    detected encoding, then with GBK, GB18030 and Big5 in that order, and only
    if all of those fail is it decoded as GB18030 with undecodable bytes
    dropped. Missing parent directories of ``output_file`` are created.

    Args:
        input_file: Path of the file to convert.
        output_file: Path to write; may be the same file as ``input_file``.
        file_ext: Lower-case extensions (leading dot included) that may be
            converted; other files are reported and skipped.

    Returns:
        None.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.
            I/O errors are deliberately not treated as decoding failures.
    """
    if os.path.splitext(input_file)[1].lower() not in file_ext:
        print(input_file, '文件(扩展名)不在允许转换范围内...')
        return

    f_type = check_file_charset(input_file)
    detected = f_type.get('encoding') if f_type else None
    print(input_file, "字符集为：", detected)

    output_dir = os.path.dirname(output_file)
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Case-insensitive so that differently-cased spellings of the name match.
    if detected and detected.lower() == 'utf-8':
        # shutil.copy2 raises SameFileError for an in-place run; in that case
        # the file is already where it should be.
        in_place = os.path.exists(output_file) and os.path.samefile(input_file, output_file)
        if not in_place:
            shutil.copy2(input_file, output_file)
        print("字符集为 utf-8，不需要进行转换")
        return

    # Read everything before opening the output so in-place conversion is safe.
    with open(input_file, 'rb') as f:
        raw = f.read()

    content, route = _decode_with_fallbacks(raw, detected)

    # Binary write: no newline translation, matching what codecs.open did.
    with open(output_file, 'wb') as f:
        f.write(content.encode('utf-8'))
    print("字符集转换成功：" + route)


def check_file_charset(file, sample_size=1024):
    """
    Detect the character set of ``file`` from its leading bytes.

    Args:
        file: Path of the file to inspect.
        sample_size: Number of leading bytes handed to chardet.

    Returns:
        The value returned by ``chardet.detect``; callers in this file read
        its ``'encoding'`` entry.
    """
    with open(file, 'rb') as f:
        # Read only the sample; reading the whole file and slicing afterwards
        # loads large files into memory for nothing.
        return chardet.detect(f.read(sample_size))

def smart_str(s, encoding="utf-8", strings_only=False, errors="strict"):
    """
    Return a string representation of ``s``.

    ``Promise`` instances are handed back untouched; every other value is
    passed to ``force_str`` together with ``encoding``, ``strings_only`` and
    ``errors``.
    """
    if not isinstance(s, Promise):
        return force_str(s, encoding, strings_only, errors)
    return s

In [None]:
# Sequential run over the whole "origin" tree.
# The multi-threaded / multi-process versions are the recommended ones to use.
input_path = "origin"
output_path = "utf-8"
allpath_txt_encoding_to_utf8(input_path, output_path)

# 多线程

In [None]:
import threading
import queue
import os

class FileProcessor:
    """
    Walk ``input_path`` and hand every file to ``process_file`` from a pool of
    worker threads.

    ``process_file`` is a no-op here; override it in a subclass or assign a
    callable taking ``(input_file, output_file)`` to the instance.
    """

    def __init__(self, input_path, output_path):
        self.input_path = input_path
        self.output_path = output_path
        # Pending (input_file, output_file) pairs.
        self.file_queue = queue.Queue()

    def get_all_files(self, new_ext=None):
        """
        Queue one ``(input_file, output_file)`` pair per file under ``input_path``.

        The output path mirrors the input's path relative to ``input_path``;
        when ``new_ext`` is a non-empty string it replaces the file extension.
        """
        for dirpath, _, filenames in os.walk(self.input_path):
            for filename in filenames:
                full_input_path = os.path.join(dirpath, filename)
                relative_path = os.path.relpath(full_input_path, self.input_path)
                if new_ext is not None and new_ext != "":
                    relative_path = os.path.splitext(relative_path)[0] + new_ext
                full_output_path = os.path.join(self.output_path, relative_path)
                self.file_queue.put((full_input_path, full_output_path))

    def process_file(self, input_path, output_path, file_ext='.txt|.csv'):
        """Per-file hook; does nothing until overridden or replaced."""
        pass

    def worker(self):
        """Process queued pairs until the queue is empty; per-file errors are printed."""
        while True:
            try:
                # Non-blocking get: checking empty() and then calling a
                # blocking get() lets another thread take the last item in
                # between, leaving this thread blocked forever.
                input_file_path, output_file_path = self.file_queue.get_nowait()
            except queue.Empty:
                return
            try:
                self.process_file(input_file_path, output_file_path)
            except Exception as e:
                print(f"Error processing file {input_file_path}: {str(e)}")
            finally:
                self.file_queue.task_done()

    def process_files_multithreaded(self, num_threads, new_ext=None):
        """
        Queue all files and process them with up to ``num_threads`` threads.

        Args:
            num_threads: Maximum number of worker threads; must be at least 1.
            new_ext: Optional replacement extension for output paths.

        Raises:
            ValueError: If ``num_threads`` is less than 1 (with no workers the
                queue would never drain and the call would block forever).
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")

        self.get_all_files(new_ext)

        # No point starting more threads than there are queued files.
        thread_count = min(num_threads, self.file_queue.qsize())
        threads = [threading.Thread(target=self.worker) for _ in range(thread_count)]
        for t in threads:
            t.start()

        self.file_queue.join()

        for t in threads:
            t.join()

In [None]:
# Pick the thread count based on the number of CPU threads; the multi-process version is faster.
input_path = "origin"
output_path = "utf-8"
processor = FileProcessor(input_path, output_path)

# Replace the no-op process_file stub with the converter. It is assigned on the
# instance, so the worker calls it with (input_file, output_file) only.
processor.process_file = file_txt_encoding_to_utf8

processor.process_files_multithreaded(128)

# 多进程

In [None]:
import os
from multiprocessing import Pool

class MulitProcessingFileProcessor:
    """
    Walk ``input_path`` and hand every file to ``process_file`` from a pool of
    worker processes.

    ``process_file`` is a no-op here; override it in a subclass or assign a
    callable taking one ``(input_file, output_file)`` tuple to the instance.
    """

    def __init__(self, input_path, output_path):
        self.input_path = input_path
        self.output_path = output_path

    def get_all_files(self, new_ext=None):
        """
        Return one ``(input_file, output_file)`` tuple per file under ``input_path``.

        The output path mirrors the input's path relative to ``input_path``;
        when ``new_ext`` is a non-empty string it replaces the file extension.
        """
        files = []
        for dirpath, _, filenames in os.walk(self.input_path):
            for filename in filenames:
                input_file_path = os.path.join(dirpath, filename)
                relative_path = os.path.relpath(input_file_path, self.input_path)
                if new_ext is not None and new_ext != "":
                    relative_path = os.path.splitext(relative_path)[0] + new_ext
                output_file_path = os.path.join(self.output_path, relative_path)
                files.append((input_file_path, output_file_path))
        return files

    def process_file(self, paths):
        """Per-file hook; does nothing until overridden or replaced."""
        pass

    def _process_one(self, paths):
        """Run ``process_file`` on one tuple, printing (not raising) any error."""
        try:
            self.process_file(paths)
        except Exception as e:
            # One bad file must not abort pool.map and drop the remaining files.
            print(f"Error processing file {paths[0]}: {str(e)}")

    def process_files_multiprocessing(self, num_processes, new_ext=None):
        """
        Process every file under ``input_path`` with a ``multiprocessing.Pool``.

        Args:
            num_processes: Upper bound on worker processes; ``None`` is passed
                through to ``Pool`` unchanged.
            new_ext: Optional replacement extension for output paths.
        """
        files_to_process = self.get_all_files(new_ext)
        if not files_to_process:
            # Nothing to do; skip starting a pool.
            return

        # Never start more processes than there are files.
        if num_processes is not None:
            num_processes = min(num_processes, len(files_to_process))

        with Pool(processes=num_processes) as pool:
            pool.map(self._process_one, files_to_process)


In [None]:
# Linux only; pick the process count based on the number of physical CPU cores.
input_path = "origin"
output_path = "utf-8"
processor = MulitProcessingFileProcessor(input_path, output_path)

def pre_process_file(paths):
    """Unpack one ``(input_file, output_file)`` pair and convert that file to UTF-8."""
    src_file, dst_file = paths
    file_txt_encoding_to_utf8(src_file, dst_file)

# Replace the no-op process_file stub with the single-argument wrapper that
# pool.map calls once per (input_file, output_file) tuple.
processor.process_file = pre_process_file

# Request 128 worker processes.
processor.process_files_multiprocessing(128)

# 检查字符编码

In [None]:
def check_utf8_encoding(input_path, file_ext=('.txt', '.csv')):
    """
    Walk ``input_path`` and report matching files that are not UTF-8 encoded.

    The encoding of each file is whatever ``check_file_charset`` reports. Files
    reported as ASCII are treated as UTF-8, because ASCII text is valid UTF-8.
    Files for which no encoding is reported are printed but not returned.

    Args:
        input_path: Root directory to scan (sub-directories included).
        file_ext: Lower-case extensions (leading dot included) to check.

    Returns:
        A list with the full path of every file reported as non-UTF-8.
    """
    # Encoding names that count as "already UTF-8".
    utf8_compatible = ('utf-8', 'ascii')

    non_utf8_files = []
    for dirpath, _, filenames in os.walk(input_path):
        for filename in filenames:
            if os.path.splitext(filename)[1].lower() not in file_ext:
                continue

            full_input_path = os.path.join(dirpath, filename)
            f_type = check_file_charset(full_input_path)
            if not f_type or 'encoding' not in f_type:
                continue

            encoding = f_type['encoding']
            if not encoding:
                print(f"Non-Encoding file: {full_input_path}")
            elif encoding.lower() not in utf8_compatible:
                non_utf8_files.append(full_input_path)
                print(f"Non-UTF8 file: {full_input_path}, Encoding: {encoding}")

    return non_utf8_files

def check_file_charset(file):
    """Run ``chardet.detect`` on the first 1024 bytes of ``file`` and return its result."""
    with open(file, 'rb') as handle:
        sample = handle.read(1024)
    return chardet.detect(sample)

In [None]:
# NOTE(review): input_path is an empty string here — presumably a placeholder
# to be replaced with the directory to check before running this cell.
input_path = ""
check_utf8_encoding(input_path)