In [None]:
from time import time
from os import path, getenv, environ
import subprocess
import tempfile
import shutil
from google.cloud import storage
from concurrent.futures import ThreadPoolExecutor, as_completed

from utils.storage import list_unprocessed_pdf_files

In [None]:
# Number of PDF filenames handed to process_pdf_files in one batch
# (used as the chunk size when slicing the list of unprocessed files).
CHUNK_SIZE = 16

In [None]:
def _download_blob(bucket, gcs_pdf_filename, local_pdf_path):
	try:
		blob = bucket.blob(gcs_pdf_filename)
		blob.download_to_filename(local_pdf_path)
		return local_pdf_path
	except Exception as e:
		return None

def _upload_blob(bucket, local_mmd_path, gcs_mmd_filename):
	try:
		mmd_blob = bucket.blob(gcs_mmd_filename)
		mmd_blob.upload_from_filename(local_mmd_path)
		return gcs_mmd_filename
	except Exception as e:
		return None

In [None]:
def process_pdf_files(pdf_filenames, max_workers=10):
	"""Convert a batch of bucket-hosted PDFs to ``.mmd`` files with the ``nougat`` CLI.

	The batch is downloaded in parallel into a scratch directory, ``nougat``
	is run once over every file that downloaded, and each ``.mmd`` found
	afterwards is uploaded under the source object's name with the extension
	replaced by ``.mmd``. The scratch directory is removed on every exit path.

	Relies on the module-level name ``bucket``.
	NOTE(review): ``bucket`` is not assigned in any cell visible here; confirm
	it is created before this function is called. If it is missing, the
	resulting NameError is caught by the handler at the bottom and only printed.

	Args:
		pdf_filenames: Object names of the PDFs inside the bucket. An empty
			or ``None`` value makes the call a no-op.
		max_workers: Thread count used for both the download and upload pools.

	Returns:
		None. Errors raised while processing are printed, not propagated.
	"""
	if not pdf_filenames:
		return

	temporaryDir = tempfile.mkdtemp()
	start = time()

	try:
		# Prefix every local copy with its position in the batch. Using the
		# bare basename would let objects such as "a/x.pdf" and "b/x.pdf"
		# overwrite each other locally and upload the same .mmd for both.
		download_jobs = [
			(pdf_filename, path.join(temporaryDir, f"{index:04d}_{path.basename(pdf_filename)}"))
			for index, pdf_filename in enumerate(pdf_filenames)
		]

		downloaded = []  # (bucket object name, local path) pairs that succeeded
		failed_downloads = []
		with ThreadPoolExecutor(max_workers=max_workers) as executor:
			futures_map = {
				executor.submit(_download_blob, bucket, pdf_filename, local_pdf_path): (pdf_filename, local_pdf_path)
				for pdf_filename, local_pdf_path in download_jobs
			}
			for future in as_completed(futures_map):
				# _download_blob signals failure with a falsy return value.
				if future.result():
					downloaded.append(futures_map[future])
				else:
					failed_downloads.append(futures_map[future][0])

		if failed_downloads:
			print(f"Could not download {len(failed_downloads)} file(s): {failed_downloads}")
		if not downloaded:
			return

		try:
			subprocess.run(
				["nougat", "-o", temporaryDir, "--no-skipping"]
				+ [local_pdf_path for _, local_pdf_path in downloaded],
				check=True,
				capture_output=True,
				text=True
			)
		except subprocess.CalledProcessError as e:
			# Keep going so that any .mmd files present in the scratch
			# directory despite the non-zero exit are still uploaded.
			print(e.stderr)

		missing_outputs = []
		failed_uploads = []
		with ThreadPoolExecutor(max_workers=max_workers) as executor:
			futures_map = {}
			for pdf_filename, local_pdf_path in downloaded:
				# The output is expected next to the input, named after the
				# local file's stem with a .mmd extension.
				local_mmd_path = f"{path.splitext(local_pdf_path)[0]}.mmd"
				if not path.exists(local_mmd_path):
					missing_outputs.append(pdf_filename)
					continue

				gcs_mmd_filename = f"{path.splitext(pdf_filename)[0]}.mmd"
				future = executor.submit(_upload_blob, bucket, local_mmd_path, gcs_mmd_filename)
				futures_map[future] = gcs_mmd_filename

			for future in as_completed(futures_map):
				try:
					uploaded = future.result()
				except Exception as e:
					print(f"Failed to upload {futures_map[future]}: {e}")
					uploaded = None
				# _upload_blob signals failure with a falsy return value.
				if not uploaded:
					failed_uploads.append(futures_map[future])

		if missing_outputs:
			print(f"No .mmd output found for {len(missing_outputs)} file(s): {missing_outputs}")
		if failed_uploads:
			print(f"Could not upload {len(failed_uploads)} file(s): {failed_uploads}")

		print(f"Finished in {time() - start} seconds")

	except Exception as e:
		print(e)
	finally:
		if path.exists(temporaryDir):
			shutil.rmtree(temporaryDir)

In [None]:
def iterate_in_chunks(data, chunk_size):
    """Yield consecutive slices of ``data`` holding at most ``chunk_size`` items.

    Args:
        data: A sequence supporting ``len()`` and slicing.
        chunk_size: Maximum number of items per slice; must be at least 1.

    Yields:
        Slices of ``data`` in order; the last one may be shorter.

    Raises:
        ValueError: If ``chunk_size`` is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A negative size would make range() empty and silently yield nothing;
    # zero would fail with an unrelated-looking range() message.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]

In [None]:
# Collect the PDFs to convert; later cells slice this value into batches.
# NOTE(review): what counts as "unprocessed" is decided inside
# utils.storage.list_unprocessed_pdf_files, which is not visible in this
# notebook — confirm its criteria and return type (len() and slicing are
# required below).
unprocessedFiles = list_unprocessed_pdf_files()
print(f"Found {len(unprocessedFiles)} PDF files to process")

In [None]:
# Run the conversion batch by batch, CHUNK_SIZE filenames at a time.
# NOTE(review): process_pdf_files returns None and prints its own errors, so
# every batch is appended to processed_files whether or not it succeeded,
# and nothing in this cell ever writes to `errors`.
processed_files = []
errors = []
for filenames in iterate_in_chunks(unprocessedFiles, CHUNK_SIZE):
    process_pdf_files(filenames)
    processed_files += filenames
    print(f"Finished processing for {len(filenames)} files")