# Prepare PDFs for the later pipeline stages (object detection, image classification, text classification, NER)

- user provides
    - Google Cloud project (input)
    - bucket in GCS of pdfs (input)
    - BQ dataset to write prediction results (output)
        - BQ table: aggregated results (pdf_name, icn_pred, objdet_pred(coords), text_cn, ner1, ner2, ...., ner)
            created with JOIN on pdf_name
        - BQ table: icn_preds (pdf_name, icn_pred)    --> this table is made in icn_predict.ipynb
        - BQ table: objdet_pred (pdf_name, objdet_pred(coords)) --> this table is made in objdet_predict.ipynb
        - BQ table: text_cn (pdf_name, text_cn)    --> this table is made in text_cn_predict.ipynb
        - BQ table: ner (pdf_name, ner1, ner2, ...., ner)
        
- see utils.py for the utility functions
        

Steps: 
 1. Convert the first page of each PDF to PNG and write it to the bucket (for ICN, ObjDet)
 2. Run OCR on each PDF and write the result to the bucket
 3. Create the BigQuery dataset and load the prediction results
    

In [1]:
# Notebook-wide configuration. The first line uses IPython's `!` shell magic,
# so this cell only runs inside Jupyter/IPython, not as plain Python.
PROJECT = !gcloud config get-value project # returns SList
PROJECT = PROJECT[0] # gets first element in list -> str
REGION = "us-central1"  
# NOTE(review): MODEL_RESOURCE_NAME is not referenced anywhere else in this notebook.
MODEL_RESOURCE_NAME = "2393478483993952256"
# Endpoint ids passed as `endpoint_id` to the online prediction helpers
# (TCN for the text task, ICN for the image task).
TCN_ENDPOINT_ID = "3651416543192940544"
ICN_ENDPOINT_ID = "7257673944809865216"

import os
# Export project and region as environment variables for this process.
os.environ["PROJECT"] = PROJECT
os.environ["REGION"] = REGION

In [2]:
from google.cloud import bigquery
# Module-level BigQuery client bound to PROJECT; Utils.load_to_bigquery reads
# it as the global `bq`.
bq = bigquery.Client(project=PROJECT)

In [3]:
import logging
# Configure the root logger at INFO and create the module-level `logger`
# that the Pipeline task methods log through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [4]:
from google.cloud import storage
from google.cloud import vision
from google.cloud import aiplatform
import tempfile
import traceback as tb
from importlib import reload
from pathlib import Path
import pandas as pd
import numpy as np

# For Jupyter only: logging was already configured in an earlier cell, so the
# module is reloaded before basicConfig is called again with the timestamped
# DEBUG-level format (presumably because basicConfig is a no-op once handlers
# exist — confirm).
import logging
reload(logging)
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG, datefmt='%I:%M:%S')

In [5]:
from pdf2image import convert_from_path
import io
import base64
import cv2
from datetime import datetime
import time
import json

In [6]:
from google.cloud import aiplatform
from google.cloud.aiplatform.gapic.schema import predict
from google.cloud.aiplatform.v1.schema.predict.instance_v1.types import TextClassificationPredictionInstance, ImageClassificationPredictionInstance
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

In [7]:
# Smoke test: emits a single INFO record to confirm the logging configuration.
logging.info("test if logging works")

01:20:19 INFO:test if logging works


In [8]:
def to_trace_str(e):
    """Render exception ``e`` and its traceback as one multi-line string."""
    rendered_lines = tb.format_exception(type(e), e, e.__traceback__)
    return "".join(rendered_lines)

