codecs 模块提供了流和文件接口用于转换数据。它通常用于处理 Unicode 文本。

In [1]:
import binascii

def to_hex(t, nbytes):
    """Format text t as a sequence of nbyte long values separated by spaces"""
    chars_per_item = nbytes*2
    hex_version = binascii.hexlify(t)
    return b' '.join(hex_version[start:start + chars_per_item] for start in range(0, len(hex_version), chars_per_item))

In [3]:
print(to_hex(b'abcdef', 1))
print(to_hex(b'abcdef', 2))

b'61 62 63 64 65 66'
b'6162 6364 6566'


In [8]:
import unicodedata

text = 'françaiS'
print(f'Raw: {text!r}')
for c in text:
    print(f"  {c!r}:{unicodedata.name(c,c)}")
    
print(f"UTF-8 : {to_hex(text.encode('utf-8'),1)}")
print(f"UTF-16 : {to_hex(text.encode('utf-16'),1)}")
print(f"UTF-16 : {to_hex(text.encode('utf-16'),2)}")

Raw: 'françaiS'
  'f':LATIN SMALL LETTER F
  'r':LATIN SMALL LETTER R
  'a':LATIN SMALL LETTER A
  'n':LATIN SMALL LETTER N
  'ç':LATIN SMALL LETTER C WITH CEDILLA
  'a':LATIN SMALL LETTER A
  'i':LATIN SMALL LETTER I
  'S':LATIN CAPITAL LETTER S
UTF-8 : b'66 72 61 6e c3 a7 61 69 53'
UTF-16 : b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00 53 00'
UTF-16 : b'fffe 6600 7200 6100 6e00 e700 6100 6900 5300'


In [3]:
import codecs

def main(encoding):
    filename = encoding + '.txt'
    print('Writing to ', filename)
    with codecs.open(filename, mode='w', encoding=encoding) as f:
        f.write('françaiS')
        
    nbytes = {'utf-8': 1, 'utf-16':2, 'utf-32':3}.get(encoding, 1)
    print('File Contents:')
    with open(filename, mode='rb') as f:
        print(to_hex(f.read(), nbytes))

In [4]:
main('utf-8')

Writing to  utf-8.txt
File Contents:
b'66 72 61 6e c3 a7 61 69 53'


In [5]:
main('utf-16')

Writing to  utf-16.txt
File Contents:
b'fffe 6600 7200 6100 6e00 e700 6100 6900 5300'


In [6]:
main('utf-32')

Writing to  utf-32.txt
File Contents:
b'fffe00 006600 000072 000000 610000 006e00 0000e7 000000 610000 006900 000053 000000'


In [8]:
with codecs.open('utf-32.txt', mode='r', encoding='utf-32') as f:
    print(f.read())

françaiS


## 字节顺序

In [9]:
BOM_TYPES = [
    'BOM', 'BOM_BE', 'BOM_LE',
    'BOM_UTF8',
    'BOM_UTF16', 'BOM_UTF16_BE', 'BOM_UTF16_LE',
    'BOM_UTF32', 'BOM_UTF32_BE', 'BOM_UTF32_LE',
]

for name in BOM_TYPES:
    print(f"{name:12} : {to_hex(getattr(codecs, name), 2)}")

BOM          : b'fffe'
BOM_BE       : b'feff'
BOM_LE       : b'fffe'
BOM_UTF8     : b'efbb bf'
BOM_UTF16    : b'fffe'
BOM_UTF16_BE : b'feff'
BOM_UTF16_LE : b'fffe'
BOM_UTF32    : b'fffe 0000'
BOM_UTF32_BE : b'0000 feff'
BOM_UTF32_LE : b'fffe 0000'


In [10]:
# 选择 UTF-16 编码的非本地编码
if codecs.BOM_UTF16 == codecs.BOM_UTF16_BE:
    bom = codecs.BOM_UTF16_LE
    encoding = 'utf_16_le'
else:
    bom = codecs.BOM_UTF16_BE
    encoding = 'utf_16_be'

print('Native order  :', to_hex(codecs.BOM_UTF16, 2))
print('Selected order:', to_hex(bom, 2))

# 编码数据
encoded_text = 'français'.encode(encoding)
print('{:14}: {}'.format(encoding, to_hex(encoded_text, 2)))

with open('nonnative-encoded.txt', mode='wb') as f:
    # 写入字节顺序标记，它没有包含在编码文本中，因为选择编码的时候字节顺序被给定了。
    f.write(bom)
    # 写入编码文本的字节字符串
    f.write(encoded_text)

