# **ElasticSearch -1: Load PDFs into ES**

#### This notebook corresponds with the slide "4.1. ES  -  Process  &  Bulk Load"

*------- All files relative paths need to be changed according to your file locations in order to run this notebook -------*

## **1.Install ElasticSearch**
1. **There are many ways to install ElaticSeach locally, for example:**
   * Install Elasticsearch with Docker: https://www.elastic.co/guide/en/elasticsearch/reference/7.16/docker.html 
   * Install Elasticsearch on macOS with Homebrew: https://www.elastic.co/guide/en/elasticsearch/reference/7.16/brew.html#brew
<br>
<br>

2. **We use a method which is fast for testing purpose but not sophisiticated enough for development or production purposes:**
   1. Following this article: https://medium.com/codex/all-you-need-to-know-about-using-elasticsearch-in-python-b9ed00e0fdf0  
   2. Create a directory for ES and open it with VS Code
   3. Install the Docker (by Microsoft) extension in VS Code
   4. Create a "docker-compose.yml" file in the ES directory
   5. Copy and paste the "docker-compose.yml" file content from the article to set up the  basic configurations for ES. Importantly, this file creates a persistent volume locally for Elasticsearch so that data can persist when the containers restart
   6. Open terminal in VS Code and execute command "docker-compose up -d" to create a docker container for ES
   7. Click the VS Code Docker tab, and right-click “elasticsearch:7.12.0” and “kibana:7.12.0” and select "start" for both
   8. Use terminal command "docker-compose ps" or "docker ps -a" to check if this docker container is running, and use "docker-compose logs -f" to check logs of the services
   9. Install ES Python client with Anaconda (https://anaconda.org/conda-forge/elasticsearch) or with Python (https://elasticsearch-py.readthedocs.io/en/v7.16.3/)
   10. Import ES using "from elasticsearch import Elasticsearch", and open Kibana Console in web browser using URL "http://localhost:5601/"
<br>
<br>

3. **ES docs and examples**
   * ES official doc: https://www.elastic.co/guide/index.html 
   * ES API Documentation: https://elasticsearch-py.readthedocs.io/en/7.x/api.html
   * ES Quick Start: https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started.html  
   * ES online book: https://livebook.manning.com/book/elasticsearch-in-action-second-edition/chapter-2/v-4 
   * "What is Elasticsearch and why is it so fast?": https://lynn-kwong.medium.com/what-is-elasticsearch-and-why-is-it-so-fast-5a4b95747d19
   * "All you need to know about using Elasticsearch in Python": https://medium.com/codex/all-you-need-to-know-about-using-elasticsearch-in-python-b9ed00e0fdf0
   * "Streaming structured data from Elasticsearch using Tensorflow-IO": https://colab.research.google.com/github/tensorflow/io/blob/master/docs/tutorials/elasticsearch.ipynb#scrollTo=xHxb-dlhMIzW

## **2. Imports**


In [1]:
import os
import json
import pandas as pd
import numpy as np
from PyPDF2 import PdfFileReader
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
import warnings
warnings.filterwarnings('ignore')

## **3. Load Files and Screen Raw PDFs**


In [11]:
##### 3.1. Load the "companies.xlsx" file to enrich data, if multiple rows share the same "Unique ID",
# then only use the first instance from this file
df_com = pd.read_excel("Crawler & Processing/2.Develop - Crawler Folder/companies.xlsx")
df_com.head()

Unnamed: 0,Unique ID,Issuer - subsidiary,Issuer industry,Country of risk,Date
0,3,Cagayan Electric Power & Light Co Inc,Energy,Philippines,2001-05-30
1,16478,Nanjing Financial City Construction & Developm...,Financials,China,2021-04-30
2,16481,Suzhou Tech City Development Group Co Ltd,Financials,China,2021-04-30
3,16479,Landesbank Baden-Wuerttemberg,Financials,Germany,2021-04-30
4,16480,City of Lunds Sweden,Government,Sweden,2021-04-30


In [2]:
##### 3.2. Load the paths of raw PDFs from the crawler and list out all raw PDF file names from the folders
##### Separately process "positive" and "negative" reports to allow some flexibility for future works
PDF_path_pos = "Crawler & Processing/2.Develop - Crawler Folder/pos_reports/"
PDF_path_neg = "Crawler & Processing/2.Develop - Crawler Folder/neg_reports/"

PDF_file_list_pos = os.listdir(PDF_path_pos)
PDF_file_list_neg = os.listdir(PDF_path_neg)

print("--- The number of positive raw PDF files is: ", len(PDF_file_list_pos))
print("--- The number of negative raw PDF files is: ", len(PDF_file_list_neg))
print("--- The number of ALL raw PDF files is: ", len(PDF_file_list_pos + PDF_file_list_neg))

--- The number of positive raw PDF files is:  995
--- The number of negative raw PDF files is:  1158
--- The number of ALL raw PDF files is:  2153


In [3]:
##### 3.3. Load the results from the BS detector 

# Load the results(CSVs) from the BS Detector into dataframes
df_pos = pd.read_csv("BS Detector Results csv/positives_bsdetector.csv")
df_neg = pd.read_csv("BS Detector Results csv/negatives_bsdetector.csv")

# Select ONLY the sustainability reports (bs_score=2)
df_pos_sus = df_pos[df_pos["bs_score"] == 2]
df_neg_sus = df_neg[df_neg["bs_score"] == 2]
print("--- The number of postitive sustainability reports is: ", df_pos_sus.shape[0])
print("--- The number of negative sustainability reports is: ", df_neg_sus.shape[0])

print("--- The dataframe of positive sustainability results: ")
display(df_pos_sus.head())
print("--- The dataframe of negative sustainability results: ")
display(df_neg_sus.head())

--- The number of postitive sustainability reports is:  456
--- The number of negative sustainability reports is:  520
--- The dataframe of positive sustainability results: 


Unnamed: 0.1,Unnamed: 0,Company,Pages,Sustainability Report mention,first5sus,Annual Report mention,first5annual,exclusion,Year,bs_score,sustainability_score
0,0,86,6,1,1,0,0,1,2021,2,0
1,1,16357,387,1,1,1,1,0,2018,2,0
2,2,16447,16,1,1,0,0,1,2021,2,0
3,3,16423,133,1,1,1,0,0,2020,2,0
4,4,16456,72,1,1,1,0,0,2019,2,0


--- The dataframe of negative sustainability results: 


Unnamed: 0.1,Unnamed: 0,Company,Pages,Sustainability Report mention,first5sus,Annual Report mention,first5annual,exclusion,Year,bs_score,sustainability_score,Searched Year,Matching Year
0,0,3982_2020,77,1,1,1,0,0,2019,2,0,2020,False
1,1,15206_2020,285,1,1,1,0,0,2020,2,0,2020,True
6,6,15194_2020,52,1,1,1,0,0,2020,2,0,2020,True
7,7,3975_2020,156,1,1,1,1,0,2020,2,0,2020,True
8,8,15196_2020,110,1,1,1,0,1,2020,2,0,2020,True


In [4]:
##### 3.4. Define a function to screen out unreadable PDFs by PdfFileReader
def check_readable_files(df_sus, file_path, label):
    """[summary]
    This function iterates through all raw PDFs, reads them and counts the pages, and puts results into  
    "readable_fileID_list" and "page_nr_list" to screen out unreadable PDFs by PdfFileReader
    
    Args:
        df_sus ([dataframe]): dataframe from the BS detector as loaded 1 cell above
        file_path ([string]): paths of raw PDFs 
        label ([string]): "positive" or "negative"

    Returns:
        readable_fileID_list ([list]): a list of readable file names (str)
        page_nr_list ([list]): a list of total page number of each readable file (int)
    """
    
    # Check total page number of all sustainability reports using BS detector result dataframe
    df_page_nr = df_sus["Pages"].sum()

    # Load total page number of all raw PDF files 
    readable_fileID_list = []
    page_nr_list = []
    
    # Go through each raw PDF and append each file name and total page number to the lists above
    for pdf_ID in df_sus["Company"].values:
        try:
            fileName = str(pdf_ID)+".pdf"
            if os.path.isfile(os.path.join(file_path, fileName)):
                with open(file_path + fileName, 'rb') as f:
                    pdf = PdfFileReader(f)
                    page_nr = pdf.getNumPages()
                    page_nr_list.append(page_nr)
                    readable_fileID_list.append(fileName)
        except:
                continue
    
    # Print comparison information
    print("--- Total number of {} sustainability reports from the BS Detector dataframe: {}".format(label, df_sus.shape[0]))
    print("--- Total page number of {} reports from the BS Detector dataframe: {}".format(label, df_page_nr))
    print("--- Total READABLE number of {} raw PDFs: {}".format(label, len(readable_fileID_list)))
    print("--- Total READABLE page number of {} raw PDFs: {}".format(label, sum(page_nr_list)))
    print("--- The number of reports dropped due to readability issue: ", df_sus.shape[0] - len(readable_fileID_list))
    print("--- The number of pages dropped due to readability issue: ", df_page_nr - sum(page_nr_list))
    
    # Return READABLE file name list and page number list for further use
    return readable_fileID_list, page_nr_list

In [5]:
##### 3.5. Use the function above to screen readable positive PDFs by PdfFileReader
# and get 2 variables to be used for bulk processing data for ES later
readable_fileID_list_pos, page_nr_list_pos = check_readable_files(df_sus = df_pos_sus,
                                                                  file_path = PDF_path_pos,
                                                                  label = "positive")
#【Time of running this cell: 1min】

--- Total number of positive sustainability reports from the BS Detector dataframe: 456
--- Total page number of positive reports from the BS Detector dataframe: 48158
--- Total READABLE number of positive raw PDFs: 432
--- Total READABLE page number of positive raw PDFs: 45430
--- The number of reports dropped due to readability issue:  24
--- The number of pages dropped due to readability issue:  2728


In [6]:
##### 3.6. Use the function above to screen readable negative PDFs by PdfFileReader
# and get 2 variables to be used for bulk processing data for ES later
readable_fileID_list_neg, page_nr_list_neg = check_readable_files(df_sus = df_neg_sus,
                                                                  file_path = PDF_path_neg,
                                                                  label = "negative")
#【Time of running this cell: 1min】

--- Total number of negative sustainability reports from the BS Detector dataframe: 520
--- Total page number of negative reports from the BS Detector dataframe: 55649
--- Total READABLE number of negative raw PDFs: 492
--- Total READABLE page number of negative raw PDFs: 51608
--- The number of reports dropped due to readability issue:  28
--- The number of pages dropped due to readability issue:  4041


## **4. Prepare Text Embedding and Functions for ES Bulk Loading**


#### 4.1. Download the USE Encoder to Generate Text Embedding Vectors for Semantic Search

The Universal Sentence Encoder encodes text into high dimensional vectors that can be used for text classification, semantic similarity and clustering.  USE has two variations i.e. one trained with Transformer encoder and other trained with Deep Averaging Network (DAN). We use Transformer version, since it generally has higher accuracy. We create vector matrices of text embeddings of our reference data, then we get the vectors of the embeddings of text from each report page and then compare the similarity scores. USE is element-wise sum of context-aware word representations, like BERT, but USE is particularly trained to identify the semantic similarity between sentences.

* Useful Links:
    * TensorFlow Hub: https://tfhub.dev/
    * universal-sentence-encoder: https://tfhub.dev/google/collections/universal-sentence-encoder/1
    * "Review-based Search Engine using Universal Sentence Encoder and PEGASUS": https://medium.com/@peacelikejoy/review-based-search-engine-using-universal-sentence-encoder-and-pegasus-3643d6456b9f 
    * "Vector-Based Semantic Search using Elasticsearch": https://medium.com/version-1/vector-based-semantic-search-using-elasticsearch-48d7167b38f5

In [9]:
# Import the essential TensorFlow libraries：
import tensorflow.compat.v1 as tf 
import tensorflow_hub as hub

# Download the Universal Sentence Encoder Model (about 1 GB)：
# Since TF ver.1 is used here, pretrained TF embedding modules need to be loaded with "hub.Module"
# With TF ver.2, "hub.load()" is used instead to handle valid SavedModel, instead of modules
# About USE: Universal-Sentence-Encoder https://tfhub.dev/google/universal-sentence-encoder/4 

graph = tf.Graph()

with tf.Session(graph = graph) as session:
    print("--- Downloading pre-trained embeddings from tensorflow hub…")
    embed = hub.Module("https://tfhub.dev/google/universal-sentence-encoder/2") 
    text_ph = tf.placeholder(tf.string)
    embeddings_1 = embed(text_ph)
    print("--- Done.")
    print("--- Creating tensorflow session…")
    
    session = tf.Session()
    session.run(tf.global_variables_initializer())
    session.run(tf.tables_initializer())
    print("--- Done.")

#【Time of running this cell the first time: 20min】

--- Downloading pre-trained embeddings from tensorflow hub…
INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


--- Done.
--- Creating tensorflow session…
--- Done.


In [10]:
# Define a function to use the USE encoder to convert texts to embedding vectors:
def text_to_vector(text):
    vectors = session.run(embeddings_1, feed_dict={text_ph: text})
    return [vector.tolist() for vector in vectors]

# Define a testing reference text and call the "text_to_vector" function to convert：
testing_text = "We show our efforts to help the green economy, creating business and value by recycling plastic waste. We focus on the positive impact on the environment and people through further growing our sustainable offering. We create technologies and solutions to advance a more efficient, sustainable, resilient and environment-friendly world for all. We perform detailed analysis to evaluate the significance of working activities that influence the environment. Our Environmental policy is also defined in the engineering phase which is an opportunity to propose technological modifications which can result in energy saving and cleaner emissions, leading to environmental benefits for the customer, stakeholders and the whole community. We are using only renewable energy. All our electricity is from renewable sources. Our electricity mainly come from solar panels and wind power. We demonstrate our commitment to this policy by striving to ensure that our actions have no or minimal impact on our planet. We have reduced our green house gas emissions. We are committed to promote decarbonization and better use of energy, continuously implement energy efficiency initiatives. Water consumption has been reduced and water has been recycled with innovative technologies.  It’s essential to protect water, not only for our business needs, but also for the sake of the communities in which we operate, because access to clean, fresh water is a critical human need. We also implemented a comprehensive water management system that includes a rainwater harvesting system. We have undertaken careful and comprehensive collection, transportation and final treatment of waste. Our digitalization of documents assists a paper-less approach which helps to reduce paper waste. We have successfully used innovative technologies to minimize hazard wastes. Negative impact on the environment has been reduced. During each audit we inspect environmental permits, waste management, and effluent treatment plants. We began an office eco-efficiency program aimed at reduction, reuse and recycling of waste. Each office has designated recycling bins. We have eliminated plastic from our packaging. We also committed to a plastic-free future. We have reduced carbon (CO2 ) emissions and reduced our carbon footprint of our operations, products and services. We achieved net-zero operational emissions. Circularity is part of our business model and we are expanding our environmental commitments to integrate biodiversity. We have started a series of initiatives to protect animal and plants biodiversity. We have been actively source sustainable green materials during our production."
text_vector = text_to_vector([testing_text])[0]
print("--- Text to be embedded: {}".format(testing_text), "\n")
print("--- Embedding size: {}".format(len(text_vector)), "\n")
print("--- Obtained Embedding[{},…]\n".format(text_vector[:5]))

#【Time of running this cell the first time: 1sec】

--- Text to be embedded: We show our efforts to help the green economy, creating business and value by recycling plastic waste. We focus on the positive impact on the environment and people through further growing our sustainable offering. We create technologies and solutions to advance a more efficient, sustainable, resilient and environment-friendly world for all. We perform detailed analysis to evaluate the significance of working activities that influence the environment. Our Environmental policy is also defined in the engineering phase which is an opportunity to propose technological modifications which can result in energy saving and cleaner emissions, leading to environmental benefits for the customer, stakeholders and the whole community. We are using only renewable energy. All our electricity is from renewable sources. Our electricity mainly come from solar panels and wind power. We demonstrate our commitment to this policy by striving to ensure that our actions have no or min

#### 4.2. Load ES Client and Define ES Configurations

In [12]:
# Create an ES client for ES operations
es_client = Elasticsearch("localhost:9200",  # Host here is a single node. Most of the time we only connect to a single node for testing purpose
                          http_auth=["elastic", "ING_project"],  # Default port, user name, and self-defined password
                          timeout=300)  #【!】Set "timeout" parameter to allow longer data loading/indexing time

# Create an ES index client to work with indexes
es_index_client = IndicesClient(es_client)

In [13]:
# Define the Settings & Mappings of ES [[[[[-------IMPORTANT!--------]]]]]
# In practical usage, WE always define settings and mappings which can make ES engine more robust, efficient and powerful
configurations = {
    "settings": {  # Setting part of the index
        "index": {
            "number_of_replicas": 1}, # Make no difference in a local environment, but in production multiple replicas 
                                      # can improve availability and fault tolerance
        "max_result_window" : 100000,  #【!】Set larger than our total doc/page number, which is about 96000+, otherwise python
                                       # won't be able to retrive all results > then default size 10,000
        "analysis": { # Define an ngram filter and analyzer here which supports searching by partial input or autocompletion
            "filter": {
                "ngram_filter": {
                  "type": "edge_ngram",
                  "min_gram": 2,
                  "max_gram": 512}
            },
            "analyzer": {
                "ngram_analyzer": { # Define parameters for the ngram_analyzer analyzer 
                                    # "An Introduction to Ngrams in Elasticsearch": https://qbox.io/blog/an-introduction-to-ngrams-in-elasticsearch
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                      "lowercase",
                      "ngram_filter"]
                }  
            }
        }
    },
    "mappings": { # Data schema of the index. ES supports dynamic mapping, which means we don’t need to define the field types 
                  # in advance and Elasticsearch will create them automatically. However, we should always define the mapping whenever possible. 
                  # It is better to be explicit about the mapping than implicit. The more you know about your data, the more robust the ES engine can be.
        "dynamic": "true",
        "_source": {"enabled": "true"},
        "properties": {
            "id": {
                "type": "text", # Set doc id to be strings to include both "doc id" and "page number", like "86.1"
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "label": {
                "type": "long", # Positive label = 1, Negative label = 0
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "company": {
                "type": "text",
                "fields": {
                    "keyword": { # Add "keyword" field to allow aggregation and advanced seach
                        "type": "keyword"} 
                }
            },
            "industry": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "country": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "date": {  # Set the date to be numerical type to allow range search
                "type": "long",
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "filename": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"},
                    "ngrams": {
                        "type": "text",
                        "analyzer": "ngram_analyzer"}
                }
            },
            "page": { # This page numebr = origiral/actual page number - 1 due to PdfFileReader's non-zero indexing
                "type": "long",
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "text_len": { # Number of words contained in the "text" field of this doc/page
                "type": "long",
                "fields": {
                    "keyword": {
                        "type": "keyword"}
                }
            },
            "text": {
                "type": "text",
                "analyzer": "standard",
                "fields": {
                    "keyword": {
                        "type": "keyword"},
                    "ngrams": { # Allow this field to use the ngram_analyzer analyzer
                        "type": "text",
                        "analyzer": "ngram_analyzer"}
                }
            },
            "emb_text_vector": { 
                "type": "dense_vector",
                "dims": 512  #【!】Needs to be the same as the dimenstion of USE-encoded text embedding vectors
            }
        }
    }
}

#### 4.3. Define a Function to Extract Texts from PDF pages with Enriched data for ES Bulk Load

In [14]:
# Define a function to read pages from each PDF into action_lists 
def pdf_page_to_action(file_list, file_path, start, end, index_name, label, json_name):
    """
    [summary]
    This function iterates through each raw PDF from the given file_path, extracts the file name to get the PDF id,
    splits files into small batches according to the "start" and "end" index to avoid ES data overload,
    uses the PDF id to match the company information, extracts texts from PDFs, creates indexing actions using 
    the index_name, creates ES doc ids using "PDF id + page number" strings, creates "docs" as the main data
    bodies that contain {id, company info, file and page info, texts and text embedding vectors}, then appends the action
    and the corresponding doc body into the action_list, and finally generates json files using each action_list
    for backup purpose
    
    Args:
        file_list ([list]): the previously generated readable PDF lists, such as "readable_fileID_list_pos"
        file_path ([string]): the raw PDF file path defined in previous cell
        start ([int]): start index of the small batch
        end ([int]): end index of the small batch
        index_name ([string]): index name for ES indexing
        label ([bool number]): positive=1, negative=0
        json_name ([string]): json file name

    Returns:
        action_list ([string]): a list of "action & doc body" pairs for ES bulk API to load data into ES
    """
    
    action_list = []  

    # Get file names from the file_list indexed by "start" and "end" points
    for fileName in file_list[start : end]:
        try:
            # Get PDF id before ".csv" for positive PDFs and before "_year.csv" for negative PDFs
            if label== 1: 
                pdf_ID = fileName.split(".")[0] 
            else:
                pdf_ID = fileName.split("_")[0]
                
            # Match company info and only use the first instance from this df_com from previous cell
            matched_row = df_com[df_com["Unique ID"]== int(pdf_ID)].iloc[0] 
            
            # Open raw PDFs and extract text with enriched data, all mapped according to ES mappings
            with open(file_path + fileName, 'rb') as f:
                pdf = PdfFileReader(f)
                for pn in range(1, pdf.getNumPages()): # Get all pages of a PDF, has to start from "1"
                    doc_id_str = str(pdf_ID)+"."+str(pn) # Create ES doc id = "PDF id + page number" string
                    page = pdf.getPage(pn)   # Retrieves a page by number from this PDF file
                    text = page.extractText()   # Extract the text on this page
                    text_vector = text_to_vector([text])[0] # Call the function to convert text into text embedding vectors
                    
                    # Create an action for each page with an unique ES doc id
                    action = {"index": {"_index": index_name, "_id": doc_id_str}}
                    # Create an data body in a doc Dict corresponding to each page/doc
                    doc = { 
                        "id": doc_id_str,
                        "label": label,
                        "company": matched_row["Issuer - subsidiary"],
                        "industry": matched_row["Issuer industry"],
                        "country": matched_row["Country of risk"],
                        "date": int(str(matched_row["Date"]).split("-")[0]), 
                        "filename": fileName,
                        "page": pn,
                        "text_len":len(text.split(" ")), 
                        "text": text,
                        "emb_text_vector": text_vector
                    }   
                    action_list.append(json.dumps(action))  
                    action_list.append(json.dumps(doc)) 
            
        except:
            continue
        
    # Feed "action_list" into a JSON File for convenient access and as backups
    with open("{}.json".format(json_name), "w") as write_file:
        write_file.write("\n".join(action_list))
        
    return action_list

## **5. Create Action Lists and Bulk Load Data into ES**

In [15]:
##### 5.1. Call the "pdf_page_to_action" function and create action lists for all pages from all raw PDFs
# Define placeholders Dicts and the initial values ("start" and "end") used to split PDFs into batches
action_dict_POS = {}
action_dict_NEG = {}
start = 0
interval = 10 # Determins the number of PDFs in each batch
counter = 1

# Processing positive reports into batches
print("--- Processing positive reports ---")
while start < len(readable_fileID_list_pos):
    # Define the end point for each interation
    if start + interval > len(readable_fileID_list_pos):
        end = len(readable_fileID_list_pos) 
    else:
        end = start + interval
    # Create an action_list using the function defined above
    action_list = pdf_page_to_action(file_list=readable_fileID_list_pos, # "readable_fileID_list_pos" defined ealier
                                     file_path=PDF_path_pos, # "PDF_path_pos" defined ealier
                                     start=start, # changes in every iteration
                                     end=end, # changes in every iteration
                                     label=1, # positive label = 1
                                     index_name="sus_reports_" + str(counter), # dynamically generate index names using the counter
                                     json_name="sus_reports_" + str(counter) 
                                     )
    # Append this action_list into action_dict_POS Dict
    key = "action_list_" + str(counter)
    action_dict_POS[key] = action_list
    print("action_list_" + str(counter), " is created for sus_reports_" + str(counter))
    # Increase the "start" number by "interval", an increase "counter" by 1
    start += interval
    counter += 1

# Reset the splitters but use the last "counter" value
# 【!】to ensure the index name has a continuous "sus_reports_*" pattern for bulk searching later!!!!!
start = 0
interval = 10
# Processing negative reports into batches, same logic
print("--- Processing negative reports ---")
while start < len(readable_fileID_list_neg):
    if start + interval > len(readable_fileID_list_neg):
        end = len(readable_fileID_list_neg) 
    else:
        end = start + interval
    action_list = pdf_page_to_action(file_list=readable_fileID_list_neg, 
                                     file_path=PDF_path_neg, 
                                     start=start, 
                                     end=end, 
                                     label=0, 
                                     index_name="sus_reports_" + str(counter), 
                                     json_name="sus_reports_" + str(counter)
                                     )
    key = "action_list_" + str(counter)
    action_dict_NEG[key] = action_list
    print("action_list_" + str(counter), " is created for sus_reports_" + str(counter))
    start += interval
    counter += 1
    
# Print out ending message with total number of action lists (AKA: indexes)
print("--- Done! {} action lists are created in total! ---".format(len(action_dict_POS)+len(action_dict_NEG)))

#【Time of running this cell: 40min for pos files + 45min for neg file = 80~90min】

--- Processing positive reports ---
action_list_1  is created for sus_reports_1
action_list_2  is created for sus_reports_2
action_list_3  is created for sus_reports_3
action_list_4  is created for sus_reports_4
action_list_5  is created for sus_reports_5
action_list_6  is created for sus_reports_6
action_list_7  is created for sus_reports_7
action_list_8  is created for sus_reports_8
action_list_9  is created for sus_reports_9
action_list_10  is created for sus_reports_10
action_list_11  is created for sus_reports_11
action_list_12  is created for sus_reports_12
action_list_13  is created for sus_reports_13
action_list_14  is created for sus_reports_14
action_list_15  is created for sus_reports_15
action_list_16  is created for sus_reports_16
action_list_17  is created for sus_reports_17
action_list_18  is created for sus_reports_18
action_list_19  is created for sus_reports_19
action_list_20  is created for sus_reports_20
action_list_21  is created for sus_reports_21
action_list_22  

In [17]:
##### 5.2. Bulk load all action lists into ES
#【!】Iterate through each action list as separate indexes and load data from "doc" body into ES as separate docs 
# to keep each index's data quantity below ES data size limit

# Iterate through action_dicts
for action_dict in [action_dict_POS, action_dict_NEG]:
    # Iterate through each key-value pair in each action_dict
    for key, value in action_dict.items():
        # Create index names from action lists
        index_name = "sus_reports_"+key.split("_")[-1] 
        # If an index already exists in ES then delete it 
        if es_client.indices.exists(index=index_name):  
            es_client.indices.delete(index=index_name, ignore=[400, 404])
        # Create a new index
        es_index_client.create(index=index_name, 
                               settings=configurations["settings"], 
                               mappings=configurations["mappings"], 
                               request_timeout=1000) # 【!】Set a large number to avoid ES "Request Timeout" problems
        # Bulk load data from this action list to the corresponding index in ES
        es_client.bulk(body="\n".join(value)) 

#【Time of running this cell: 1~2min !!!】

In [None]:
#### 5.3. Search for all docs to check the total number of docs/pages loaded into ES
search_query = {
    "size": 100000, 
    "query": {
        "match_all": {}
        },
    "_source": ["id","label", "company", "industry", "country", "date","filename", "page", "text_len"] 
    # Need to specify a few fields to limit the size of data retrieved to avoid ES crash
}

# Get search results and show basic stats
result = es_client.search(index="sus_reports_*", body=search_query, request_timeout=1000)
Total_index_count = result["_shards"]["total"]
print("--- Total number of indexes retrived: ", Total_index_count)

Total_page_count = len(result["hits"]["hits"])
print("--- Total number of docs/pages retrived: ", Total_page_count)

Total_word_count = 0
for dict in result["hits"]["hits"]:
    text_len = dict["_source"]["text_len"]
    Total_word_count += text_len
print("--- Total words stored in ES: ", Total_word_count)


  result = es_client.search(index="sus_reports_*", body=search_query, request_timeout=1000)


--- Total number of indexes retrived:  94
--- Total number of docs/pages retrived:  93950
--- Total words stored in ES:  29215337
