In [None]:
import os
import requests
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, generate_blob_sas, BlobSasPermissions
from dotenv import load_dotenv
import pdfplumber
import pandas as pd
from datetime import datetime, timedelta
import pyarrow as pa
import pyarrow.parquet as pq

In [17]:
from langchain import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

In [None]:
import sagemaker
from sagemaker import get_execution_role
import boto3
import pandas as pd
import re
from transformers import pipeline, AutoTokenizer
from textblob import TextBlob

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from collections import Counter

In [None]:
# Load environment variables from .env file (you can use python-dotenv package for this)
load_dotenv()

In [None]:
# Retrieve the connection string from the environment variable
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
sas_token = os.getenv('AZURE_STORAGE_SAS_TOKEN')  # Ensure your SAS token is set in .env

if connect_str is None:
    raise ValueError("Azure Storage Connection String is not set in the environment variables")

print(f"Connection String: {connect_str}")  # Print the connection string to verify
print(f"SAS Token: {sas_token}")

In [None]:
# Azure Blob containers
pdf_container_name = 'pcaob'
text_container_name = 'pcaobtext'

In [None]:
# Ensure the file path is correct
pdf_links_df = pd.read_csv('data/PCAOB_inspection_reports.csv', usecols=['PDF Link'])

In [None]:
# Check if the DataFrame is loaded correctly
pdf_links_df.head()

In [None]:
pdf_links_df

In [None]:
# Initialize Blob Service Client
try:
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    pdf_container_client = blob_service_client.get_container_client(pdf_container_name)
    text_container_client = blob_service_client.get_container_client(text_container_name)
    
    pdf_container_client.create_container()
    text_container_client.create_container()
    print("Containers created successfully")
except Exception as e:
    print(f"Error initializing BlobServiceClient: {e}")

def download_pdf(url):
    response = requests.get(url)
    if response.status_code == 200:
        file_name = url.split('/documents/')[-1].split('?')[0]
        if not file_name.endswith('.pdf'):
            file_name += '.pdf'
        with open(file_name, 'wb') as f:
            f.write(response.content)
        return file_name
    else:
        print(f"Failed to download {url}, status code: {response.status_code}")
        return None

def extract_text_from_pdf(file_name):
    try:
        with pdfplumber.open(file_name) as pdf:
            text = ''
            for page in pdf.pages:
                text += page.extract_text() + '\n'
        return text
    except Exception as e:
        print(f"Error extracting text from {file_name}: {e}")
        return None

def upload_pdf_to_azure(file_name):
    blob_client = blob_service_client.get_blob_client(container=pdf_container_name, blob=file_name)
    with open(file_name, 'rb') as data:
        blob_client.upload_blob(data, overwrite=True)
    return f"https://{blob_service_client.account_name}.blob.core.windows.net/{pdf_container_name}/{file_name}{sas_token}"

def upload_parquet_to_azure(file_name):
    blob_client = blob_service_client.get_blob_client(container=text_container_name, blob=file_name)
    with open(file_name, 'rb') as data:
        blob_client.upload_blob(data, overwrite=True)
    print(f"Uploaded Parquet file to Azure Blob Storage: {file_name}")

In [None]:
# Get the PDF links
pdf_links = pdf_links_df['PDF Link'].tolist()

In [None]:
# Directory for storing temporary PDF files
temp_dir = 'temp_pdf_files'
os.makedirs(temp_dir, exist_ok=True)

In [None]:
# List to store extracted text
extracted_data = []

In [None]:
# Process each PDF link
for url in pdf_links:
    try:
        # Download the PDF
        file_name = download_pdf(url)
        if file_name:
            # Move the file to the temporary directory
            local_file = os.path.join(temp_dir, file_name)
            os.rename(file_name, local_file)

            # Extract text from the PDF
            text = extract_text_from_pdf(local_file)
            extracted_data.append({'pdf_link': url, 'extracted_text': text})

            # Upload the PDF to Azure Blob Storage
            upload_pdf_to_azure(local_file)

            # Remove the local file after processing
            os.remove(local_file)
    except Exception as e:
        print(f"Error processing {url}: {e}")
        extracted_data.append({'pdf_link': url, 'extracted_text': None})

In [None]:
# Convert the extracted text to a DataFrame
extracted_df = pd.DataFrame(extracted_data)

In [None]:
# Save the DataFrame to a Parquet file
parquet_file = 'extracted_text.parquet'
table = pa.Table.from_pandas(extracted_df)
pq.write_table(table, parquet_file)

In [None]:
# Upload the Parquet file to Azure Blob Storage
upload_parquet_to_azure(parquet_file)