Native order  : b'fffe'
Selected order: b'feff'
utf_16_be     : b'0066 0072 0061 006e 00e7 0061 0069 0073'


In [11]:
# 查看原生数据
with open('nonnative-encoded.txt', mode='rb') as f:
    raw_bytes = f.read()

print('Raw    :', to_hex(raw_bytes, 2))

# 重新打开文件，并且让 codecs 检测 BOM
with codecs.open('nonnative-encoded.txt',
                 mode='r',
                 encoding='utf-16',
                 ) as f:
    decoded_text = f.read()

print('Decoded:', repr(decoded_text))

Raw    : b'feff 0066 0072 0061 006e 00e7 0061 0069 0073'
Decoded: 'français'


## 错误处理
|错误模式	|描述|
| ------ | ------ |
|strict	|数据没有被正确转换将会引发错误。|
|replace	|对于不能编码的数据替换一个特殊的标记字符。|
|ignore	|跳过数据。|
|xmlcharrefreplace	|XML 字符 (仅用于编码)|
|backslashreplace	|转移序列 (仅用于编码)|

### 编码错误

In [12]:
def main(error_handling):
    text = 'français'
    try:
        with codecs.open('encode_error.txt', 'w', encoding='ascii', errors=error_handling) as f:
            f.write(text)
    except UnicodeEncodeError as err:
        print('ERROR:', err)
    else:
        with open('encode_error.txt', 'rb') as f:
            print(f'file contents: {f.read()!r}')

In [13]:
main('strict')  # default value

ERROR: 'ascii' codec can't encode character '\xe7' in position 4: ordinal not in range(128)


In [14]:
main('replace')

file contents: b'fran?ais'


In [15]:
main('ignore')

file contents: b'franais'


In [16]:
main('xmlcharrefreplace')

file contents: b'fran&#231;ais'


In [17]:
main('backslashreplace')

file contents: b'fran\\xe7ais'


### 解码错误

In [18]:
def main(error_handling):
    text = 'français'
    with codecs.open('decode_error.txt', 'w', encoding='utf-16') as f:
        f.write(text)
    
    # 转化文件中的字节
    with open('decode_error.txt', 'rb') as f:
        print('File contents:', to_hex(f.read(),1))
        
    with codecs.open('decode_error.txt', 'r', encoding='utf-8', errors=error_handling) as f:
        try:
            data = f.read()
        except UnicodeEncodeError as err:
            print('ERROR:', err)
        else:
            print('Read   :', repr(data))

In [19]:
main('strict')

File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00 73 00'


UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte

In [20]:
main('ignore')

File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00 73 00'
Read   : 'f\x00r\x00a\x00n\x00\x00a\x00i\x00s\x00'


In [21]:
main('replace')

File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00 73 00'
Read   : '��f\x00r\x00a\x00n\x00�\x00a\x00i\x00s\x00'


## 编码转换

In [22]:
import io

data = 'français'
utf8 = data.encode('utf-8')
print('starts as utf8:', to_hex(utf8, 1))

# 设置输出缓冲池，将它包装为EncodedFile
output = io.BytesIO()
encoded_file = codecs.EncodedFile(output, data_encoding='utf-8', file_encoding='utf-16')
encoded_file.write(utf8)

# 获取缓冲内容，编码为UTF-16
utf16 = output.getvalue()
print('Encoded to UTF-16:', to_hex(utf16,2))

# 使用 UTF-16 数据设置另一个缓冲池，并且包装为另一个 EncodedFile
buffer = io.BytesIO(utf16)
encoded_file = codecs.EncodedFile(buffer, data_encoding='utf-8', file_encoding='utf-16')

# 读取数据的 UTF-8 版本
recoded = encoded_file.read()
print('Back to UTF-8    :', to_hex(recoded, 1))

starts as utf8: b'66 72 61 6e c3 a7 61 69 73'
Encoded to UTF-16: b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'
Back to UTF-8    : b'66 72 61 6e c3 a7 61 69 73'


In [23]:
buffer = io.StringIO()
stream = codecs.getwriter('rot_13')(buffer)  # rot_13解码器，旋转13个字符

text = 'abcdefghijklmnopqrstuvwxyz'
stream.write(text)
stream.flush()

print('ROT_13:', buffer.getvalue())

ROT_13: nopqrstuvwxyzabcdefghijklm


In [25]:
buffer = io.BytesIO()
stream = codecs.getwriter('zlib')(buffer)
text = b'abcdefghijklmnopqrstuvwxyz\n' * 50

stream.write(text)
stream.flush()

