In [None]:
# Necessary imports
import requests
import random, string
from concurrent import futures
from tqdm import tqdm
import time
from datetime import datetime
import argparse
import os
import sys
import shutil
from internetarchive import search_items
from internetarchive import get_item
import pandas as pd

# Allows us to show more than one ouput in the same cell of the jupyter notebook
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [None]:
# Functions to run the script
def display_error(response, message):
    print(message)
    print(response)
    print(response.text)

def get_book_infos(session, url):
    r = session.get(url).text
    infos_url = "https:" + r.split('bookManifestUrl="')[1].split('"\n')[0]
    response = session.get(infos_url)
    data = response.json()['data']
    title = data['brOptions']['bookTitle'].strip().replace(" ", "_")
    title = ''.join( c for c in title if c not in '<>:"/\\|?*' ) # Filter forbidden chars in directory names (Windows & Linux)
    title = title[:150] # Trim the title to avoid long file names	
    metadata = data['metadata']
    links = []
    for item in data['brOptions']['data']:
        for page in item:
            links.append(page['uri'])

    if len(links) > 1:
        print(f"[+] Found {len(links)} pages")
        return title, links, metadata
    else:
        print(f"[-] Error while getting image links")
        exit()

def format_data(content_type, fields):
    data = ""
    for name, value in fields.items():
        data += f"--{content_type}\x0d\x0aContent-Disposition: form-data; name=\"{name}\"\x0d\x0a\x0d\x0a{value}\x0d\x0a"
    data += content_type+"--"
    return data

def login(email, password):
    session = requests.Session()
    session.get("https://archive.org/account/login")
    content_type = "----WebKitFormBoundary"+"".join(random.sample(string.ascii_letters + string.digits, 16))

    headers = {'Content-Type': 'multipart/form-data; boundary='+content_type}
    data = format_data(content_type, {"username":email, "password":password, "submit_by_js":"true"})

    response = session.post("https://archive.org/account/login", data=data, headers=headers)
    if "bad_login" in response.text:
        print("[-] Invalid credentials!")
        exit()
    elif "Successful login" in response.text:
        print("[+] Successful login")
        return session
    else:
        display_error(response, "[-] Error while login:")

def loan(session, book_id, verbose=True):
    data = {
        "action": "grant_access",
        "identifier": book_id
    }
    response = session.post("https://archive.org/services/loans/loan/searchInside.php", data=data)
    data['action'] = "browse_book"
    response = session.post("https://archive.org/services/loans/loan/", data=data)

    if response.status_code == 400 :
        if response.json()["error"] == "This book is not available to borrow at this time. Please try again later.":
            display_error(response, "This book cannot be borrowed at this time.")
            return session, 0
        else :
            display_error(response, "Something went wrong when trying to borrow the book.")

    data['action'] = "create_token"
    response = session.post("https://archive.org/services/loans/loan/", data=data)

    if "token" in response.text:
        if verbose:
            print("[+] Successful loan")
        return session, 1
    else:
        display_error(response, "Something went wrong when trying to borrow the book, maybe you can't borrow this book.")

def return_loan(session, book_id):
    data = {
        "action": "return_loan",
        "identifier": book_id
    }
    response = session.post("https://archive.org/services/loans/loan/", data=data)
    if response.status_code == 200 and response.json()["success"]:
        print("[+] Book returned")
    else:
        display_error(response, "Something went wrong when trying to return the book")

def image_name(pages, page, directory):
    return f"{directory}/{(len(str(pages)) - len(str(page))) * '0'}{page}.jpg"

def download_one_image(session, link, i, directory, book_id, pages):
    headers = {
        "Referer": "https://archive.org/",
        "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        "Sec-Fetch-Site": "same-site",
        "Sec-Fetch-Mode": "no-cors",
        "Sec-Fetch-Dest": "image",
    }
    retry = True
    while retry:
        try:
            response = session.get(link, headers=headers)
            if response.status_code == 403:
                session = loan(session, book_id, verbose=False)
                raise Exception("Borrow again")
            elif response.status_code == 200:
                retry = False
        except:
            time.sleep(1)	# Wait 1 second before retrying

    image = image_name(pages, i, directory)
    with open(image,"wb") as f:
        f.write(response.content)