class Utils():
    """Cloud Storage, OCR, BigQuery and online-prediction helpers for the pipeline.

    Cloud Storage access goes through ``self.storage_client``. BigQuery access
    uses the module-level client ``bq`` and errors are rendered with the
    module-level helper ``to_trace_str``.
    """

    def __init__(self):
        # One storage client shared by every method of this instance.
        self.storage_client = storage.Client()

    def dismantle_path(self, gcs_path):
        """Split a GCS path into ``(bucket_name, directory, filename)``.

        Args
            gcs_path (str): path with or without the ``gs://`` scheme, e.g.
                ``gs://bucket/dir/file.pdf`` or ``bucket/dir``.

        Returns
            tuple of three str. ``directory`` has no leading/trailing slash and
            is "" for the bucket root; ``filename`` is "" unless the last
            component below the bucket contains a ".".

        Raises
            ValueError: if the path does not contain a bucket name.
        """
        path = str(gcs_path)
        if path.startswith("gs://"):
            path = path[len("gs://"):]

        parts = [part for part in path.split("/") if part]
        if not parts:
            raise ValueError(f"no bucket name found in path {gcs_path!r}")

        bucket_name, *rest = parts
        # Only components below the bucket can be a filename, so a dotted
        # bucket name (e.g. "my.bucket") is never mistaken for a file.
        filename = rest.pop() if rest and "." in rest[-1] else ""
        return bucket_name, "/".join(rest), filename

    def _list_blobs(self, bucket, directory, suffix):
        """Return the blobs below ``directory`` in ``bucket`` whose name ends with ``suffix``."""
        # A prefix listing restricts the result to the requested directory; an
        # empty directory lists the whole bucket.
        prefix = f"{directory}/" if directory else None
        return [blob for blob in bucket.list_blobs(prefix=prefix) if blob.name.endswith(suffix)]

    @staticmethod
    def _top_prediction(file_path, prediction):
        """Return the highest-confidence class of one prediction as a record.

        Returns ``None`` when the prediction carries no confidences (e.g. no
        class passed the confidence threshold).
        """
        confidences = list(prediction["confidences"])
        if not confidences:
            return None
        # First index holding the maximum confidence.
        best = max(range(len(confidences)), key=confidences.__getitem__)
        return {
            'file': file_path,  # "gs://bucket/text.txt" TODO: check if original path is needed
            'subject': prediction["displayNames"][best],
            'score': confidences[best],
        }

    def convert_pdf_to_png(self, src_path, dst_path):
        """Convert the first page of every pdf below ``src_path`` to png and upload it below ``dst_path``.

        Args
            src_path (str): bucket or directory holding the pdf files
            dst_path (str): bucket or directory that receives the png files

        Returns
            list of dicts ``{"content": <gs:// uri of the png>, "image": <png bytes>}``,
            or ``False`` if any step failed (the error is logged).
        """
        try:
            logging.info("started conversion pdf -> png")

            src_bucket_name, src_directory, _ = self.dismantle_path(src_path)
            dst_bucket_name, dst_directory, _ = self.dismantle_path(dst_path)

            src_bucket = self.storage_client.bucket(src_bucket_name)
            dst_bucket = self.storage_client.bucket(dst_bucket_name)

            blob_list = self._list_blobs(src_bucket, src_directory, ".pdf")

            png_lst = []
            logging.info(f"found {len(blob_list)} pdfs in bucket  {src_bucket_name}")

            for b_idx, blob in enumerate(blob_list):
                # The temporary directory removes the downloaded pdf again, even on errors.
                with tempfile.TemporaryDirectory() as tmp_dir:
                    tmp_pdf = os.path.join(tmp_dir, "input.pdf")
                    blob.download_to_filename(tmp_pdf)
                    logging.info(f"downloaded {b_idx+1} of {len(blob_list)} files")
                    # Only the first page is going to be analyzed, so only that page is rendered.
                    pages = convert_from_path(tmp_pdf, first_page=1, last_page=1)
                    logging.info(f"converted {b_idx+1} of {len(blob_list)} images")
                    image = np.array(pages[0])

                # NOTE(review): pdf2image yields PIL images (normally RGB) while
                # cv2.imencode assumes BGR order, so red/blue are presumably
                # swapped in the stored png. Left unchanged because the deployed
                # model may have been trained on such images — confirm.
                is_success, im_buf_arr = cv2.imencode(".png", image)
                if not is_success:
                    raise RuntimeError(f"could not encode {blob.name} as png")
                byte_im = im_buf_arr.tobytes()

                filename = os.path.join(dst_directory, blob.name+".png")
                dst_bucket.blob(filename).upload_from_string(byte_im)

                png_lst.append({"content": f"gs://{dst_bucket_name}/{filename}", "image": byte_im})

                logging.info(f"saved {b_idx+1} of {len(blob_list)} images with filename {filename}")

            return png_lst

        except Exception as e:
            logging.error(f"Error in method convert_pdf_to_png: {to_trace_str(e)}")
            return False

    def ocr(self, src_path, dst_path):
        """Perform optical character recognition in pdf files.

        One asynchronous annotate request is built per pdf below ``src_path``
        and all requests are submitted as a single batch.

        Args
            src_path (str): bucket or directory holding the pdf files
            dst_path (str): bucket or directory that receives the OCR json output

        Returns
            google.api_core.operation.Operation
            To check if done use method .done()
            ``None`` if an error occurred (the error is logged).

        Link to documentation:
            https://googleapis.dev/python/vision/latest/vision_v1/types.html#google.cloud.vision_v1.types.OutputConfig
            https://cloud.google.com/vision/docs/pdf

        """
        try:
            logging.info("started optical character recognition")

            src_bucket_name, src_directory, _ = self.dismantle_path(src_path)
            dst_bucket_name, dst_directory, _ = self.dismantle_path(dst_path)

            src_bucket = self.storage_client.bucket(src_bucket_name)

            logging.info(f"src_bucket_name {src_bucket_name}, src_directory {src_directory}")

            blob_list = self._list_blobs(src_bucket, src_directory, ".pdf")

            logging.info(f"found {len(blob_list)} pdf files in bucket {src_bucket_name}")

            client = vision.ImageAnnotatorClient()
            feature = vision.Feature(type_=vision.Feature.Type.DOCUMENT_TEXT_DETECTION)

            async_requests = []

            for b_idx, blob in enumerate(blob_list):
                gcs_source_uri = "gs://" + os.path.join(src_bucket_name, blob.name)
                # Write below the destination directory, mirroring where
                # convert_pdf_to_png puts its output.
                gcs_destination_uri = "gs://" + os.path.join(dst_bucket_name, dst_directory, blob.name)

                # source
                gcs_source = vision.GcsSource(uri=gcs_source_uri)
                input_config = vision.InputConfig(gcs_source=gcs_source, mime_type='application/pdf')

                # destination; batch_size=1 produces one output json per page
                gcs_destination = vision.GcsDestination(uri=gcs_destination_uri)
                output_config = vision.OutputConfig(gcs_destination=gcs_destination, batch_size=1)

                logging.info(f"started ocr for {b_idx+1} of {len(blob_list)} files")
                async_requests.append(
                    vision.AsyncAnnotateFileRequest(
                        features=[feature],
                        input_config=input_config,
                        output_config=output_config,
                    )
                )

            return client.async_batch_annotate_files(requests=async_requests)

        except Exception as e:
            logging.error(f"Error in method ocr: {to_trace_str(e)}")
            return None

    def get_extension(self, mime_type):
        """Map a mime type to a file extension; anything but "image/png" maps to ".txt"."""
        return {"text/plain": ".txt", "image/png": ".png"}.get(mime_type, ".txt")

    def read_text_files(self, list_of_paths):
        """Download every file referenced by ``element["content"]`` and store it under ``element["text"]``.

        The bucket is taken from the first element, so all paths are expected
        to live in the same bucket. The list is modified in place and returned;
        an empty list is returned unchanged.
        """
        if not list_of_paths:
            return list_of_paths

        bucket_name, _, _ = self.dismantle_path(list_of_paths[0]["content"])
        bucket = self.storage_client.bucket(bucket_name)

        for element in list_of_paths:
            _, directory, filename = self.dismantle_path(element["content"])
            element["text"] = bucket.blob(os.path.join(directory, filename)).download_as_string()

        return list_of_paths

    def read_jsonl(self, gcs_path):
        """Read a jsonl file from a bucket.

        Args
            gcs_path (str): full path of the jsonl file

        Returns
            results (list): list of dicts, one per non-empty line;
            ``None`` if an error occurred (the error is logged).
        """
        try:
            bucket_name, directory, filename = self.dismantle_path(gcs_path)
            blob_name = os.path.join(directory, filename)
            logging.info(blob_name)
            blob = self.storage_client.bucket(bucket_name).blob(blob_name)
            response = blob.download_as_string().decode("utf-8")
            logging.info(response)
            # Skip blank lines instead of blindly dropping the last element, so
            # a file without a trailing newline keeps its final record.
            return [json.loads(line) for line in response.split("\n") if line.strip()]

        except Exception as e:
            logging.error(f"Error in read_jsonl: {to_trace_str(e)}")
            return None

    def create_jsonl(self, gcs_path, mime_type, filename):
        """create jsonl out of files in bucket

        Args
            gcs_path (str): bucket or dir where files are located
            mime_type (str): the files mimetype
            filename (str): the jsonl filename

        Returns
            full path of jsonl, or ``None`` if an error occurred (the error is logged)
        """
        try:
            filename = os.path.basename(filename)
            bucket_name, directory, _ = self.dismantle_path(gcs_path)
            bucket = self.storage_client.bucket(bucket_name)
            extension = self.get_extension(mime_type)

            blob_list = self._list_blobs(bucket, directory, extension)

            # blob.name already contains the directory, so the uri is built
            # from the bucket name rather than from gcs_path.
            jsonl_content = "".join(
                json.dumps(
                    {
                    "content": f"gs://{bucket_name}/{blob.name}",
                    "mimeType": mime_type
                    }
                )+"\n"
                for blob in blob_list
            )

            file_path = os.path.join(directory, filename)
            bucket.blob(file_path).upload_from_string(jsonl_content)
            logging.info(f"uploaded jsonl {file_path} to bucket {bucket_name}. Full path: gs://{os.path.join(bucket_name,file_path)}")
            return f"gs://{bucket_name}/{file_path}"

        except Exception as e:
            logging.error(f"Error in jsonl creation: {to_trace_str(e)}")
            return None

    def create_text_files(self, gcs_path):
        """Extract the text of every OCR output json below ``gcs_path`` into a .txt blob.

        Only blobs ending in "output-1-to-1.json" are read. OCR outputs without
        any recognized text are skipped with a warning.

        Returns
            list of dicts ``{"content": <gs:// uri of the txt>, "text": <str>}``,
            or ``None`` if an error occurred (the error is logged).
        """
        results = []
        try:
            # init bucket
            bucket_name, directory, _ = self.dismantle_path(gcs_path)
            bucket = self.storage_client.bucket(bucket_name)
            blob_list = self._list_blobs(bucket, directory, "output-1-to-1.json")

            for b_idx, blob in enumerate(blob_list):
                logging.info(f"creating {b_idx+1} of {len(blob_list)} text files")
                response = json.loads(blob.download_as_string())
                responses = response.get("responses") or [{}]
                text = responses[0].get("fullTextAnnotation", {}).get("text")
                if not text:
                    logging.warning(f"no text found in gs://{bucket_name}/{blob.name}, skipping")
                    continue
                txt_path = blob.name.replace("output-1-to-1.json", ".txt")
                results.append({"content":f"gs://{bucket_name}/{txt_path}", "text":text})
                bucket.blob(txt_path).upload_from_string(text)
                logging.info(f"created text file gs://{bucket_name}/{txt_path}")

            logging.info("finished creating text files")
            return results

        except Exception as e:
            logging.error(f"Error in method create_text_files: {to_trace_str(e)}")
            return None

    def save_to_storage(self, gcs_path, filename, predictions):
        """Convert a list of prediction dicts into csv and upload it below ``gcs_path``.

        Args
            gcs_path (str): bucket or directory that receives the csv
            filename (str): name of the csv file
            predictions (list): list of dicts, one per row

        Returns
            full ``gs://`` path of the uploaded csv, or ``None`` if an error
            occurred (the error is logged).
        """
        try:
            # init bucket
            bucket_name, directory, _ = self.dismantle_path(gcs_path)
            bucket = self.storage_client.bucket(bucket_name)

            # create df
            df = pd.DataFrame.from_records(predictions)

            # The blob lives inside the requested directory, so the returned
            # path is the path that was actually written.
            blob_name = os.path.join(directory, filename)
            full_path = f"gs://{bucket_name}/{blob_name}"
            logging.info(f"writing csv {full_path} to storage")

            # Serialize in memory; no temporary file to clean up.
            bucket.blob(blob_name).upload_from_string(df.to_csv(index=False), content_type="text/csv")

            return full_path

        except Exception as e:
            logging.error(f"Error in method save_to_storage: {to_trace_str(e)}")
            return None

    def load_to_bigquery(self, gcs_path, dataset_id, table_id, schema):
        """loads csv data in storage to BQ

        Creates the dataset if it does not exist yet, loads the csv at
        ``gcs_path`` into ``table_id`` and waits for the load job.

        Args
            gcs_path (str): ``gs://`` uri of the csv (first row is a header)
            dataset_id (str): dataset id in the form ``project.dataset``
            table_id (str): table id in the form ``project.dataset.table``
            schema (list): list of bigquery.SchemaField

        Returns
            number of rows in the destination table after the load

        Errors raised by the BigQuery client are not caught here.
        """
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        # exists_ok makes creation idempotent; any other failure (permissions,
        # invalid id, ...) surfaces here instead of being taken for "not found".
        dataset = bq.create_dataset(dataset, exists_ok=True, timeout=30)  # Make an API request.
        logging.info("Using dataset {}.{}".format(bq.project, dataset.dataset_id))

        # create bigquery table and upload csv
        job_config = bigquery.LoadJobConfig(
            schema=schema,
            skip_leading_rows=1,
            # The source format defaults to CSV, so the line below is optional.
            source_format=bigquery.SourceFormat.CSV,
            allow_quoted_newlines=True,
        )

        load_job = bq.load_table_from_uri(
            gcs_path, table_id, job_config=job_config
        )  # Make an API request.

        load_job.result()  # Waits for the job to complete.

        destination_table = bq.get_table(table_id)  # Make an API request.
        logging.info("Loaded {} rows.".format(destination_table.num_rows))
        return destination_table.num_rows

    # image
    def predict_online_multiple_image(
        self,
        project: str,
        endpoint_id: str,
        img_lst: list,
        location: str = "us-central1",
        api_endpoint: str = "us-central1-aiplatform.googleapis.com"
    ):
        """Send every image in ``img_lst`` to an image classification endpoint.

        Args
            project: Google Cloud project of the endpoint
            endpoint_id: id of the deployed endpoint
            img_lst: list of dicts with keys "image" (png bytes) and "content" (source path)
            location: region of the endpoint
            api_endpoint: regional API endpoint of the prediction service

        Returns
            list of dicts ``{"file", "subject", "score"}`` holding the top class
            per prediction; predictions without confidences are skipped.
        """
        # The AI Platform services require regional API endpoints.
        client_options = {"api_endpoint": api_endpoint}
        # Initialize client that will be used to create and send requests.
        # This client only needs to be created once, and can be reused for multiple requests.
        client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

        # Endpoint path and parameters are identical for every image.
        endpoint = client.endpoint_path(
            project=project, location=location, endpoint=endpoint_id
        )
        # See gs://google-cloud-aiplatform/schema/predict/params/image_classification_1.0.0.yaml for the format of the parameters.
        parameters = predict.params.ImageClassificationPredictionParams(
            confidence_threshold=0.5, max_predictions=5,
        ).to_value()

        results = []

        for item in img_lst:
            # The format of each instance should conform to the deployed model's prediction input schema.
            encoded_content = base64.b64encode(item["image"]).decode("utf-8")
            instance = predict.instance.ImageClassificationPredictionInstance(
                content=encoded_content,
            ).to_value()
            response = client.predict(
                endpoint=endpoint, instances=[instance], parameters=parameters
            )

            for prediction_ in response.predictions:
                record = self._top_prediction(item["content"], prediction_)
                if record is not None:
                    results.append(record)

        return results

    # text
    def predict_online_multiple(
        self,
        project: str,
        endpoint_id: str,
        content_lst: list,
        location: str = "us-central1",
        api_endpoint: str = "us-central1-aiplatform.googleapis.com"
    ):
        """Send every text in ``content_lst`` to a text classification endpoint.

        Args
            project: Google Cloud project of the endpoint
            endpoint_id: id of the deployed endpoint
            content_lst: list of dicts with keys "text" and "content" (source path)
            location: region of the endpoint
            api_endpoint: accepted for signature parity with
                predict_online_multiple_image; not used by this method

        Returns
            list of dicts ``{"file", "subject", "score"}`` holding the top class
            per prediction; predictions without confidences are skipped.
        """
        aiplatform.init(project=project, location=location)
        endpoint = aiplatform.Endpoint(endpoint_id)

        results = []

        for item in content_lst:
            response = endpoint.predict(instances=[{"content": item["text"]}], parameters={})

            # See gs://google-cloud-aiplatform/schema/predict/prediction/text_classification.yaml for the format of the predictions.
            for prediction_ in response.predictions:
                record = self._top_prediction(item["content"], prediction_)
                if record is not None:
                    results.append(record)

        return results

