In [1]:
import os, re
from typing import List, Dict
from datetime import datetime
from collections import defaultdict, Counter

from pypdf import PdfReader
import matplotlib.pyplot as plt
import ollama
import spacy
import dill as pickle
import pymupdf
import numpy as np
from wordcloud import WordCloud
from docx import Document
from docx.shared import Inches

In [2]:
# Ollama model used for every LLM call.
# Alternatives that were tried: "mistral", "llama3.1", "qwen:14b"
MODEL = "phi4"

# spaCy pipeline name passed to spacy.load()
SPACY_MODEL = "en_core_web_sm"

# Set folder paths
PDF_FOLDER = "Codings"
OUTPUT_FOLDER = os.path.join("Processed", MODEL)

# NOTE: the name is stamped with today's date (yymmdd), so a pickle written on an
# earlier day is only found again if this name is adjusted by hand.
# Single quotes inside the f-string keep the cell valid on Python < 3.12 as well.
PROCESSED_DOC_FILENAME = f"{datetime.now().strftime('%y%m%d')}-{MODEL}-processed_documents.pkl"

In [3]:
class OllamaHandler:
    """Thin wrapper around ``ollama.generate`` holding the prompts of this analysis.

    Every public method embeds the given text in one prompt, sends it to the
    configured model and returns the answer after light post-processing.
    """

    def __init__(self, model_name: str = MODEL):
        """Remember which Ollama model to query (defaults to ``MODEL``)."""
        self.model = model_name

    def _generate(self, prompt: str, **options) -> str:
        """Send ``prompt`` to the configured model and return the response text."""
        response = ollama.generate(model=self.model, prompt=prompt, **options)
        return response["response"]

    def generate_short_summary(self, text: str) -> str:
        """Return a one-sentence summary of ``text``.

        Only the part of the answer up to the first period is kept, so any extra
        sentences the model adds are dropped.
        """
        prompt = f"Summarize the following text in exactly one (!) sentence withouth any further comments. Start your answer with 'The article is about...'. Article: {text}"
        answer = self._generate(prompt)
        return answer.split(".")[0] + "."

    def generate_summary(self, text: str) -> str:
        """Return the model's four-bullet-point summary of ``text`` unmodified."""
        prompt = f"Your are a research assistant and have been asked to summarize the following text in exactly 4 bullet points. It is extremely important that it is only four bullet points. Avoid adding any further comments or information that appears not in the text. Avoid mentioning that you are a research assistant and what your task is in your response. Here is the text you need to summarize: {text}"
        return self._generate(prompt)

    def answer_question(self, text: str, question: str) -> str:
        """Return the model's answer to ``question`` based on ``text``."""
        prompt = f"Answer the following question based on the text, be concise and only mention topics that occured in the text: {text} Question: {question}"
        return self._generate(prompt)

    def analyze_sentiment(self, text: str) -> float:
        """Return a sentiment score for ``text`` clamped to the range [-5.0, 5.0].

        The first number found in the model's answer is used. When the answer
        contains no number at all, ``0.0`` (neutral) is returned. Errors raised by
        the Ollama call itself are not caught.
        """
        prompt = f"Analyze the sentiment of the following text and respond with a single number between -5 (very negative towards AI, mentioned mainly risks) and +5 (very positive towards AI, mentioned mainly opportunities) without any comments: {text}"
        # NOTE(review): system="You" looks like a truncated system prompt - confirm
        # the intended text; it is kept as-is so the model output does not change.
        answer = self._generate(prompt, system="You")

        match = re.search(r"-?\d+\.?\d*", answer)
        if match is None:
            return 0.0
        # Float bounds keep the return type a float even when the value is clamped
        return max(min(float(match.group()), 5.0), -5.0)

    def extract_entities(self, text: str) -> list:
        """Return the entities the model lists for ``text``.

        The model is asked for a semicolon-separated list. Line breaks are removed,
        surrounding whitespace is stripped and empty items (for example after a
        trailing semicolon) are dropped.
        """
        prompt = f"Extract the entities (Actors/spokespeople/institutions) from the following text. Separate them by a semicolon ; and only answer with the entities without any other comments: {text}"
        answer = self._generate(prompt).replace("\n", "")

        return [entity.strip() for entity in answer.split(";") if entity.strip()]

