In [27]:
import arxiv
from datetime import datetime
import os
import requests
from PyPDF2 import PdfReader
from io import BytesIO
import time
from datetime import datetime, timezone
import pandas as pd
import logging

# Function to sanitize a filename by removing invalid characters and replacing spaces with underscores.

# Documentation for the arxiv.py library used below: http://lukasschwab.me/arxiv.py/index.html



def sanitize_filename(filename):
    """
    Turn an arbitrary string into a name that is safer to use as a filename.

    Backslash, slash, colon, asterisk, question mark, double quote, angle
    brackets and pipe are dropped; every space becomes an underscore.

    :param filename: str, original filename
    :return: str, sanitized filename
    """
    # One translation table does both jobs in a single pass:
    # the third argument lists characters to delete, the first two map " " -> "_".
    translation = str.maketrans(" ", "_", '\\/:*?"<>|')
    return filename.translate(translation)


def create_directory(directory):
    """
    Make sure ``directory`` exists, creating it (and missing parents) when needed.

    :param directory: str, directory path
    :return: None
    """
    # exist_ok=True means an already-existing directory is not an error,
    # so the call can be repeated safely.
    os.makedirs(name=directory, exist_ok=True)


# Function to download a PDF from a given URL and extract text from it.
def download_article_pdf(result, url, *, cutoff_date=None, timeout=30):
    """
    Download a PDF from a given URL, check the updated date and extract text from it.

    The article is skipped (nothing is downloaded) when its ``updated``
    timestamp is on or before ``cutoff_date``.

    :param result: arxiv.Result, object containing information about an article
    :param url: str, url to download the PDF from
    :param cutoff_date: timezone-aware datetime; articles updated on or before it
        are skipped. Defaults to 2023-01-01 00:00 UTC.
    :param timeout: seconds to wait for the HTTP request before giving up
    :return: tuple(str, str) of (full_text, brief_text), or None when the article
        is skipped because of its updated date
    :raises requests.RequestException: if the download fails, times out, or the
        server answers with an error status
    """
    if cutoff_date is None:
        cutoff_date = datetime(2023, 1, 1, tzinfo=timezone.utc)

    updated_date = datetime.strptime(
        str(result.updated), "%Y-%m-%d %H:%M:%S%z")

    if updated_date <= cutoff_date:
        # Too old: callers must be prepared to receive None here.
        return None

    # A timeout prevents the whole run from hanging on one stalled download.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on HTTP errors instead of handing an error page to the PDF parser.
    response.raise_for_status()

    # Initialize a PDF reader object with the content of the response
    pdf = PdfReader(BytesIO(response.content))

    # Metadata header that precedes the extracted PDF text.
    header = f"""Title:\t{result.title}
Summary:
{result.summary}

PDF URL: \t{result.pdf_url}
Authors: \t{result.authors}

################################################################################################
Published: \t{result.published}
Updated: \t{result.updated}
Entry ID: \t{result.entry_id}
Short ID: \t{result.get_short_id()}

###############################PDF Content Will Start From Here:###############################
"""
    brief_text = f"""Title:\t{result.title}
Summary:
{result.summary}

PDF URL:\t{result.pdf_url}
Authors:\t{result.authors}
"""
    # Collect page texts and join once; `or ""` guards against a page for which
    # the extractor yields nothing usable.
    page_texts = [page.extract_text() or "" for page in pdf.pages]
    full_text = header + "".join(page_texts)

    # Return the full text of the PDF along with its metadata
    return full_text, brief_text


# Function to sanitize the article text by removing the 'References' section
def sanitize_article_text(text):
    """
    Sanitize the article text by removing the 'References' section.

    Everything from the first case-insensitive occurrence of the word
    "references" onwards is dropped. Text without that word is returned
    unchanged.

    :param text: str, original article text
    :return: str, sanitized article text
    """
    # Local import: `re` is not among the file-level imports.
    import re

    # Search the original string directly. Searching an upper-cased copy is
    # unsafe because upper-casing can change the length of a string
    # (e.g. "ß" -> "SS"), which would shift the cut position.
    match = re.search("references", text, flags=re.IGNORECASE)
    if match is None:
        return text

    return text[:match.start()]


