## 内存字符串

In [2]:
from io import StringIO
# Assemble a string piece by piece in an in-memory text buffer.
f = StringIO()
for piece in ('hello', ' ', 'world!'):
    f.write(piece)
print(f.getvalue())

# Read a pre-filled in-memory text buffer back one line at a time.
f = StringIO('Hello!\nHi!\nGoodbye!')
s = f.readline()
while s != '':  # readline() returns '' once the buffer is exhausted
    print(s.strip())
    s = f.readline()

hello world!
Hello!
Hi!
Goodbye!


## 内存字节流

In [3]:
from io import BytesIO
# Store UTF-8 encoded text in an in-memory binary buffer.
encoded = '中文'.encode('utf-8')
f = BytesIO()
f.write(encoded)
print(f.getvalue())
type(f)
# The cursor is left at the end of the buffer after the write, so reading
# from the current position returns nothing.
print(f.read())
# tell() reports the cursor position, showing that it sits at the end.
f.tell()
# Rewind to the start, then read only the first two bytes.
f.seek(0)
f.read(2)

b'\xe4\xb8\xad\xe6\x96\x87'
b''


b'\xe4\xb8'

## 以流的形式读入转化成 ndarray 对象

### 流转数组

In [14]:
import numpy as np
# Raw bytes used as the source buffer.
b = b'abcdef'
# Interpret every byte of the buffer as one unsigned 8-bit integer.
image_array = np.frombuffer(buffer=b, dtype=np.uint8)
print(image_array)

[ 97  98  99 100 101 102]


### 流转图片

In [21]:
import numpy as np
import requests
import cv2
import io
from PIL import Image  # Image.open is used below

url = 'http://www.pyimagesearch.com/wp-content/uploads/2015/01/google_logo.png'
# A timeout keeps the cell from hanging forever on an unresponsive host.
resp = response = requests.get(url=url, timeout=30)
# Fail here on an HTTP error instead of handing an error page to the decoder.
response.raise_for_status()
byte_stream = io.BytesIO(response.content)  # wrap the response body in an in-memory byte stream
roiImg = Image.open(byte_stream)  # open the image from the byte stream
imgByteArr = io.BytesIO()  # empty in-memory buffer that receives the encoded image
roiImg.save(imgByteArr, format='PNG')
imgByteArr = imgByteArr.getvalue()  # the PNG-encoded bytes
with open("./abc.png", "wb") as f:
    f.write(imgByteArr)

In [16]:
from PIL import Image
# Wrap the `image_array` built in the earlier cell in a PIL image and display it.
im = Image.fromarray(image_array)
im.show()

## zip 压缩解压文件

In [4]:
import os
import shutil
import zipfile

def load_json(json_path):
    """
    Load a UTF-8 encoded JSON file.

    Parameters
    ---------
    json_path: Path of the JSON file

    Returns
    ---------
    dic: The parsed JSON content
    """
    # Imported here because this cell never imports json at the top.
    import json

    with open(json_path, 'r', encoding='utf-8') as f:
        dic = json.load(f)
    return dic

def load_task(fp):
    """
    Load a task CSV and parse its literal-encoded columns.

    The 'code' column is read as int. The 'data' and 'msg' columns hold
    Python literals written as text; each cell is parsed back into the
    corresponding Python object.

    Parameters
    ---------
    fp: Path of the UTF-8 encoded CSV file

    Returns
    ---------
    task_df: The loaded DataFrame
    """
    # Imported here because this cell never imports these names at the top.
    from ast import literal_eval

    import pandas as pd

    task_df = pd.read_csv(fp, encoding='utf-8', dtype={'code': int})
    task_df['data'] = task_df['data'].apply(literal_eval)
    task_df['msg'] = task_df['msg'].apply(literal_eval)
    return task_df