def download(session, n_threads, directory, links, scale, book_id):	
    print("Downloading pages...")
    links = [f"{link}&rotate=0&scale={scale}" for link in links]
    pages = len(links)

    tasks = []
    with futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        for link in links:
            i = links.index(link)
            tasks.append(executor.submit(download_one_image, session=session, link=link, i=i, directory=directory, book_id=book_id, pages=pages))
        for task in tqdm(futures.as_completed(tasks), total=len(tasks)):
            pass

    images = [image_name(pages, i, directory) for i in range(len(links))]
    return images

def make_pdf(pdf, title, directory):
    file = title+".pdf"
    # Handle the case where multiple books with the same name are downloaded
    i = 1
    while os.path.isfile(os.path.join(directory, file)):
        file = f"{title}({i}).pdf"
        i += 1

    with open(os.path.join(directory, file),"wb") as f:
        f.write(pdf)
    print(f"[+] PDF saved as \"{file}\"")



In [None]:
# Logging into internet archive session
email = "ADD_YOUR_EMAIL_HERE"
password = "ADD_YOUR_PASSWORD_HERE"
session = login(email, password)


In [None]:
# Importing the authors name list from a excel file
import pandas as pd
authors = pd.read_excel('Authors.xlsx')


In [None]:
# Extracting book_ids based on author names
book_id = []
for author in authors['Authors']:
    author = author.replace(".", "").replace("\xa0", "")
    print("Working on Author: {}".format(author))
    items_found = search_items('creator:("{}") AND mediatype:(texts)'.format(author))
    print("Items found: {}".format(len(items_found)))

    if len(items_found) == 0:
        print("="*40)
        continue
    books_saved = 0
    for book in items_found:
        identifier = book['identifier']
        item = get_item(identifier)
        if item.item_metadata['metadata']['creator'] == author:
            if 'language' in item.item_metadata['metadata'].keys():
                if item.item_metadata['metadata']['language'] == 'eng':
                    if identifier not in existing_books_ids:
                        book_id.append(identifier)
                        books_saved+=1
    print("Completed search, books saved; {}".format(books_saved))
    print("Total books: {}".format(len(book_id)))
    print("="*40)

In [None]:
# Filter to keep only unique book identifiers
len(book_id)
book_ids = list(set(book_id))
len(book_ids)

In [None]:
# Creating URLS using the book identifiers we have searched
urls = []
for book in book_ids:
    url = "https://archive.org/details/{}".format(book)
    urls.append(url)

In [None]:
# Check if the urls have been created properly
urls[:5]

In [None]:
# Save the urls to a file

df = pd.DataFrame(urls)
df.to_csv("urls.csv", index=False)

### If for some reason the code execution stops, execute from the cells below to download books from the remaining urls

In [None]:
# Start execution from here to download the remaining urls
# If for some issue your download of book stops, you can always run the code from this cell to start the process again
urls = pd.read_csv('urls.csv')
len(urls)
urls = urls['0']

In [None]:
# Counting the total number of pdf's already downloaded
import glob

folder_path = ''  # Specify the folder path here where you have downloaded your pdf files
file_pattern = '*.pdf'  # Specify the file pattern here

pdf_files = glob.glob(folder_path + '/' + file_pattern)
urls_downloaded = []
for i in pdf_files:
    book = i.split("/")[-1].split(".")[0]
    url = "https://archive.org/details/{}".format(book)
    urls_downloaded.append(url)
    
count = len(urls_downloaded)
print(f"Number of PDF files: {count}")



In [None]:
# Creating a instance of urls_downloaded so that we can count the urls that have already been downloaded
urls_downloaded = []

In [None]:
# Filtering out the urls that have already been downloaded and keeping only those that we have to download
for idx,url in enumerate(urls):
    if url in urls_downloaded:
        print(urls.pop(idx))
        
len(urls)

In [None]:
# This part of the script downloads the remaining urls
scale = 3
n_threads = 50
d = os.getcwd()

# Check the urls format
for url in urls:
    if not url.startswith("https://archive.org/details/"):
        print(f"{url} --> Invalid url. URL must starts with \"https://archive.org/details/\"")
        exit()


print(f"{len(urls)} Book(s) to download")
session = login(email, password)
urls_not_downloaded = []