In [9]:
class Pipeline():
    """End-to-end run: pdf -> png / OCR text -> online predictions -> BigQuery.

    Relies on the module-level names ``Utils``, ``PROJECT``, ``TCN_ENDPOINT_ID``,
    ``ICN_ENDPOINT_ID``, ``bigquery`` and ``logger``.
    """

    def __init__(self, dataset_id=None, project=None, region=None):
        """Set up identifiers, table ids and schemas for one pipeline run.

        Args
            dataset_id (str): BigQuery dataset in the form ``project.dataset``;
                defaults to ``<project>.docprocessing_<timestamp>``
            project (str): Google Cloud project; defaults to the module-level PROJECT
            region (str): region of the prediction endpoints; defaults to "us-central1"
        """
        self.utils = Utils()
        self.uuid = datetime.now().strftime('%y%m%d_%H%M%S') #str

        # Same project the prediction calls use, instead of a fixed project id.
        self.project = project if project else PROJECT

        self.region = region if region else "us-central1"

        # Destination of the prediction csv files; set by start_pipeline.
        self.dst_path = None

        self.dataset_id = dataset_id if dataset_id else f"{self.project}.docprocessing_"+self.uuid

        # find ids via !gcloud ai models list
        self.tcn_model_resource_name = "2393478483993952256"
        self.icn_model_resource_name = "8925034949820547072"

        self.table_id_tcn = f"{self.dataset_id}.tcn"
        self.table_id_icn = f"{self.dataset_id}.icn"

        self.tcn_schema = [
                    bigquery.SchemaField("file", "STRING", mode="REQUIRED", description="File path."),
                    bigquery.SchemaField("subject", "STRING", mode="REQUIRED", description="Predicted class."),
                    bigquery.SchemaField("score", "FLOAT", mode="REQUIRED", description="Confidence of the prediction."),
                ]

        self.icn_schema = [
                    bigquery.SchemaField("image_name", "STRING", mode="REQUIRED", description='Name of the image analyzed.'),
                    bigquery.SchemaField("label", "STRING", mode="REQUIRED", description='Predicted class. It can be US or EU'),
                    bigquery.SchemaField("confidence", "FLOAT", mode="REQUIRED", description='Confidence of the prediction.'),
                ]

    def start_pipeline(self, src_path):
        """Run preprocessing and both classification tasks for the pdfs below ``src_path``.

        All intermediate and result files are written to the same location as
        the source pdfs.
        """
        logging.info("started pipeline")

        # save everything in the same bucket
        dst_path = src_path
        self.dst_path = dst_path
        jsonl_filename_tcn = f"tcn_{self.uuid}.jsonl"
        jsonl_filename_icn = f"icn_{self.uuid}.jsonl"

        # create png
        jsonl_path_icn, image_lst_icn = self.preprocess_pdf_to_png(src_path, dst_path, jsonl_filename_icn)

        # create ocr
        jsonl_path_tcn, text_lst_tcn = self.preprocess_ocr(src_path, dst_path, jsonl_filename_tcn)

        # prediction
        self.text_classification_task(text_lst=text_lst_tcn, dst_path=dst_path)

        self.image_classification_task(img_lst=image_lst_icn, dst_path=dst_path)

        logging.info("finished pipelines")

    def preprocess_pdf_to_png(self, src_path, dst_path, jsonl_filename):
        """Convert the pdfs to png and write a jsonl index of the png files.

        Returns
            tuple ``(result of Utils.create_jsonl, list of png dicts)``

        Raises
            RuntimeError: if the conversion failed
        """
        png_lst = self.utils.convert_pdf_to_png(src_path, dst_path)
        # convert_pdf_to_png signals failure with False; an empty list is valid.
        if png_lst is False or png_lst is None:
            raise RuntimeError("pdf -> png conversion failed, see log for details")

        return self.utils.create_jsonl(gcs_path=dst_path, mime_type="image/png", filename=jsonl_filename), png_lst

    def preprocess_ocr(self, src_path, dst_path, jsonl_filename):
        """Run OCR on the pdfs, create the text files and write a jsonl index of them.

        Returns
            tuple ``(result of Utils.create_jsonl, list of text dicts)``

        Raises
            RuntimeError: if the OCR operation could not be started or the
                text files could not be created
        """
        ocr_operation = self.utils.ocr(src_path, dst_path)
        if ocr_operation is None:
            raise RuntimeError("ocr could not be started, see log for details")

        while not ocr_operation.done():
            logging.info("wait for ocr to finish")
            time.sleep(5)

        text_lst = self.utils.create_text_files(dst_path) # returns list of json
        if text_lst is None:
            raise RuntimeError("text files could not be created, see log for details")
        return self.utils.create_jsonl(gcs_path=dst_path, mime_type="text/plain", filename=jsonl_filename), text_lst

    def _resolve_dst_path(self, dst_path):
        """Return the explicit ``dst_path`` or the one remembered by start_pipeline."""
        resolved = dst_path if dst_path else getattr(self, "dst_path", None)
        if not resolved:
            raise ValueError("dst_path is not set: pass dst_path or run start_pipeline first")
        return resolved

    def _store_predictions(self, task, predictions, dst_path, table_id, schema):
        """Save ``predictions`` as csv below ``dst_path`` and load the csv into ``table_id``."""
        logger.info(f"save {task} predictions to storage")
        predictions_filename = "predictions_"+task+"_"+self.uuid+".csv"
        path_to_csv = self.utils.save_to_storage(dst_path, predictions_filename, predictions)
        if not path_to_csv:
            raise RuntimeError(f"{task} predictions could not be saved, see log for details")

        # Step 5: Load storage result in BQ
        logger.info("load results into BigQuery")
        status = self.utils.load_to_bigquery(path_to_csv, self.dataset_id, table_id, schema)
        logging.info(f"finished task with status {status}")

    def text_classification_task(self, text_lst, dst_path=None):
        """Classify the texts online and load the predictions into the tcn table.

        Args
            text_lst (list): list of dicts with keys "text" and "content"
            dst_path (str): where the predictions csv is written; defaults to
                the path remembered by start_pipeline

        Raises
            ValueError: if no destination path is available
        """
        # Fail before any prediction request is sent.
        dst_path = self._resolve_dst_path(dst_path)

        predictions = self.utils.predict_online_multiple(
            project=self.project,
            endpoint_id=TCN_ENDPOINT_ID,
            content_lst=text_lst,
            location=self.region,
            api_endpoint=f"{self.region}-aiplatform.googleapis.com"
        )

        self._store_predictions("tcn", predictions, dst_path, self.table_id_tcn, self.tcn_schema)

    def image_classification_task(self, img_lst, dst_path=None):
        """Classify the images online and load the predictions into the icn table.

        Args
            img_lst (list): list of dicts with keys "image" and "content"
            dst_path (str): where the predictions csv is written; defaults to
                the path remembered by start_pipeline

        Raises
            ValueError: if no destination path is available
        """
        # Fail before any prediction request is sent.
        dst_path = self._resolve_dst_path(dst_path)

        predictions = self.utils.predict_online_multiple_image(
            project=self.project,
            endpoint_id=ICN_ENDPOINT_ID,
            img_lst=img_lst,
            location=self.region,
            api_endpoint=f"{self.region}-aiplatform.googleapis.com"
        )

        self._store_predictions("icn", predictions, dst_path, self.table_id_icn, self.icn_schema)

    def odet(self):
        """Placeholder for the object detection task; not implemented yet."""
        pass
        