class ZipHanding:
    """Decompress the ZIP file or compress the file into a ZIP file"""

    def unzip(self, zip_path, dp=None):
        """
        Unzip zip file to a folder

        Parameters
        ---------
        zip_path: Zip file path
        dp: Decompression path. When omitted, the archive path with its
            extension stripped is used.

        Returns
        ---------
        dir_path: The path of the extracted folder

        Notes
        ---------
        Anything that already exists at the decompression path is deleted
        with ``shutil.rmtree`` before extraction.
        """
        if dp is None:
            # splitext only looks at the last path component, so dots in
            # directory names (including a leading './') are left alone.
            dp = os.path.splitext(zip_path)[0]
        if os.path.exists(dp):
            shutil.rmtree(dp)
        with zipfile.ZipFile(zip_path, 'r') as f:
            f.extractall(dp)
        return dp

    def __rm_file_folder(self, fp):
        """
        Remove a file or a folder (with everything inside it).

        Raises
        ---------
        FileNotFoundError: If `fp` is neither an existing file nor an
            existing folder.
        """
        if os.path.isfile(fp):
            os.remove(fp)
        elif os.path.isdir(fp):
            shutil.rmtree(fp)
        else:
            raise FileNotFoundError('File not found')

    def __rename_correctly(self, string):
        """
        Repair a file name that was decoded as cp437 although its bytes
        were really UTF-8 or GBK.

        The name is turned back into bytes via cp437 and decoded as UTF-8
        first, then as GBK. If neither works the name is returned unchanged.
        """
        try:
            raw = string.encode('cp437')
        except UnicodeError:
            # Contains characters outside cp437, so it cannot be a
            # cp437-decoded name; keep it as it is.
            return string
        for encoding in ('utf-8', 'gbk'):
            try:
                return raw.decode(encoding)
            except UnicodeError:
                continue
        return string

    def __rm_special_name_folder(self, dir_path, folder_name='__MACOSX'):
        """
        Delete the folder named 'xxxx' if it exists.
        The default folder name is __MACOSX.
        """
        # Bottom-up walk: a folder's children are visited before the folder
        # itself is listed (and possibly removed) by its parent.
        for root, dirs, files in os.walk(dir_path, topdown=False):
            for name in dirs:
                if name == folder_name:
                    shutil.rmtree(os.path.join(root, name))

    def __standard_zip_dir(self, dir_path):
        """
        Normalise an extracted folder in place.

        Removes every '__MACOSX' folder, moves all remaining files directly
        into `dir_path` (repairing garbled names on the way) and finally
        deletes the now-empty sub-folders. Files from different sub-folders
        that share a name end up on the same target path.
        """
        # Drop the junk folders first, otherwise their files would be
        # flattened into dir_path and survive the folder removal.
        self.__rm_special_name_folder(dir_path)

        for root, _, filenames in os.walk(dir_path):
            for fn in filenames:
                fp = os.path.join(root, fn)
                new_fp = os.path.join(dir_path, self.__rename_correctly(fn))
                if fp != new_fp:
                    os.rename(fp, new_fp)

        # Every file now lives directly in dir_path; what is left below it
        # are only the emptied sub-folders.
        for entry in os.listdir(dir_path):
            sub_path = os.path.join(dir_path, entry)
            if os.path.isdir(sub_path):
                self.__rm_file_folder(sub_path)

    @classmethod
    def decompression(cls, zip_path):
        """
        Extract a zip file next to itself and normalise the result.

        Parameters
        ---------
        zip_path: Zip file path

        Returns
        ---------
        dir_path: The path of the extracted, flattened folder
        """
        handler = cls()
        dir_path = handler.unzip(zip_path)
        handler.__standard_zip_dir(dir_path)
        return dir_path

    @classmethod
    def zip_file(cls, dfp, out_path=None):
        """
        Compresses the specified folder or file

        Parameters
        ---------
        dfp: Destination folder or file path.
        out_path: Save path of the compressed file +xxxx.zip. Defaults to
            `dfp` with '.zip' appended (folder) or with its extension
            replaced by '.zip' (file).

        Returns
        ---------
        out_path: The path of the written zip file

        Raises
        ---------
        FileNotFoundError: If `dfp` does not exist.

        Notes
        ---------
        Files of a folder are stored flat, under their bare file name.
        """
        if not os.path.exists(dfp):
            # Fail before an empty archive is created.
            raise FileNotFoundError(f'File not found: {dfp}')

        if os.path.isdir(dfp):
            if out_path is None:
                # Strip trailing separators so 'folder/' gives 'folder.zip'
                # rather than 'folder/.zip' inside the folder being zipped.
                separators = os.sep + (os.altsep or '')
                out_path = (dfp.rstrip(separators) or dfp) + '.zip'
            out_abs = os.path.abspath(out_path)
            with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as f:
                for root, _, filenames in os.walk(dfp):
                    for fn in filenames:
                        src = os.path.join(root, fn)
                        if os.path.abspath(src) == out_abs:
                            # Never add the archive to itself.
                            continue
                        f.write(filename=src, arcname=fn)
        else:
            if out_path is None:
                out_path = os.path.splitext(dfp)[0] + '.zip'
            with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as f:
                # basename works for bare names and for any path separator.
                f.write(dfp, os.path.basename(dfp))
        return out_path