def save_article(save_path, text):
    """
    Write ``text`` to the file at ``save_path``, replacing any existing file.

    :param save_path: str, file path to save the article
    :param text: str, text to be saved
    :return: None
    """
    # UTF-8 output; characters that cannot be encoded are dropped rather than
    # raising (errors="ignore").
    with open(save_path, mode="w", encoding="utf-8", errors="ignore") as out_file:
        out_file.write(text)


# Helper functions used by main(), which searches for articles based on a keyword, downloads the PDFs,
# extracts and sanitizes text from the PDFs, and saves the text and PDFs to the specified directories.

def create_directory(directory):
    """
    Create a directory (including missing parents) if it does not exist.

    Calling it again for an existing directory is a no-op.

    :param directory: str, directory path
    :return: None
    """
    # exist_ok=True replaces the former "check, then create" sequence, which
    # could fail if the directory appeared between the check and the creation.
    os.makedirs(directory, exist_ok=True)


def get_saved_filenames(directory):
    """
    Collect the names of the entries currently present in ``directory``.

    :param directory: str, directory path
    :return: set, set of saved file names
    """
    entries = os.listdir(directory)
    return {*entries}


def setup_directories(txt_dir, pdf_dir, brief_dir):
    """
    Ensure the three output directories (text, PDF, brief) exist.

    :param txt_dir: str, directory path for text files
    :param pdf_dir: str, directory path for pdf files
    :param brief_dir: str, directory path for brief files
    :return: None
    """
    # Same order as the parameters: text, then PDF, then brief.
    for target_dir in (txt_dir, pdf_dir, brief_dir):
        create_directory(target_dir)


def perform_search(keyword, n):
    """
    Build an arXiv search for ``keyword``.

    The search is limited to ``n`` results and ordered by submission date,
    newest first.

    :param keyword: str, keyword for search
    :param n: int, number of maximum results
    :return: arxiv.Search, object containing the search results
    """
    newest_first = {
        "sort_by": arxiv.SortCriterion.SubmittedDate,
        "sort_order": arxiv.SortOrder.Descending,
    }
    search = arxiv.Search(query=keyword, max_results=n, **newest_first)
    return search


def print_if_saved(result, filenames_dict):
    """
    Report whether any of an article's output files already exists.

    For each of the three formats (text, PDF, brief) whose filename is found
    in the corresponding set of saved names, one "Already Saved" line is printed.

    :param result: arxiv.Result, object containing information about an article
    :param filenames_dict: dict, dictionary containing filenames and saved filenames
    :return: bool, True if the files are already saved, False otherwise
    """
    txt_hit = filenames_dict['text'] in filenames_dict['saved_txt']
    pdf_hit = filenames_dict['pdf'] in filenames_dict['saved_pdf']
    brief_hit = filenames_dict['brief'] in filenames_dict['saved_brief']

    # Nothing on disk yet for this article.
    if not (txt_hit or pdf_hit or brief_hit):
        return False

    if txt_hit:
        print(f"- Already Saved:{result.title} txt.")
    if pdf_hit:
        print(f"- Already Saved:{result.title} pdf.")
    if brief_hit:
        print(f"- Already Saved:{result.title} brief.")
    return True





def convert_article_to_dict(result, brief_text, full_text):
    """Flatten one article into a plain dictionary (one future CSV row).

    Args:
        result: A single article information retrieved from search.
        brief_text (str): Brief text information about the article.
        full_text (str): Full text information about the article.

    Returns:
        dict: A dictionary containing key-value pairs of article information.
    """
    record = {}
    record["Article_ID"] = str(result.get_short_id())
    record["Title"] = result.title
    record["Summary"] = str(result.summary)
    record["PDF_URL"] = result.pdf_url
    # Authors are stringified individually and joined into one comma-separated cell.
    record["Authors"] = ", ".join([str(author) for author in result.authors])
    record["Published"] = result.published
    record["Updated"] = result.updated
    record["Brief_Text"] = str(brief_text)
    record["Full_Text"] = str(full_text)
    return record



def save_articles_to_csv(all_data, csv_path="result.csv"):
    """Save articles information to a CSV file, newest batch first.

    New rows are placed above any rows already stored in the file. When
    ``all_data`` is empty nothing is read or written, so an existing file is
    left untouched and no empty file is created.

    Args:
        all_data: A list of dictionaries, where each dictionary contains the info of one article.
        csv_path (str): Path of the CSV file to create or extend. Defaults to "result.csv".
    """
    # Local import: `csv` is not among the file-level imports.
    import csv

    if not all_data:
        return

    # New data goes first so that it ends up on top after concatenation.
    frames = [pd.DataFrame(all_data)]

    if os.path.exists(csv_path):
        try:
            frames.append(pd.read_csv(csv_path))
        except pd.errors.EmptyDataError:
            # An existing but empty file carries no rows to keep; start fresh.
            pass

    combined = pd.concat(frames, ignore_index=True)

    # Quote every field: titles, summaries and full texts contain commas and newlines.
    combined.to_csv(csv_path, index=False, quoting=csv.QUOTE_ALL)