for url in urls:
    status = 0
    book_id = list(filter(None, url.split("/")))[3]
    print("="*40)
    print(f"Current book: https://archive.org/details/{book_id}")
    session, status = loan(session, book_id)
    if status:
        title, links, metadata = get_book_infos(session, url)

        directory = os.path.join(d, title)
        # Handle the case where multiple books with the same name are downloaded
        i = 1
        _directory = directory
        while os.path.isdir(directory):
            directory = f"{_directory}({i})"
            i += 1
        os.makedirs(directory)

        images = download(session, n_threads, directory, links, scale, book_id)

    #     if not args.jpg: # Create pdf with images and remove the images folder
        import img2pdf

        # prepare PDF metadata
        # sometimes archive metadata is missing
        pdfmeta = { }
        # ensure metadata are str
        for key in ["title", "creator", "associated-names"]:
            if key in metadata:
                if isinstance(metadata[key], str):
                    pass
                elif isinstance(metadata[key], list):
                    metadata[key] = "; ".join(metadata[key])
                else:
                    raise Exception("unsupported metadata type")
        # title
        if 'title' in metadata:
            pdfmeta['title'] = metadata['title']
        # author
        if 'creator' in metadata and 'associated-names' in metadata:
            pdfmeta['author'] = metadata['creator'] + "; " + metadata['associated-names']
        elif 'creator' in metadata:
            pdfmeta['author'] = metadata['creator']
        elif 'associated-names' in metadata:
            pdfmeta['author'] = metadata['associated-names']
        # date
        if 'date' in metadata:
            try:
                pdfmeta['creationdate'] = datetime.strptime(metadata['date'][0:4], '%Y')
            except:
                pass
        # keywords
        pdfmeta['keywords'] = [f"https://archive.org/details/{book_id}"]

        pdf = img2pdf.convert(images, **pdfmeta)
        title = book_id
        make_pdf(pdf, title, d)
        try:
            shutil.rmtree(directory)
        except OSError as e:
            print ("Error: %s - %s." % (e.filename, e.strerror))

        return_loan(session, book_id)
    else:
        print("Book not downloaded")
        urls_not_downloaded.append(url)

In [None]:
# Checking book_ids of the pdf files downloaded
pdf_books_id = []
for i in pdf_files:
    book = i.split("/")[-1].split(".")[0]
    pdf_books_id.append(book)
    
count = len(pdf_books_id)
print(f"Number of PDF files: {count}")

# Collecting metadata for the books downloaded

In [None]:
# Below code downloads the metadata for the books that have been downloaded and saves it to a csv file
import os

author = []
title = []
copy_available = []
year = []
date = []
status_and_links = []
complete = []
file_size = []
extension = []
publisher = []

def get_file_size(file_path):
    size_bytes = os.path.getsize(file_path)
    size_kb = size_bytes / 1024
    return f"{size_kb:.2f}"
    

# Provide the directory path to the folder where the pdf files are kept
directory_path = ''

# Iterate over files in the directory
for file_name in os.listdir(directory_path):
    file_extension = file_name.rsplit(".",1)[1]
    if file_extension == 'pdf':
        file_name_without_extension = file_name.rsplit('.', 1)[0]
        file_path = os.path.join(directory_path, file_name)
        if os.path.isfile(file_path):
            print('Working on book {}'.format(file_name_without_extension))
            item = get_item(file_name_without_extension)
            metadata = item.item_metadata['metadata']

            # Getting the required fields from the metadata
            author.append(metadata['creator'])
            title.append(metadata['title'])
            copy_available.append('Yes')
            year.append(metadata['date'])
            date.append(metadata['date'])
            status_and_links.append("https://archive.org/details/{}".format(file_name_without_extension))
            complete.append('Complete')
            file_size.append(get_file_size(file_path))
            extension.append(file_name.rsplit(".",1)[1])
            if 'publisher' in metadata.keys():
                publisher.append(metadata['publisher'])
            else:
                publisher.append("NA")
            print("Information received for {}".format(file_name_without_extension))
            print("="*40)
        
        
    


## Saving the metadata to a csv file

In [None]:
# Create a dictionary with the data
data = {
    'Author (last,first)' : author ,
    'Title' : title ,
    'Copy available (Yes/No)' : copy_available,
    'Year Published' : year,
    'Status and Links' :status_and_links,
    'Preview/Complete' : complete,
    'Filesize (KB)' : file_size,
    'Filetype' : extension,
    'Publisher' : publisher
}

# Create a DataFrame from the dictionary
df = pd.DataFrame(data)

# Print the DataFrame
print(df)

# Save the DataFrame to a CSV file
df.to_csv('book_metadata.csv', index=False)