In [None]:
import pandas as pd
import numpy as np
import re
import os
import glob
import string
import pickle
import json
import traceback
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from google.cloud import storage
from google.cloud import vision
from google.protobuf import json_format
from PyPDF2 import PdfFileWriter, PdfFileReader



def clean_text(text):
    """Normalise a value into plain lowercase words.

    The input is coerced to ``str`` and lowercased, runs of digits and every
    character in ``string.punctuation`` are removed, and any whitespace run is
    collapsed to a single space (leading/trailing whitespace disappears).
    """
    lowered = str(text).lower()
    without_digits = re.sub(r'\d+', '', lowered)
    # A deletion-only translation table drops all punctuation in one pass.
    punctuation_remover = str.maketrans('', '', string.punctuation)
    without_punctuation = without_digits.translate(punctuation_remover)
    # split() with no argument splits on any whitespace run and trims the ends.
    return ' '.join(without_punctuation.split())


def async_detect_document(gcs_source_uri, gcs_destination_uri):
    """Run asynchronous document OCR on a file in GCS and return its text.

    Args:
        gcs_source_uri: ``gs://`` URI of the input file. It is submitted with
            mime type ``application/pdf``.
        gcs_destination_uri: ``gs://<bucket>/<prefix>`` URI under which the
            OCR JSON output is written.

    Returns:
        A list of strings: ``full_text_annotation.text`` of every response in
        every blob listed under the destination prefix, in listing order.
        Every blob under that prefix is parsed, so anything already stored
        there is included as well.

    Raises:
        ValueError: if ``gcs_destination_uri`` is not of the form
            ``gs://<bucket>/<prefix>``.
    """
    # Validate the destination before starting the OCR operation so a
    # malformed URI fails immediately instead of after the work is done.
    match = re.match(r'gs://([^/]+)/(.+)', gcs_destination_uri)
    if match is None:
        raise ValueError(
            'gcs_destination_uri must look like gs://<bucket>/<prefix>, got: '
            '{!r}'.format(gcs_destination_uri))
    bucket_name = match.group(1)
    prefix = match.group(2)

    # Supported mime_types are: 'application/pdf' and 'image/tiff'
    mime_type = 'application/pdf'

    # How many pages should be grouped into each json output file.
    batch_size = 2

    client = vision.ImageAnnotatorClient()

    feature = vision.types.Feature(
        type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)

    gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
    input_config = vision.types.InputConfig(
        gcs_source=gcs_source, mime_type=mime_type)

    gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
    output_config = vision.types.OutputConfig(
        gcs_destination=gcs_destination, batch_size=batch_size)

    async_request = vision.types.AsyncAnnotateFileRequest(
        features=[feature], input_config=input_config,
        output_config=output_config)

    operation = client.async_batch_annotate_files(
        requests=[async_request])

    print('Waiting for the operation to finish.')
    operation.result(timeout=420)

    # Once the request has completed and the output has been
    # written to GCS, list all the output files under the prefix.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob_list = list(bucket.list_blobs(prefix=prefix))

    res = []
    for blob in blob_list:
        print(blob.name)

        # Each output blob is a JSON-serialised AnnotateFileResponse.
        json_string = blob.download_as_string()
        response = json_format.Parse(
            json_string, vision.types.AnnotateFileResponse())

        # Collect the full text of every response in this output file.
        # The annotation also carries pages/blocks/paragraphs/words/symbols.
        for resp in response.responses:
            res.append(resp.full_text_annotation.text)

    return res






def get_text(path):
    """OCR one GCS object and return its result wrapped in a list.

    ``path`` is ``<bucket>/<object name>`` without the ``gs://`` scheme. The
    OCR output is written under ``jsons/<object name up to its first dot>/``
    in the ``s80-dochub-ocr-test-bucket-03`` bucket. The return value is a
    one-element list holding whatever ``async_detect_document`` returned.
    """
    bucket, file_name = path.split('/', 1)
    source_uri = "gs://" + bucket + "/" + file_name
    # Everything before the first '.' of the object name names the output folder.
    output_folder = file_name.split('.')[0]
    destination_uri = 'gs://s80-dochub-ocr-test-bucket-03/jsons/' + output_folder + '/'
    return [async_detect_document(source_uri, destination_uri)]