In [None]:
# Remove the temporary directory if empty
if not os.listdir(temp_dir):
    os.rmdir(temp_dir)

In [None]:
parquest_df = pd.read_parquet('extracted_text.parquet', engine='pyarrow')
parquest_df

In [None]:
#Check Text Extracted
parquest_df['extracted_text'][0]

In [None]:
df = pd.read_csv('data/PCAOB_inspection_reports.csv')
df

In [None]:
complete_df = pd.merge(parquest_df, df, left_on='pdf_link', right_on='PDF Link', how='left')
complete_df

In [None]:
# Save the DataFrame to a Parquet file
parquet_adjusted_file = 'extracted_complete_text.parquet'
table2 = pa.Table.from_pandas(complete_df)
pq.write_table(table2, parquet_adjusted_file)

### Sentimal Analysis & Summarization

In [2]:
# Set up the SageMaker environment
role = 'arn:aws:iam::533267375984:role/neo_mathematician'
sagemaker_session = sagemaker.Session()
bucket = 'pcaob-data'
prefix = 'inspection_report'
region = 'us-east-1'

print(f"Using bucket {bucket} in region {sagemaker_session.boto_region_name}")

Using bucket pcaob-data in region us-east-1


In [3]:
# Initialize the S3 client
s3 = boto3.client('s3')

In [4]:
# Specify the file to upload and the S3 bucket and key
file_path = 'cleaned_extracted_complete_text.parquet'
bucket_name = 'pcaob-data'
s3_key = f"{prefix}/cleaned_extracted_complete_text.parquet"

In [None]:
# Upload the file
try:
    s3.upload_file(file_path, bucket_name, s3_key)
    print(f"File {file_path} uploaded to {bucket_name}/{s3_key}")
except Exception as e:
    print(f"Error uploading file: {e}")

In [5]:
# Load the Parquet file
df_spark = pd.read_parquet("extracted_complete_text.parquet")
df_spark