# Extract two sample archives; the extracted folder path is returned.
zip_path = './data/zip_test/三元组数据集_云测早期标注.zip'
ZipHanding.decompression(zip_path)
zip_path2 = './data/zip_test/三元组数据集_内部标注.zip'
dir_path = ZipHanding.decompression(zip_path2)
print(f'decompression {dir_path}')

# Compress a folder, then a single file; the archive path is returned.
dfp = './data/zip_test/三元组数据集_云测早期标注'
ZipHanding.zip_file(dfp)
dfp2 = 'data/zip_test/ABC.json'
res = ZipHanding.zip_file(dfp2)
print(f'zip_file path is {res}')

decompression ./data/zip_test/三元组数据集_内部标注
zip_file path is data/zip_test/ABC.zip


## 文件检查

In [6]:
class FileCheck:
    """
    Document Type Checking

    Args:
        fileType(:obj:'list, str'):
            A string or list of strings with a file type suffix, written
            without the leading dot (e.g. 'docx').

    """
    def __init__(self, fileType):
        # Wrap a bare string: `in` on a str is a substring test, which
        # would accept 'doc' when only 'docx' is allowed.
        if isinstance(fileType, str):
            fileType = [fileType]
        self.ALLOWED_EXTENSIONS = fileType

    def allowed_file(self, fn):
        """
        Check the suffix of a file name against the allowed suffixes.

        The comparison is case-sensitive and uses the text after the last
        '.' in the name.

        Args:
            fn(:obj:'str'):
                File name to check.
        Return:
            :obj:'bool, str'
            file type or False

        Examples
        ----------
        Check if the file is DOCX
        >>> docxChick = FileCheck(['docx'])
        >>> docxChick.allowed_file('abcd.docx')
        'docx'
        """
        _, dot, suffix = fn.rpartition('.')
        # `dot` is empty when the name contains no '.' at all.
        if dot and suffix in self.ALLOWED_EXTENSIONS:
            return suffix
        return False

# Build a checker that allows only the 'docx' suffix and try it on a sample name.
docxChick = FileCheck(['docx'])
docxChick.allowed_file('abcd.docx')

Signature: docxChick.allowed_file(fn)
Docstring:
Args:
    fileType(:obj:'list, str'):
        A string or list of strings with a file type suffix.
Return:
    :obj:'bool, str'
    file type or False

Examples
----------
Check if the file is DOCX
>>>docxChick = FileCheck(['docx'])
>>>docxChick.allowed_file('abcd.docx')
>>>'docx'
File:      e:\project\demo\utils\<ipython-input-6-5fe58d42c76a>
Type:      method
