In [192]:
import openpyxl
import pandas as pd
from os import PathLike
from pathlib import Path, PosixPath
import xlrd
import PyPDF2
import zipfile
import xtarfile
import gzip
import rarfile
import py7zr
import bz2

from docx import Document
from PIL import Image
from abc import ABCMeta, abstractmethod
from multiprocessing import Pool


In [250]:
CHECK_FILE_OK = 0
CHECK_FILE_FAILED = -1
CHECK_FILE_UNKNOWN = -2
CHECK_FILE_NOT_FOUND = -3

CHECK_RESULTS = {
    CHECK_FILE_OK: 'CHECK_FILE_OK',
    CHECK_FILE_FAILED: 'CHECK_FILE_FAILED',
    CHECK_FILE_UNKNOWN: 'CHECK_FILE_UNKNOWN',
    CHECK_FILE_NOT_FOUND: 'CHECK_FILE_NOT_FOUND'
}

ARCHIVE_TYPES = ('.zip', '.gzip', '.gz', '.tar', '.tgz', '.rar', '.7z', '.bz2')
IMAGE_TYPES = ('jpg', 'jpeg', 'png', 'gif', 'tif', 'tiff', 'bmp')
WORD_TYPES = ('.docx', '.rtf')
EXCEL_TYPES = ('.xls', '.xlt', '.xlsx', '.xlsm', '.xltx', '.xltm', '.ods', '.odt', '.odf')
PDF_TYPES = ('.pdf',)


In [251]:
class BaseCheckFile(metaclass=ABCMeta):
    def __init__(self, failed_only=True, verbose=False) -> None:
        self.failed_only = failed_only
        self.verbose = verbose
        
    def check(self, files) -> list:
        assert type(files) in [str, PathLike, PosixPath, list], "files argument must be list ou string"
        
        if type(files) == str:
            files = [files]
            
        p = Pool()
        results = []
        for res in p.map(self.check_one, files):
            if res:
                results += [res]
            
        return list(filter(lambda result: result != 0, results)) 

    def check_one(self, filename) -> tuple:
        return None if Path(filename).exists() else (filename, CHECK_RESULTS[CHECK_FILE_NOT_FOUND])

    def __filter_failed__(self, result):
        if not self.failed_only or result[1] != CHECK_RESULTS[CHECK_FILE_OK]:
            return result
            
        return None

    def __print__(self, result):
        if result and self.verbose:
            print(result)

        

In [252]:
class CheckPDF(BaseCheckFile):
    def check_one(self, filename):
        result = super().check_one(filename)
        if not result:
            try:
                with open(filename, 'rb') as file_descriptor:
                    reader = PyPDF2.PdfReader(file_descriptor)
                    if len(reader.pages) > 0:
                        result = (filename, CHECK_RESULTS[CHECK_FILE_OK])
                    else:
                        raise
            except:
                result = (filename, CHECK_RESULTS[CHECK_FILE_FAILED])

        result = self.__filter_failed__(result)
        self.__print__(result)
            
        return result

class CheckExcel(BaseCheckFile):
    def check_one(self, filename):
        result = super().check_one(filename)
        if not result:
            try:
                pd.ExcelFile(filename)
                result = (filename, CHECK_RESULTS[CHECK_FILE_OK])
                
            except:
                result = (filename, CHECK_RESULTS[CHECK_FILE_FAILED])

        result = self.__filter_failed__(result)
        self.__print__(result)

        return result

class CheckImage(BaseCheckFile):
    def check_one(self, filename):
        result = super().check_one(filename)
        if not result:
            try:
                with Image.open(filename) as img:
                    img.verify()
                    result = (filename, CHECK_RESULTS[CHECK_FILE_OK])
            except:
                result = (filename, CHECK_RESULTS[CHECK_FILE_FAILED])

        result = self.__filter_failed__(result)
        self.__print__(result)

        return result