Unnamed: 0,pdf_link,extracted_text,PDF Link,Country,Inspection Year,Total Issuer Audit Clients,Inspection Report Date,Audits Reviewed,Part I.A Deficiency Rate,Company,Inspection Report Company
0,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nDeloitte & Touche S.A.S.\n(He...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Colombia,2023,1.0,"Apr. 26, 2024",1.0,0%,Deloitte Touche Tohmatsu Limited,Deloitte & Touche S.A.S.
1,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nErnst & Young\n(Headquartered...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Taiwan,2023,1.0,"Apr. 26, 2024",3.0,0%,Ernst & Young Global Limited,Ernst & Young
2,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nErnst & Young S.A.\n(Headquar...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Luxembourg,2023,3.0,"Apr. 26, 2024",3.0,67%,Ernst & Young Global Limited,Ernst & Young S.A.
3,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nDeloitte & Touche LLP\n(Headq...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Singapore,2023,3.0,"Mar. 21, 2024",3.0,0%,Deloitte Touche Tohmatsu Limited,Deloitte & Touche LLP
4,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nErnst & Young Audit\nS.A.S.\n...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Colombia,2023,2.0,"Mar. 21, 2024",3.0,33%,Ernst & Young Global Limited,Ernst & Young Audit S.A.S.
...,...,...,...,...,...,...,...,...,...,...,...
422,https://assets.pcaobus.org/pcaob-dev/docs/defa...,"1666 K Street, N.W.\nWashington, DC 20006\nTel...",https://assets.pcaobus.org/pcaob-dev/docs/defa...,United States,2004,,"Sep. 29, 2005",,,KPMG International Cooperative,KPMG LLP
423,https://assets.pcaobus.org/pcaob-dev/docs/defa...,"1666 K Street, N.W.\nWashington, DC 20006\nTel...",https://assets.pcaobus.org/pcaob-dev/docs/defa...,United States,2003,,"Aug. 26, 2004",,,Deloitte Touche Tohmatsu Limited,Deloitte & Touche LLP
424,https://assets.pcaobus.org/pcaob-dev/docs/defa...,"1666 K Street, N.W.\nWashington, DC 20006\nTel...",https://assets.pcaobus.org/pcaob-dev/docs/defa...,United States,2003,,"Aug. 26, 2004",,,Ernst & Young Global Limited,Ernst & Young LLP
425,https://assets.pcaobus.org/pcaob-dev/docs/defa...,"1666 K Street, N.W.\nWashington, DC 20006\nTel...",https://assets.pcaobus.org/pcaob-dev/docs/defa...,United States,2003,,"Aug. 26, 2004",,,KPMG International Cooperative,KPMG LLP


In [6]:
# Define a function for text cleaning
def clean_text(text):
    text = text.lower()
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'\W+', ' ', text)
    return text

In [7]:
# Apply the text cleaning function
df_spark['cleaned_text'] = df_spark['extracted_text'].apply(clean_text)

In [8]:
df_spark.head()

Unnamed: 0,pdf_link,extracted_text,PDF Link,Country,Inspection Year,Total Issuer Audit Clients,Inspection Report Date,Audits Reviewed,Part I.A Deficiency Rate,Company,Inspection Report Company,cleaned_text
0,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nDeloitte & Touche S.A.S.\n(He...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Colombia,2023,1.0,"Apr. 26, 2024",1.0,0%,Deloitte Touche Tohmatsu Limited,Deloitte & Touche S.A.S.,2023 inspection deloitte touche s a s headquar...
1,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nErnst & Young\n(Headquartered...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Taiwan,2023,1.0,"Apr. 26, 2024",3.0,0%,Ernst & Young Global Limited,Ernst & Young,2023 inspection ernst young headquartered in t...
2,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nErnst & Young S.A.\n(Headquar...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Luxembourg,2023,3.0,"Apr. 26, 2024",3.0,67%,Ernst & Young Global Limited,Ernst & Young S.A.,2023 inspection ernst young s a headquartered ...
3,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nDeloitte & Touche LLP\n(Headq...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Singapore,2023,3.0,"Mar. 21, 2024",3.0,0%,Deloitte Touche Tohmatsu Limited,Deloitte & Touche LLP,2023 inspection deloitte touche llp headquarte...
4,https://assets.pcaobus.org/pcaob-dev/docs/defa...,2023 Inspection\nErnst & Young Audit\nS.A.S.\n...,https://assets.pcaobus.org/pcaob-dev/docs/defa...,Colombia,2023,2.0,"Mar. 21, 2024",3.0,33%,Ernst & Young Global Limited,Ernst & Young Audit S.A.S.,2023 inspection ernst young audit s a s headqu...


#### Sentimental Analysis

In [9]:
# Initialize the sentiment analysis pipeline
sentiment_analyzer = pipeline('sentiment-analysis', model='yiyanghkust/finbert-tone')
tokenizer = AutoTokenizer.from_pretrained('yiyanghkust/finbert-tone')

Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.


In [10]:
# Define a function to split text into smaller chunks
def split_into_chunks(text, max_length=500):
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0

    for word in words:
        word_length = len(tokenizer.tokenize(word))
        if current_length + word_length <= max_length:
            current_chunk.append(word)
            current_length += word_length
        else:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            current_length = word_length

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks

In [11]:
# Define a function for chunk-level sentiment analysis
def analyze_chunk_level_sentiment(text):
    chunks = split_into_chunks(text)
    sentiment_scores = []
    
    for chunk in chunks:
        try:
            score = sentiment_analyzer(chunk)[0]['score']
            sentiment_scores.append(score)
        except Exception as e:
            print(f"Error analyzing chunk: {chunk}")
            print(f"Error message: {e}")

    sentiment_sum = sum(sentiment_scores)
    chunk_count = len(sentiment_scores)
    sentiment_avg = sentiment_sum / chunk_count if chunk_count > 0 else 0
    word_count = len(text.split())
    return sentiment_scores, word_count, sentiment_sum, sentiment_avg, chunk_count

In [12]:
# Ensure cleaned_text column is of type str
df_spark['cleaned_text'] = df_spark['cleaned_text'].astype(str)

In [13]:
# Apply the sentiment analysis function to each row
df_spark['sentiment_scores'], df_spark['word_count'], df_spark['sentiment_sum'], df_spark['sentiment_avg'], df_spark['chunk_count'] = zip(*df_spark['cleaned_text'].apply(analyze_chunk_level_sentiment))

In [15]:
# Aspect-Based Sentiment Analysis (Example using TextBlob for simplicity)
def aspect_based_sentiment(text, aspect_terms):
    blob = TextBlob(text)
    aspect_sentiments = {term: blob.sentiment for term in aspect_terms if term in text}
    return aspect_sentiments

In [40]:
aspect_terms = ['compliance', 'performance', 'recommendations', 'deficiency', 'audit', 'inspection']
df_spark['aspect_sentiments'] = df_spark['cleaned_text'].apply(lambda x: aspect_based_sentiment(x, aspect_terms))

#### Summarization

In [18]:
# Initialize the summarization pipeline
summarizer = pipeline('summarization', model='facebook/bart-large-cnn')
tokenizer = AutoTokenizer.from_pretrained('facebook/bart-large-cnn')

Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.


In [19]:
def summarize_text(text):
    try:
        # Ensure text length is within the bounds for the summarizer
        text_length = len(text)
        max_input_length = 1024
        step_size = max_input_length // 2

        if text_length <= max_input_length:
            # Dynamically set max_length and min_length based on input length
            dynamic_max_length = min(200, max(50, int(text_length * 0.2)))
            dynamic_min_length = max(30, int(dynamic_max_length * 0.5))
            summary = summarizer(text, max_length=dynamic_max_length, min_length=dynamic_min_length, do_sample=False)[0]['summary_text']
            return post_process_summary(summary)
        else:
            # Summarize in chunks if the text is too long
            chunks = [text[j:j+max_input_length] for j in range(0, text_length, step_size)]
            chunk_summaries = []

            for chunk in chunks:
                dynamic_max_length = min(200, max(50, int(len(chunk) * 0.2)))
                dynamic_min_length = max(30, int(dynamic_max_length * 0.5))
                summary = summarizer(chunk, max_length=dynamic_max_length, min_length=dynamic_min_length, do_sample=False)[0]['summary_text']
                chunk_summaries.append(post_process_summary(summary))

            # Join all chunk summaries into one and post-process the combined summary
            combined_summary = " ".join(chunk_summaries)
            return post_process_summary(combined_summary)
    except Exception as e:
        print(f"Error summarizing text: {text[:100]}... Error: {str(e)}")
        return None

In [20]:
def post_process_summary(summary):
    # Split summary into sentences
    sentences = summary.split('. ')
    # Keep only the first 5-7 sentences
    processed_summary = '. '.join(sentences[:7])
    return processed_summary

In [21]:
def process_batch(texts):
    summaries = []
    for text in texts:
        summaries.append(summarize_text(text))
        time.sleep(20)  # Rest for 20 seconds between each row
    return summaries

In [22]:
# Number of rows in each batch for parallel processing
batch_size = 10

In [23]:
# Splitting dataframe into batches
batches = [df_spark['cleaned_text'][i:i + batch_size] for i in range(0, len(df_spark), batch_size)]

In [24]:
# Using ThreadPoolExecutor for parallel processing
all_summaries = []
with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust max_workers as needed
    futures = [executor.submit(process_batch, batch) for batch in batches]
    for future in as_completed(futures):
        all_summaries.extend(future.result())

Your max_length is set to 200, but your input_length is only 197. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=98)
Your max_length is set to 200, but your input_length is only 181. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=90)
Your max_length is set to 200, but your input_length is only 192. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=96)
Your max_length is set to 200, but your input_length is only 183. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=91)


In [25]:
# Adding the summaries to the dataframe
df_spark['summary'] = all_summaries

In [26]:
df_spark['summary']

0      This is a public version of a pcaob inspection...
1      This is a public version of a pcaob inspection...
2       2022 inspection deloitte ag headquartered in ...
3      This is a public version of a pcaob inspection...
4       2022 inspection kpmg samjong accounting corp ...
                             ...                        
422    This is a public version of a pcaob inspection...
423    This is a public version of a pcaob inspection...
424    Pcaobus org report on 2003 limited inspection ...
425    This is a public version of a pcaob inspection...
426    This is a public version of a pcaob inspection...
Name: summary, Length: 427, dtype: object

In [44]:
# Function to get top n-grams
def get_top_ngrams(corpus, n=None, ngram_range=(7,10)):
    vec = CountVectorizer(ngram_range=ngram_range).fit(corpus)
    bag_of_words = vec.transform(corpus)
    sum_words = bag_of_words.sum(axis=0) 
    words_freq = [(word, sum_words[0, idx]) for word, idx in vec.vocabulary_.items()]
    words_freq = sorted(words_freq, key = lambda x: x[1], reverse=True)
    return words_freq[:n]

In [45]:
# Apply the function to your dataframe
df_spark['top_bi_grams'] = df_spark['cleaned_text'].apply(lambda doc: get_top_ngrams([doc], n=5))

In [49]:
# Save the final DataFrame to a single CSV file
df_spark.to_csv("final_transformed_data.csv", index=False)

In [None]:
# Specify the file to upload and the S3 bucket and key
file_path = 'final_transformed_data.csv'
bucket_name = 'pcaob-data'
s3_key = f"{prefix}/final_transformed_data.csv"

In [None]:
# Upload the file for AWS QuickSight
try:
    s3.upload_file(file_path, bucket_name, s3_key)
    print(f"File {file_path} uploaded to {bucket_name}/{s3_key}")
except Exception as e:
    print(f"Error uploading file: {e}")