def split_pages(path):
    """Split a PDF stored in GCS into single-page PDFs and upload each page.

    Args:
        path: ``<bucket>/<object name>`` of the source PDF (no ``gs://``).

    Returns:
        A list of ``s80-dochub-ocr-test-bucket-03/files/<name>_<page>.pdf``
        paths, one per page in page order (pages are numbered from 1), where
        ``<name>`` is the object's base name up to its first dot. Returns
        ``None`` if anything fails; the exception and traceback are printed
        rather than raised.
    """
    # Local import: only this function needs it.
    import tempfile

    upload_bucket_name = 's80-dochub-ocr-test-bucket-03'

    try:
        client = storage.Client()

        bucket_name, prefix = path.split('/', 1)
        file_name = prefix.split('/')[-1].split('.')[0]

        source_blob = client.bucket(bucket_name).blob(prefix)
        upload_bucket = client.bucket(upload_bucket_name)

        files_list = []

        # A private scratch directory per call: concurrent calls can no longer
        # delete each other's files, and it is removed even when a step fails.
        with tempfile.TemporaryDirectory() as work_dir:
            local_pdf = os.path.join(work_dir, '{}.pdf'.format(file_name))
            source_blob.download_to_filename(local_pdf)

            # Keep the source open while pages are copied; close it afterwards.
            with open(local_pdf, 'rb') as pdf_file:
                pdf = PdfFileReader(pdf_file)

                for page in range(pdf.getNumPages()):
                    pdf_writer = PdfFileWriter()
                    pdf_writer.addPage(pdf.getPage(page))

                    page_object_name = "files/{}_{}.pdf".format(file_name, page + 1)
                    page_path = os.path.join(
                        work_dir, '{}_{}.pdf'.format(file_name, page + 1))
                    with open(page_path, "wb") as out:
                        pdf_writer.write(out)

                    upload_bucket.blob(page_object_name).upload_from_filename(page_path)
                    files_list.append(upload_bucket_name + '/' + page_object_name)

        return files_list

    except Exception as e:
        # Best-effort by design: report the failure and signal it with None.
        print('Exception :', e)
        print(traceback.format_exc())
        return None




def main(path):
    """Split one PDF stored in GCS into pages and OCR each page.

    Args:
        path: ``<bucket>/<object name>`` of the PDF; passed unchanged to
            ``split_pages``.

    Returns:
        A list built by extending with the result of ``get_text`` for each
        page file, in page order, or ``None`` if splitting or OCR fails.
        Failures are printed, not raised.
    """
    try:
        files_to_be_processed = split_pages(path)
        if files_to_be_processed is None:
            # split_pages prints its own error and returns None on failure;
            # iterating None would only add a misleading TypeError traceback.
            return None

        text_list = []
        for file_path in files_to_be_processed:
            text_list.extend(get_text(file_path))

        return text_list

    except Exception as e:
        print('Exception :', e)
        print(traceback.format_exc())
        return None

In [None]:
# Connect to Cloud Storage and list every object whose name starts with
# 'bank_statements/' in the source bucket.
client = storage.Client()
bucket = client.bucket('s80-dochub-ocr-bucket-2')
folder_name='bank_statements'
blobs = bucket.list_blobs(prefix=folder_name+'/')
# Materialise the listing into a list so it can be counted and indexed.
blobs_list = list(blobs)
# Last expression of the cell: the number of listed objects.
len(blobs_list)

In [None]:
import multiprocessing
from joblib import Parallel, delayed
from tqdm import tqdm

In [None]:
# CPU count of this machine; passed as n_jobs to joblib.Parallel further down.
num_cores = multiprocessing.cpu_count()

In [None]:
# Wrap the blobs in tqdm for a progress bar. The first listed blob is skipped,
# presumably because it is the folder placeholder object (TODO confirm).
inputs = tqdm(blobs_list[1:])
# One entry per input blob: whatever main() returned for it.
processed_list = []
if __name__ == "__main__":
    # Run main() for every blob across num_cores joblib workers; each call
    # receives '<bucket>/<object name>' for the source bucket.
    processed_list.extend(Parallel(n_jobs=num_cores)(delayed(main)('s80-dochub-ocr-bucket-2/'+i.name) for i in inputs))