print('Original len:', len(text))
compress_data = buffer.getvalue()
print('Compress len:', len(compress_data))

buffer = io.BytesIO(compress_data)
stream = codecs.getreader('zlib')(buffer)

first_line = stream.readline()
print('First line:', first_line)
uncompress_data = first_line + stream.read()
print('Uncompress len:', len(uncompress_data))
print('Same ? :', uncompress_data == text)

Original len: 1350
Compress len: 48
First line: b'abcdefghijklmnopqrstuvwxyz\n'
Uncompress len: 1350
Same ? : True


In [30]:
import sys

text = b'abcdefghijklmnopqrstuvwxyz\n' 
repeat = 50

print('Text len:', len(text))
print('Repeat:', repeat)
print('Expect len:', len(text) * repeat)

encoder = codecs.getincrementalencoder('bz2')()
encoded = []
print()
print('Encoding:')
last = repeat -1
for i in range(repeat):
    en_c = encoder.encode(text, final=(i==last))
    if en_c:
        print('\nEncoded: {} bytes'.format(len(en_c)))
        encoded.append(en_c)
    else:
        sys.stdout.write('.')
        
allencoded = b''.join(encoded)
print('Total encoded len:', len(allencoded))

decoder = codecs.getincrementaldecoder('bz2')()
decoded = []
print()
print('Decoding:')
for i,b in enumerate(allencoded):
    final = (i+1) == len(text)
    c = decoder.decode(bytes([b]), final)
    if c:
        print('\nDecoded: {} bytes'.format(len(c)))
        print('Decoding:')
        decoded.append(c)
    else:
        sys.stdout.write('.')
        
restored = b''.join(decoded)
print()
print('Total decoded len:', len(restored))

Text len: 27
Repeat: 50
Expect len: 1350

Encoding:
.................................................
Encoded: 99 bytes
Total encoded len: 99

Decoding:
........................................................................................
Decoded: 1350 bytes
Decoding:
..........
Total decoded len: 1350


可能会在数据流处理时大幅改变数据流的长度。对于大数据集，这些操作最好是渐进式的，一次只处理小量数据块，IncrementalEncoder 和 IncrementalDecoder则设计用于这个目的。每次传递给编码器或者解码器的时候，其内部状态会更新。当状态一致的时候（由编解码器定义），数据返回并且状态重置。在那之前，调用 encode() 或者 decode() 将不会返回任何数据。当传入最后一批数据时，参数 final 应该设置为 True，因此编解码器直到清除所有剩余的缓冲数据。

## 网络通信

In [31]:
import socketserver
import socket
import threading

class Echo(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024)
        self.request.send(data)
        return

address = ('localhost', 0)
server = socketserver.TCPServer(address, Echo)
ip, port = server.server_address
t = threading.Thread(target=server.serve_forever)
t.setDaemon(True)
t.start()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip, port))

# 发送数据，未进行编码，会报错
text = 'français'
len_sent = s.send(text)

response = s.recv(len_sent)
print(repr(response))
s.close()
server.socket.close()

TypeError: a bytes-like object is required, not 'str'

In [34]:
class PassThrough:
    def __init__(self, other):
        self.other = other
        
    def write(self, data):
        print('Writing:', repr(data))
        return self.other.write(data)
    
    def read(self, size=-1):
        print('Reading:')
        data = self.other.read(size)
        print(repr(data))
        return data
    
    def flush(self):
        return self.other.flush()
    
    def close(self):
        return self.other.close()
    
address = ('localhost', 0)
server = socketserver.TCPServer(address, Echo)
ip, port = server.server_address
t = threading.Thread(target=server.serve_forever)
t.setDaemon(True)
t.start()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip, port))

# 使用读取器和写入器包装套接字
read_file = s.makefile('rb')
incoming = codecs.getreader('utf-8')(PassThrough(read_file))
write_file = s.makefile('wb')
outgoing = codecs.getwriter('utf-8')(PassThrough(write_file))

text = 'français'
print('Sending:', repr(text))
outgoing.write(text)
outgoing.flush()

response = incoming.read()
print('Recived:',repr(response))
s.close()
server.socket.close()
    

Sending: 'français'
Writing: b'fran\xc3\xa7ais'
Reading:
b'fran\xc3\xa7ais'
Reading:
b''
Recived: 'français'


## 自定义编码

In [1]:
import string

def invertcaps(text):
    """Return new string with the case of all letters switched.
    """
    return ''.join(
        c.upper() if c in string.ascii_lowercase
        else c.lower() if c in string.ascii_uppercase
        else c
        for c in text
    )