class CheckArquive(BaseCheckFile):
    def check_one(self, filename):
        result = super().check_one(filename)
        if not result:        
            try:
                result = (filename, self.__check__(filename))
            except Exception as e:
                result = (filename, CHECK_RESULTS[CHECK_FILE_FAILED])

        result = self.__filter_failed__(result)
        self.__print__(result)

        return result

    def __check__(self, filename):
        file_type = Path(filename).suffix
        
        if file_type == '.zip':
            with zipfile.ZipFile(filename, 'r') as zip_file:
                return CHECK_RESULTS[CHECK_FILE_OK] if zip_file.testzip() is None else CHECK_RESULTS[CHECK_FILE_FAILED]

        elif file_type in ['.gzip', '.gz', '.tgz']:
            with gzip.open(filename, 'r') as gzip_file:
                gzip_file.read()
                return CHECK_RESULTS[CHECK_FILE_OK]

        elif file_type in ['.tar']:
            with xtarfile.open(filename, 'r') as tar_file:
                return CHECK_RESULTS[CHECK_FILE_OK]

        elif file_type == '.rar':
            with rarfile.RarFile(filename, 'r') as rar_file:
                return CHECK_RESULTS[CHECK_FILE_OK] if len(rar_file.namelist()) > 0 else CHECK_RESULTS[CHECK_FILE_FAILED]

        elif file_type == '.7z':
            with py7zr.SevenZipFile(filename, mode='r') as seven_zip_file:
                seven_zip_file.getnames()
                return CHECK_RESULTS[CHECK_FILE_OK]
                
        elif file_type == '.bz2':
            with bz2.open(filename, 'rb') as bzip2_file:
                _ = bzip2_file.read()
                return CHECK_RESULTS[CHECK_FILE_OK] 
    
        else:
            return CHECK_RESULTS[CHECK_FILE_UNKNOWN]

class CheckWord(BaseCheckFile):
    def check_one(self, filename):
        result = super().check_one(filename)
        if not result:    
            try:
                result = (filename, self.__check__(filename))
            except Exception as e:
                print(str(e))
                result = (filename, CHECK_RESULTS[CHECK_FILE_FAILED])

        result = self.__filter_failed__(result)
        self.__print__(result)

        return result
    
    def __check__(self, filename):
        file_type = Path(filename).suffix
        if file_type == '.docx':
            _ = Document(filename)
            return CHECK_RESULTS[CHECK_FILE_OK]

        elif file_type == '.rtf':
            with open(filename, 'r') as infile:
                for line_no, line in enumerate(infile):
                    if line_no == 0 and line[:6] != '{\\rtf1':
                        raise Exception('Not RTF')
                    break
            return CHECK_RESULTS[CHECK_FILE_OK]

        else:
            return CHECK_RESULTS[CHECK_FILE_UNKNOWN]
            

In [253]:
class CheckFile(BaseCheckFile): 
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.kwargs = kwargs
        
    def check_one(self, filename):
        
    
        file_type = Path(filename).suffix
    
        if file_type in EXCEL_TYPES:
            result = CheckExcel(**self.kwargs).check_one(filename)
        elif file_type in WORD_TYPES:
            result =  CheckWord(**self.kwargs).check_one(filename)                             
        elif file_type in PDF_TYPES:
            result =  CheckPDF(**self.kwargs).check_one(filename)
        elif file_type in IMAGE_TYPES:
            result =  CheckImage(**self.kwargs).check_one(filename)
        elif file_type in ARCHIVE_TYPES:
            result =  CheckArquive(**self.kwargs).check_one(filename)
        else:
            result = super().check_one(filename)
            if not result:
                result =  (filename, CHECK_RESULTS[CHECK_FILE_UNKNOWN])

        return result

    

In [254]:
from timeit import timeit


check_file = CheckFile(failed_only=True, verbose=True)

r = check_file.check(['test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf','test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf','test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf','test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf', 'test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf','test2.xlsx', 'test2.xls', 'test.pdf', 'corrompido.pdf', 'test4.kml', 'teste.xls', 'test3.doc', 'test3.docx', 'test3.rtf'])



('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')('teste.xls', 'CHECK_FILE_NOT_FOUND')

('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
('corrompido.pdf', 'CHECK_FILE_FAILED')
('teste.xls', 'CHECK_FILE_NOT_FOUND')