def sleep_time(i, delay_seconds=5):
    """
    Pause between downloads so arXiv does not block the client's IP.

    :param i: int, index of the current article; 0 means "first article, no pause"
    :param delay_seconds: number of seconds to sleep for every article after the first
    :return: bool, True when this was the first article (no sleep), False otherwise
    """
    if i == 0:  # first time
        print("We are about to start to retrieve articles from arXiv.")
        return True

    # after first time
    print(f"Sleeping for {delay_seconds} seconds...to avoid getting blocked IP by arXiv.")
    time.sleep(delay_seconds)
    print("Awake!")
    return False

def main(keyword, maximum_number_articles_retrieve, save_directory_txt, save_directory_pdf, save_directory_brief):
    """
    Main function to perform search, download articles, and save them.

    For every search result that is not already on disk this saves the
    extracted text (without the references section), the PDF and a brief
    summary file, and appends one record to the module-level ``all_data`` list.
    Articles for which ``download_article_pdf`` returns None (updated on or
    before its cutoff date) are skipped.

    :param keyword: str, keyword for search
    :param maximum_number_articles_retrieve: int, number of maximum results
    :param save_directory_txt: str, directory path for text files
    :param save_directory_pdf: str, directory path for pdf files
    :param save_directory_brief: str, directory path for brief files
    :return: None
    """
    # Create directories for saving files if they do not exist
    setup_directories(save_directory_txt, save_directory_pdf,
                      save_directory_brief)

    # Get the filenames of the files already saved in the directories
    saved_filenames_txt = get_saved_filenames(save_directory_txt)
    saved_filenames_pdf = get_saved_filenames(save_directory_pdf)
    saved_filenames_brief = get_saved_filenames(save_directory_brief)

    # Perform a search on arXiv based on the provided keyword
    search = perform_search(keyword, maximum_number_articles_retrieve)

    # For each article in the search results
    for i, result in enumerate(search.results()):
        # Sanitize the article's title to use it as a valid filename
        filename = sanitize_filename(result.title)

        # Format the article's updated date as "YYYY_MM_DD"
        datetime_obj = result.updated.strftime("%Y_%m_%d")

        # Construct filenames for text, PDF, and brief formats
        filename_text = datetime_obj + "_" + filename + ".txt"
        filename_pdf = datetime_obj + "_" + filename + ".pdf"
        filename_brief = datetime_obj + "_" + filename + ".brief.txt"

        # Construct a dictionary containing filenames and saved filenames
        filenames_dict = {
            'text': filename_text,
            'pdf': filename_pdf,
            'brief': filename_brief,
            'saved_txt': saved_filenames_txt,
            'saved_pdf': saved_filenames_pdf,
            'saved_brief': saved_filenames_brief,
        }

        # If the files are already saved, a message was printed; move to the next article
        if print_if_saved(result, filenames_dict):
            continue

        sleep_time(i)

        # Download the PDF content once and extract the text. None means the
        # article was filtered out by its updated date, so there is nothing to save.
        downloaded = download_article_pdf(result, result.pdf_url)
        if downloaded is None:
            print(f"- Skipped (filtered out by updated date): {result.title}")
            continue
        full_text, brief_text = downloaded

        # Save the article text without the 'References' section
        text = sanitize_article_text(full_text)
        save_article(os.path.join(save_directory_txt, filename_text), text)

        # Save the PDF itself; the search result can download its own PDF,
        # so no second lookup by id is needed.
        result.download_pdf(dirpath=str(save_directory_pdf),
                            filename=filename_pdf)

        # Print a message to indicate that the article was saved successfully
        print(f"{result.title}. Link{result.pdf_url}. {datetime_obj} ")

        # Save brief_text under the same name the already-saved check looks for
        save_article(os.path.join(save_directory_brief, filename_brief),
                     brief_text)

        # Convert the article to a dictionary and append to the list
        all_data.append(convert_article_to_dict(result, brief_text, full_text))