In [4]:
class PdfDocument:
    """One analysed PDF article together with everything derived from it.

    The object stores the extracted text plus the analysis results (summaries,
    answers, sentiment, entities, highlighted sentences) and can be converted
    to/from a plain dict as well as exported to Word or markdown.
    """

    # Annotation type code that marks a highlight annotation
    _HIGHLIGHT_ANNOT_TYPE = 8

    def __init__(self, path: str, content: str, title: str):
        """Create a document with empty analysis results and log its title.

        Args:
            path: Location of the PDF file.
            content: Text of the PDF.
            title: Display title; also used as the word-cloud folder name.
        """
        self.path = path
        self.content = content
        self.title = title
        # File name without directory and extension. os.path copes with the path
        # separator of the host OS and keeps dots that belong to the name itself.
        self.filename = os.path.splitext(os.path.basename(path))[0]
        self.short_summary = ""
        self.summary = ""
        self.sentiment = 0.0
        self.entities = []
        self.highlighted_sentences = defaultdict(list)  # highlight color (hex) -> sentences
        self.wordcloud_data = {}  # highlight color (hex) -> [(word, frequency), ...]
        self.questions = []

        print(f"{datetime.now().strftime('%H:%M:%S')} Initialized PdfDocument: <{self.title}>")

    def to_dict(self) -> Dict:
        """Return the persisted fields as a plain dict (word-cloud data is not included)."""
        return {
            "path": self.path,
            "content": self.content,
            "title": self.title,
            "short_summary": self.short_summary,
            "summary": self.summary,
            "sentiment": self.sentiment,
            "entities": self.entities,
            "highlighted_sentences": self.highlighted_sentences,
            "questions": self.questions
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "PdfDocument":
        """Rebuild a document from a dict produced by ``to_dict``."""
        doc = cls(data["path"], data["content"], data["title"])
        doc.short_summary = data["short_summary"]
        doc.summary = data["summary"]
        doc.sentiment = data["sentiment"]
        doc.entities = data["entities"]
        doc.highlighted_sentences = data["highlighted_sentences"]
        doc.questions = data["questions"]
        return doc

    def extract_highlighted_sentences(self) -> dict:
        """Collect the sentences touched by highlight annotations, grouped by color.

        For every highlight annotation the words whose rectangle intersects the
        annotation rectangle are gathered; every sentence of that page containing
        at least one of these words is stored under the highlight's hex color.
        The result replaces ``self.highlighted_sentences``.

        NOTE(review): the word test is a case-insensitive substring check, so a
        short highlighted word also selects sentences that were not highlighted.

        Returns:
            The dict that was stored in ``self.highlighted_sentences``.
        """
        found = defaultdict(list)

        # The context manager closes the PDF even if reading a page fails
        with pymupdf.open(self.path) as pdf:
            for page in pdf:
                # Split text into sentences (basic splitting by period). The lowered
                # copy is built once because every highlight is compared against it.
                sentences = [
                    (sentence, sentence.lower())
                    for sentence in (
                        part.strip() + "." for part in page.get_text().split(".") if part.strip()
                    )
                ]
                page_words = page.get_text_words()

                for annot in page.annots() or ():
                    if annot.type[0] != self._HIGHLIGHT_ANNOT_TYPE:
                        continue

                    # Highlights without a usable RGB stroke color are skipped;
                    # fewer than three components cannot be formatted as #rrggbb.
                    color = annot.colors.get("stroke")
                    if not color or len(color) < 3:
                        continue
                    color_hex = "#{:02x}{:02x}{:02x}".format(*(int(c * 255) for c in color))

                    # Words whose bounding box intersects the highlight rectangle
                    area = annot.rect
                    touched_words = [
                        word_info[4].lower()
                        for word_info in page_words
                        if area.intersects(pymupdf.Rect(word_info[:4]))
                    ]

                    for sentence, sentence_lower in sentences:
                        if any(word in sentence_lower for word in touched_words):
                            if sentence not in found[color_hex]:
                                found[color_hex].append(sentence)

        # Convert defaultdict to regular dict before storing it on the instance
        self.highlighted_sentences = dict(found)
        return self.highlighted_sentences

    def get_pretty_highlights(self) -> str:
        """Return all highlights as text: one header per color, then numbered sentences."""
        sections = []

        for color, sentences in self.highlighted_sentences.items():
            lines = [f"\nHighlight Color: {color}", "-" * 50]
            lines.extend(f"{i}. {sentence}" for i, sentence in enumerate(sentences, 1))
            sections.append("\n".join(lines) + "\n")

        return "".join(sections)

    def pretty_print_highlights(self) -> None:
        """Print the text built by ``get_pretty_highlights``."""
        print(self.get_pretty_highlights())

    def get_number_of_highlights(self) -> dict:
        """Function to get the number of highlights in the document"""
        return {color: len(sentences) for color, sentences in self.highlighted_sentences.items()}

    def save2docx(self, file_path: str):
        """Exports document data, including word cloud, to a Word document.

        Word-cloud images are looked up at
        ``OUTPUT_FOLDER/<title>/wordcloud_<color>.png``; colors without an image
        file are left out.

        Args:
            file_path: Path of the .docx file to write.
        """
        document = Document()

        # Add content to the Word document
        document.add_heading(f"Media Analysis - {self.title}", level=1)

        document.add_heading("Short Summary", level=2)
        document.add_paragraph(self.short_summary)

        document.add_heading("Summary", level=2)
        document.add_paragraph(self.summary)

        for number, answer in enumerate(self.questions, 1):
            document.add_heading(f"Question {number}", level=2)
            document.add_paragraph(answer)

        document.add_heading("Sentiment", level=2)
        document.add_paragraph(f"The sentiment is {self.sentiment}")

        document.add_heading("Entities", level=2)
        document.add_paragraph(", ".join(self.entities))

        document.add_heading("Highlights", level=2)
        # add_paragraph expects text, so the counts are written one line per color
        for color, count in self.get_number_of_highlights().items():
            document.add_paragraph(f"{color}: {count}")

        # Word-cloud data may be missing or of an unexpected type on older objects;
        # in that case the images are still added, only the word lists are left out.
        wordcloud_data = self.wordcloud_data if isinstance(self.wordcloud_data, dict) else {}

        # Add word clouds for each highlight color
        for color in self.highlighted_sentences:
            color_name = color.replace('#', '')
            wordcloud_path = os.path.join(OUTPUT_FOLDER, self.title, f"wordcloud_{color_name}.png")

            # Check if the word cloud image exists
            if not os.path.exists(wordcloud_path):
                continue

            document.add_heading(f"Wordcloud for {color} Highlights", level=2)
            document.add_picture(wordcloud_path, width=Inches(4.0))
            document.add_paragraph("Top 10 words:")

            # Add top words and their frequencies
            for word, freq in wordcloud_data.get(color, []):
                document.add_paragraph(f"- {word}: {freq}")

        # Save the document
        document.save(file_path)

    def format2markdown(self) -> str:
        """Formats document data into a markdown string.

        Attributes that are missing on the instance are replaced by a placeholder
        text instead of raising.
        """
        title = getattr(self, "title", "No Title")
        short_summary = getattr(self, "short_summary", "No Short Summary")
        summary = getattr(self, "summary", "No Summary")
        sentiment = getattr(self, "sentiment", "No Sentiment")

        try:
            questions = "\n\n".join(
                f"Question {number}: {answer}" for number, answer in enumerate(self.questions, 1)
            )
        except AttributeError:
            questions = "No Questions"

        try:
            entities_str = ", ".join(str(entity) for entity in self.entities)
        except AttributeError:
            entities_str = "No Entities"

        try:
            # The text-returning variant is required here: pretty_print_highlights()
            # only prints and returns None.
            highlights = self.get_pretty_highlights()
        except AttributeError:
            highlights = "No Highligts"

        # Construct the markdown string
        markdown_output = f"# Media Analysis - {title}\n\n"
        markdown_output += f"# Short Summary\n{short_summary}\n\n"
        markdown_output += f"# Summary\n{summary}\n\n"
        markdown_output += f"# Questions\n{questions}\n\n"
        markdown_output += f"# Sentiment\n{sentiment}\n\n"
        markdown_output += f"# Entities\n{entities_str}\n\n"
        markdown_output += f"# Highlights\n{highlights}\n\n"

        # Word clouds are left out because they cannot be shown in markdown

        return markdown_output

In [5]:
class PdfAnalyzer:
    """Runs the complete analysis pipeline on PDF files.

    Text is read with pypdf, the summaries, answers and sentiment come from
    ``OllamaHandler``, and entities come from spaCy and/or Ollama depending on
    ``entity_collection``.
    """

    # Questions every article is asked; the answers are stored in this order
    QUESTIONS = (
        "How do the media (in our case = the sample we are analyzing) frame the public discussion about a given issue (in our case = ChatGPT)? Are there certain **metaphors** that keep cropping up?",
        "What **perspectives and aspects** of the topic are being widely covered, what aspects are being ignored?",
        "Which role does the Arabic World play in this article? How do they leverage AI? Answer with 'Not mentioned' if not applicable.",
        "What is the final message of the article? Keep short!",
    )

    def __init__(self, entity_collection = "all", ollama_model: str = MODEL):
        """Set up the LLM handler and the spaCy pipeline.

        Args:
            entity_collection: "spacy", "ollama" or "all"; any other value is
                treated as "all".
            ollama_model: Name of the Ollama model handed to ``OllamaHandler``.
        """
        self.ollama_handler = OllamaHandler(ollama_model)
        self.nlp = spacy.load(SPACY_MODEL) # Load spacy model
        # The attribute name keeps its original spelling for compatibility
        self.entitiy_collection = entity_collection if entity_collection in ["all", "ollama", "spacy"] else "all"
        self.pdf_documents = []

    @staticmethod
    def _log(message: str) -> None:
        """Print ``message`` prefixed with the current time (HH:MM:SS) and a tab."""
        print(f"{datetime.now().strftime('%H:%M:%S')}\t {message}")

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Iterates over all pages in the document and returns their concatenated text."""

        pages = PdfReader(pdf_path).pages
        page_texts = []

        # Pages are numbered from 1 in the log so the last line reads "n/n"
        for page_number, page in enumerate(pages, 1):
            page_text = page.extract_text() or ""  # treat a page without text as empty
            self._log(f"Adding page {page_number}/{len(pages)} with {len(page_text)} characters")
            page_texts.append(page_text)
        return "".join(page_texts)

    def extract_entities(self, text: str) -> List[str]:
        """Extracts entities from text using spacy PERSON and ORG labels.

        Duplicates are removed; the remaining entities keep the order of their
        first occurrence so repeated runs give the same list.
        """

        doc = self.nlp(text)
        entities = [ent.text for ent in doc.ents if ent.label_ in ["PERSON", "ORG"]]
        self._log(f"Found {len(entities)} in text")
        return list(dict.fromkeys(entities))

    def process_pdf(self, pdf_path: str) -> "PdfDocument":
        """Main function that processes a single PDF document with it's subfunctions. Prints status updates."""

        content = self.extract_text_from_pdf(pdf_path)
        content = self.clean_text(content, line_breaks=False)

        title = os.path.splitext(os.path.basename(pdf_path))[0]

        # Initialize PdfDocument object
        self._log(f"Create PDF document <{title[:20]}...> with content of length {len(content)}")
        pdf_doc = PdfDocument(pdf_path, content, title)

        self._log("Generating summary")
        short_summary_response = self.ollama_handler.generate_short_summary(content)
        pdf_doc.short_summary = self.clean_text(short_summary_response)

        summary_response = self.ollama_handler.generate_summary(content)
        pdf_doc.summary = self.clean_text(summary_response, soft_clean=True)

        questions = self.QUESTIONS
        plural_suffix = "s" if len(questions) > 1 else ""
        self._log(f"Finding answer to {len(questions)} question{plural_suffix}")
        for question in questions:
            answer = self.ollama_handler.answer_question(content, question)
            pdf_doc.questions.append(self.clean_text(answer, soft_clean=True))

        self._log("Analyzing sentiment")
        pdf_doc.sentiment = self.ollama_handler.analyze_sentiment(content)

        self._log("Extracting entities from text")
        raw_entities = []
        if self.entitiy_collection in ["all", "spacy"]:
            # Get entities with spacy
            raw_entities.extend(self.extract_entities(content))
        if self.entitiy_collection in ["all", "ollama"]:
            # Get entities with ollama
            raw_entities.extend(self.ollama_handler.extract_entities(content))

        # Cleaning can empty an entity or make two of them equal, so blanks and
        # duplicates are removed afterwards (first occurrence wins).
        cleaned_entities = (self.clean_text(entity) for entity in raw_entities)
        pdf_doc.entities = list(dict.fromkeys(entity for entity in cleaned_entities if entity))

        self._log("Extracting text-highlights")
        pdf_doc.extract_highlighted_sentences()

        return pdf_doc

    def process_folder(self, PDF_FOLDER: str) -> List["PdfDocument"]:
        """Iterates over all PDF files in the folder and processes them.

        Files are handled in sorted name order and the ".pdf" extension is matched
        case-insensitively.
        """
        pdf_documents = []
        for filename in sorted(os.listdir(PDF_FOLDER)):
            if filename.lower().endswith(".pdf"):
                pdf_documents.append(self.process_pdf(os.path.join(PDF_FOLDER, filename)))
        return pdf_documents

    def clean_text(self, text: str, soft_clean = False, line_breaks: bool = True) -> str:
        """Normalise ``text`` for storage and display.

        Args:
            text: Text to clean.
            soft_clean: When True, only runs of spaces are collapsed and the ends
                are stripped; characters and line breaks are kept.
            line_breaks: When True (and ``soft_clean`` is False) line breaks are
                replaced by spaces before the character filter runs.

        Returns:
            The cleaned text without leading or trailing whitespace.
        """
        if not soft_clean:
            # remove line breaks
            if line_breaks:
                text = text.replace("\n", " ")

            # remove non-ascii characters
            text = text.encode("ascii", "ignore").decode()

            # replace everything except letters, digits, ".", ",", "*", "-" and space
            text = re.sub(r"[^a-zA-Z0-9.,* -]", " ", text)

        # Collapse every run of spaces to one space. Replacing only pairs would
        # leave double spaces behind whenever a run has three or more spaces.
        text = re.sub(r" {2,}", " ", text)

        # remove leading and trailing whitespaces
        return text.strip()

    def save_documents(self, documents: List["PdfDocument"], OUTPUT_FOLDER: str):
        """Saves the processed documents to a pickle file.

        The file is written to ``OUTPUT_FOLDER/PROCESSED_DOC_FILENAME``; the folder
        is created when it does not exist yet.
        """
        os.makedirs(OUTPUT_FOLDER, exist_ok=True)
        path = os.path.join(OUTPUT_FOLDER, PROCESSED_DOC_FILENAME)

        with open(path, "wb") as f:
            pickle.dump([doc.to_dict() for doc in documents], f)

    def load_documents(self, input_path: str) -> List["PdfDocument"]:
        """Loads the processed documents from a pickle file"""
        # SECURITY: unpickling can execute arbitrary code - only load files that
        # were written by save_documents() on a trusted machine.
        with open(input_path, "rb") as f:
            data = pickle.load(f)
        return [PdfDocument.from_dict(doc_dict) for doc_dict in data]

In [6]:
_SPACY_PIPELINES = {}  # spaCy pipelines already loaded in this kernel, keyed by model name


def _get_spacy_pipeline(model_name):
    """Load a spaCy pipeline on first use and reuse it on later calls."""
    if model_name not in _SPACY_PIPELINES:
        _SPACY_PIPELINES[model_name] = spacy.load(model_name)
    return _SPACY_PIPELINES[model_name]


def generate_word_cloud(text_list, output_path = None, color_name="highlight", width=800, height=400, background_color='white'):
    """Render a word cloud for the given texts and return the most frequent words.

    Stop words and non-alphabetic tokens are removed with spaCy before the cloud
    is generated.

    Args:
        text_list: Iterable of strings that are joined into one text.
        output_path: Folder for the image. When given, the figure is saved there as
            ``wordcloud_<color_name>.png`` and closed; otherwise it is shown.
        color_name: Suffix of the image file name.
        width: Width handed to ``WordCloud``.
        height: Height handed to ``WordCloud``.
        background_color: Background color handed to ``WordCloud``.

    Returns:
        Up to ten ``(word, count)`` tuples, most frequent first. An empty list is
        returned, and no image is produced, when no usable word is left.
    """
    # Combine all strings into one text; str.replace returns a new string, so the
    # result has to be kept
    text = " ".join(text_list).replace("\n", " ").strip()
    if not text:
        return []

    # Process the text with spacy
    doc = _get_spacy_pipeline(SPACY_MODEL)(text)

    # Get non-stop words
    words = [token.text for token in doc if not token.is_stop and token.is_alpha]
    if not words:
        # Nothing to draw; generating a cloud from an empty text would fail
        return []
    processed_text = " ".join(words)

    # Create and generate a word cloud image
    wordcloud = WordCloud(
        width=width,
        height=height,
        background_color=background_color,
        min_font_size=10,
        max_font_size=150,
        random_state=42
    ).generate(processed_text)

    # Display the word cloud
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    if output_path:
        image_path = os.path.join(output_path, f"wordcloud_{color_name}.png")
        plt.savefig(image_path, bbox_inches="tight", pad_inches=0)
        plt.close()
    else:
        plt.show()

    # Return most common words and their frequencies
    return Counter(words).most_common(10)


def process_highlights_with_wordcloud(doc: PdfDocument, output_folder) -> dict:
    """Create one word cloud per highlight color of ``doc``.

    The images are written to ``<output_folder>/<doc.title>/``; that folder is
    created on demand the first time a color is processed.

    Returns:
        Dict mapping each highlight color to the word frequencies returned by
        ``generate_word_cloud``.
    """
    target_dir = os.path.join(output_folder, doc.title)
    frequencies_by_color = {}

    for highlight_color in doc.highlighted_sentences:
        if not os.path.exists(target_dir):
            print(f"Creating folder: {target_dir}")
            os.makedirs(target_dir)

        frequencies_by_color[highlight_color] = generate_word_cloud(
            doc.highlighted_sentences[highlight_color],
            output_path=target_dir,
            color_name=highlight_color.replace("#", ""),
        )

    return frequencies_by_color

In [7]:
# Create the analyzer; "spacy" selects spaCy as the only source of entities
analyzer = PdfAnalyzer(entity_collection="spacy")

In [None]:
# Processing sends every PDF through the LLM, so it is switched off by default.
# Set the flag to True to process all PDFs in PDF_FOLDER and write the pickle file.
RUN_PROCESSING = False

if RUN_PROCESSING:
    # Process the documents in the folder where the PDFs are
    documents = analyzer.process_folder(PDF_FOLDER)

    # Save documents to the output folder
    analyzer.save_documents(documents, OUTPUT_FOLDER)

In [9]:
# Read the previously analysed documents back from the pickle in the output folder
loaded_documents = analyzer.load_documents(
    os.path.join(OUTPUT_FOLDER, PROCESSED_DOC_FILENAME)
)

01:14:32 Initialized PdfDocument: <Will ChatGPT and AI have an impact on Saudi workforce productivity_ _ Arab News>
01:14:32 Initialized PdfDocument: <AI is not smarter than humans _ Updated 08 April 2023>
01:14:32 Initialized PdfDocument: <ChatGPT_ AI grows more powerful as we become more predictable _ Arab News>
01:14:32 Initialized PdfDocument: <ChatGPT outperforms copywriters in STEP Conference’s outdoor adverts _ Updated 22 February 2023>
01:14:32 Initialized PdfDocument: <Is the Arab world ready for the uncertain age of AI-powered web tools_Updated 09 March 2023>
01:14:32 Initialized PdfDocument: <‘I am not here to take your job,’ ChatGPT tells Frankly Speaking host _Updated 20 March 2023>
01:14:32 Initialized PdfDocument: <ChatGPT is the ‘Netscape moment’ for artificial intelligence’ _ Arab News>
01:14:32 Initialized PdfDocument: <No need to demonize ChatGPT but AI regulation is a must _ Arab News>


In [10]:
# Print a summary of every loaded file (Optional: with highlights)
for doc in loaded_documents:
    print(f"Title: {doc.title}")
    print(f"Short Summary: {doc.short_summary}")
    print(f"Summary: {doc.summary}")
    #print(f"Questions: {"\n".join(doc.questions)}")
    print(f"Sentiment: {doc.sentiment}")
    print(f"Entities: {doc.entities}")

    # Documents stored without highlights get them extracted first, so the count
    # printed below reflects the extracted highlights
    newly_extracted = not doc.highlighted_sentences
    if newly_extracted:
        doc.extract_highlighted_sentences()

    print(f"Highlights: {doc.get_number_of_highlights()}")

    if newly_extracted:
        # pretty_print_highlights() prints by itself and returns None, so it must
        # not be wrapped in another print()
        doc.pretty_print_highlights()

    print("-" * 50)

Title: Will ChatGPT and AI have an impact on Saudi workforce productivity_ _ Arab News
Short Summary: The article is about how artificial intelligence technologies like ChatGPT could enhance Saudi Arabia s workforce productivity by boosting economic development, alleviating job loss fears through skill enhancement, increasing efficiency, and impacting various sectors such as healthcare, transportation, energy, finance, and retail, while requiring a strategic approach to integrate AI into operations effectively.
Summary: - AI technologies like ChatGPT can enhance Saudi Arabia's workforce productivity by automating mundane tasks and enabling strategic focus. This fosters economic development through increased efficiency and innovation.
 
- Concerns exist about job displacement due to AI; however, 50% of employees believe it boosts productivity while 51% see potential for better positions. Embracing AI involves cultivating a strong culture of learning and adaptation.

- Raymond Khoury hig

In [11]:
# Write a Word document and a markdown file for every document
for doc in loaded_documents:
    # The word clouds are rendered once and used by both exports
    doc.wordcloud_data = process_highlights_with_wordcloud(doc, OUTPUT_FOLDER)

    report_stem = os.path.join(OUTPUT_FOLDER, f"cai_media_analysis_{doc.filename}")
    doc.save2docx(f"{report_stem}.docx")

    # Explicit UTF-8: soft-cleaned summaries and answers may contain non-ASCII
    # characters that the platform default encoding cannot write
    with open(f"{report_stem}.md", "w", encoding="utf-8") as f:
        f.write(doc.format2markdown())