In [2]:
print(invertcaps('ABCdef'))
print(invertcaps('abcDEF'))

abcDEF
ABCdef


第一步是去了解编码转换的性质。尽管它很容易理解，但是实现并不高效，特别是对于大文本字符串。  
codecs 包含一些用于创建基于字符映射的编解码器的辅助函数。字符映射编码由两个字典组成。编码字典将输入字符串中的字符值转换为输出中的字节值，解码字典则以另一种方式运行。首先创建解码映射，然后使用 make_encoding_map() 将其转换为编码映射。C 函数 charmap_encode() 和 charmap_decode() 使用这种映射关系高效地转换他们的输入数据。

In [4]:
import codecs
import string

decoding_map = codecs.make_identity_dict(range(256))

pairs = list(zip([ord(c) for c in string.ascii_lowercase], [ord(c) for c in string.ascii_uppercase]))
decoding_map.update({upper:lower for (lower, upper) in pairs})
decoding_map.update({lower:upper for (lower, upper) in pairs})
             
encoding_map = codecs.make_encoding_map(decoding_map)

print(codecs.charmap_encode('abcDEF', 'strict', encoding_map))
print(codecs.charmap_decode(b'abcDEF', 'strict', decoding_map))
print(decoding_map == encoding_map)

(b'ABCdef', 6)
('ABCdef', 6)
True


In [8]:
text = 'pi: \u03c0'  # π 的 Unicode 码点没有在这个编码图中

for error in ['ignore', 'replace', 'strict']:
    try:
        encoded = codecs.charmap_encode(text, error, encoding_map)
    except UnicodeEncodeError as err:
        encoded = str(err)
        
    print(f'{error:7}:{encoded}')

ignore :(b'PI: ', 5)
replace:(b'PI: ?', 5)
strict :'charmap' codec can't encode character '\u03c0' in position 4: character maps to <undefined>


In [9]:
codecs.lookup('utf-8')

<codecs.CodecInfo object for encoding utf-8 at 0x1039ab2e8>

In [5]:
import codecs
class InvertCapsCodec(codecs.Codec):
    def encode(self, input, errors='strict'):
        return codecs.charmap_encode(input, errors, encoding_map)
    def decode(self, input, errors='strict'):
        return codecs.charmap_decode(input, errors, decoding_map)
    
class InvertCapsIncrementalEncoder(codecs.IncrementalEncoder):
    def encode(self, input, final=False):
        data, nbytes = codecs.charmap_encode(input, self.errors, encoding_map)
        return data
    
class InvertCapsIncrementalDecoder(codecs.IncrementalDecoder):
    def decode(self, input, final=False):
        data, nbytes = codecs.charmap_decode(input, self.errors, decoding_map)
        return data
    
class InvertCapsStreamReader(InvertCapsCodec, codecs.StreamReader):
    pass
class InvertCapsStreamWriter(InvertCapsCodec, codecs.StreamWriter):
    pass

def find_invertcaps(encoding):
    if encoding == 'invertcaps':
        return codecs.CodecInfo(name='invertcaps', encode=InvertCapsCodec().encode, decode=InvertCapsCodec().decode,
                               incrementalencoder=InvertCapsIncrementalEncoder, incrementaldecoder=InvertCapsIncrementalDecoder,
                                streamreader=InvertCapsStreamReader, streamwriter=InvertCapsStreamWriter,)
    return None


In [6]:
codecs.register(find_invertcaps)

encoder = codecs.getencoder('invertcaps')
text = 'abcDEF'
encoded_text, consumed = encoder(text)
print(f"Encoded {text} to {encoded_text}, consuming {consumed} chars")

import io
buffer = io.BytesIO()
writer = codecs.getwriter('invertcaps')(buffer)
print('StreamWriter for io buffer : writing abcDEF')
writer.write('abcDEF')
print('buffer contents:', buffer.getvalue())

decoder_factory = codecs.getincrementaldecoder('invertcaps')
decoder = decoder_factory()
decoded_text_parts = []
for c in encoded_text:
    decoded_text_parts.append(decoder.decode(bytes([c]), final=False))
decoded_text_parts.append(decoder.decode(b'', final=True))
decoded_text = ''.join(decoded_text_parts)
print(f'IncrementalDecoder converted {encoded_text!r} to {decoded_text!r}')

Encoded abcDEF to b'ABCdef', consuming 6 chars
StreamWriter for io buffer : writing abcDEF
buffer contents: b'ABCdef'
IncrementalDecoder converted b'ABCdef' to 'abcDEF'