# Records collected by main(); written to CSV at the end of the script.
all_data = []

if __name__ == "__main__":
    # Script entry point: ask for the search keyword and the result limit,
    # define the output directories, then run the search and export the CSV.

    # Keyword for the article search. To skip the prompt, assign it directly,
    # e.g. keyword = "large AND language AND models"
    keyword = None
    if keyword is None:
        typed_keyword = input("Enter the keyword for the article search: ")
        # An empty answer falls back to the default query.
        keyword = typed_keyword if typed_keyword else "large AND language AND models"

    # Maximum number of articles to retrieve (original note: API's limit is 300,000).
    # To skip the prompt, assign a number directly instead of None.
    maximum_number_articles_retrieve = None
    if maximum_number_articles_retrieve is None:
        try:
            requested = int(
                input("Enter the maximum number of articles to retrieve (default is 4): "))
        except ValueError:
            # Non-numeric (including empty) input: use the default below.
            requested = 0
        # Zero is also treated as "use the default".
        maximum_number_articles_retrieve = requested if requested else 4

    # Directories where the articles will be saved
    save_directory_pdf = "save_directory_pdf"
    save_directory_txt = "save_directory_txt"
    save_directory_brief = "save_directory_brief"

    # Perform the article search and save the articles
    main(keyword, maximum_number_articles_retrieve, save_directory_txt,
         save_directory_pdf, save_directory_brief)

    # Save the collected data to CSV
    save_articles_to_csv(all_data)



- Already Saved:In-context Autoencoder for Context Compression in a Large Language Model txt.
- Already Saved:In-context Autoencoder for Context Compression in a Large Language Model pdf.
- Already Saved:InternVid: A Large-scale Video-Text Dataset for Multimodal Understanding and Generation txt.
- Already Saved:InternVid: A Large-scale Video-Text Dataset for Multimodal Understanding and Generation pdf.
- Already Saved:mBLIP: Efficient Bootstrapping of Multilingual Vision-LLMs txt.
- Already Saved:mBLIP: Efficient Bootstrapping of Multilingual Vision-LLMs pdf.
- Already Saved:LLM-assisted Knowledge Graph Engineering: Experiments with ChatGPT txt.
- Already Saved:LLM-assisted Knowledge Graph Engineering: Experiments with ChatGPT pdf.


'd:\\Linkedin\\Post + latest Arxiv LLM papers'

In [8]:
import logging
import time

# Set up logging
import os

log_path = './logging_app/lapp.log'

# The file handler does not create missing directories, so make sure the
# target directory exists before configuring logging.
os.makedirs(os.path.dirname(log_path), exist_ok=True)

# Configure the root logger at DEBUG so this call agrees with the
# setLevel(DEBUG) below (previously INFO here was immediately overridden).
logging.basicConfig(filename=log_path, filemode='w',
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    level=logging.DEBUG)

# Root logger object
logger = logging.getLogger()

# Setting the threshold of logger to DEBUG; kept explicit so the level applies
# even if basicConfig() was a no-op because logging was already configured.
logger.setLevel(logging.DEBUG)

# Log messages
logger.debug('This is a debug message')
time.sleep(1)  # pause for 1 second
logger.info('This is an info message')
time.sleep(1)  # pause for 1 second
logger.warning('This is a warning message')
time.sleep(1)  # pause for 1 second
logger.error('This is an error message')
time.sleep(1)  # pause for 1 second
logger.critical('This is a critical message')

# at the end of your script: flush and close all handlers
logging.shutdown()



import logging
import time

# Configure the root logger to write to "logfile.log", overwriting it on each run
logging.basicConfig(
    filename="logfile.log",
    format='%(asctime)s %(levelname)s: %(message)s',
    filemode='w',
)

# Root logger, with its threshold lowered to DEBUG so every message below is emitted
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# One message per severity, in increasing order
demo_messages = (
    (logging.DEBUG, 'This is a debug message'),
    (logging.INFO, 'This is an info message'),
    (logging.WARNING, 'This is a warning message'),
    (logging.ERROR, 'This is an error message'),
    (logging.CRITICAL, 'This is a critical message'),
)

for position, (severity, message) in enumerate(demo_messages):
    if position:
        time.sleep(1)  # pause for 1 second between consecutive messages
    logger.log(severity, message)