# Test

## Parameters

In [10]:
# Demo run against the development bucket. start_pipeline writes its outputs
# to the same location as src_path.
src_path = "gs://2021_08_16_tcn_dev"
dst_path = "gs://2021_08_16_tcn_dev"

# An explicit dataset id is passed, so no timestamped dataset name is generated.
pipeline = Pipeline(dataset_id="qwiklabs-gcp-00-373ac55d0e0a.docprocessing_demo_nina")
pipeline.start_pipeline(src_path)

01:20:19 INFO:started pipeline
01:20:19 INFO:started conversion pdf -> png
01:20:19 INFO:found 3 pdfs in bucket  2021_08_16_tcn_dev
01:20:19 INFO:downloaded 1 of 3 files
01:20:19 INFO:converted 1 of 3 images
01:20:19 INFO:saved 1 of 3 images with filename computer_vision_1.pdf.png
01:20:19 INFO:downloaded 2 of 3 files
01:20:20 INFO:converted 2 of 3 images
01:20:20 INFO:saved 2 of 3 images with filename med_tech_8.pdf.png
01:20:20 INFO:downloaded 3 of 3 files
01:20:20 INFO:converted 3 of 3 images
01:20:21 INFO:saved 3 of 3 images with filename us_076.pdf.png
01:20:21 INFO:uploaded jsonl icn_210820_132019.jsonl to bucket 2021_08_16_tcn_dev. Full path: gs://2021_08_16_tcn_dev/icn_210820_132019.jsonl
01:20:21 INFO:started optical character recognition
01:20:21 INFO:src_bucket_name 2021_08_16_tcn_dev, src_directory 
01:20:21 INFO:found 3 pdf files in bucket 2021_08_16_tcn_dev
01:20:21 INFO:started ocr for 0 of 3 files
01:20:21 INFO:started ocr for 1 of 3 files
01:20:21 INFO:started ocr for 

In [11]:
# pipeline.text_classification_task([
#     {"text":"this is an example", "content":"gs://examplepath"},
#     {"text":"this is another example", "content":"gs://examplepath"},
# ])