In [2]:
# Inspired by https://github.com/mrocklin/arxiv-matplotlib
# Released with BSD-3-Clause license - https://opensource.org/license/bsd-3-clause


%matplotlib inline

In [3]:
from google.cloud import storage
import coiled

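# Optionally point Google Cloud at a specific credentials file.
# Use a raw string so the Windows backslashes are not treated as escape sequences.
# import os
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r'C:\Users\USERNAME_GOES_HERE\AppData\Roaming\gcloud\application_default_credentials.json'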

def list_cs_files(bucket_name, path, limit=1000):
    """Return the names of objects stored in a Google Cloud Storage bucket.

    Parameters
    ----------
    bucket_name : str
        Name of the bucket to list, e.g. 'arxiv-dataset'.
    path : str
        Prefix; only objects whose names start with it are listed.
    limit : int, default 1000
        Maximum number of objects to list. The cap is applied to all objects
        under the prefix, before any filtering the caller does afterwards.

    Returns
    -------
    list of str
        Object names, i.e. paths inside the bucket.
    """
    gcs_bucket = storage.Client().bucket(bucket_name)
    print("Bucket is: " + str(gcs_bucket))
    return [blob.name for blob in gcs_bucket.list_blobs(prefix=path, max_results=limit)]

# Objects are addressed as gs://<bucket_name>/<file_path_inside_bucket>.
# List up to 1000 objects under gs://arxiv-dataset/arxiv/ and keep only the PDFs.
file_list = list_cs_files('arxiv-dataset', 'arxiv', limit=1000)
pdf_files = [name for name in file_list if name.endswith(".pdf")]
print(pdf_files[:10])
print(f"Length of PDF List: {len(pdf_files)}")

Bucket is: <Bucket: arxiv-dataset>
['arxiv/acc-phys/pdf/9411/9411001v1.pdf', 'arxiv/acc-phys/pdf/9411/9411002v1.pdf', 'arxiv/acc-phys/pdf/9411/9411003v1.pdf', 'arxiv/acc-phys/pdf/9411/9411004v1.pdf', 'arxiv/acc-phys/pdf/9502/9502001v1.pdf', 'arxiv/acc-phys/pdf/9502/9502002v1.pdf', 'arxiv/acc-phys/pdf/9503/9503001v2.pdf', 'arxiv/acc-phys/pdf/9503/9503002v1.pdf', 'arxiv/acc-phys/pdf/9503/9503003v1.pdf', 'arxiv/acc-phys/pdf/9503/9503004v1.pdf']
Length of PDF List: 377


In [4]:
%%time
def extract(file_set, bucket_name='arxiv-dataset', bucket=None):
    """Check which PDFs in one chunk of arXiv files mention matplotlib.

    Parameters
    ----------
    file_set : iterable of str
        Object names (paths inside the bucket) of the PDF files to scan.
    bucket_name : str, default 'arxiv-dataset'
        Google Cloud Storage bucket to read from. Ignored when ``bucket`` is given.
    bucket : object, optional
        Pre-built bucket handle: anything with a ``blob(name)`` method whose
        result has ``download_as_bytes()``. When omitted, a fresh storage
        client and bucket handle are created for this call, i.e. one
        connection per chunk, so each call can run independently on a worker.

    Returns
    -------
    list of (str, bool)
        ``(filename, contains_matplotlib)`` for every file that downloaded
        successfully. The check is a case-insensitive byte search for
        b"matplotlib". Files whose download raises are reported and skipped.
    """
    if bucket is None:
        bucket = storage.Client().bucket(bucket_name)

    out = []
    for filename in file_set:
        blob = bucket.blob(filename)
        try:
            pdf_bytes = blob.download_as_bytes()
        except Exception as e:
            # Skip this file. Falling through would use an unbound or stale
            # `pdf_bytes` and record another file's result under this name.
            print(f"Error reading PDF {filename}: {e}")
            continue

        out.append((filename, b"matplotlib" in pdf_bytes.lower()))

    return out

# Smoke-test extract() locally on the first ten PDFs before distributing the work.
out = extract(pdf_files[:10])
print(out)

[('arxiv/acc-phys/pdf/9411/9411001v1.pdf', False), ('arxiv/acc-phys/pdf/9411/9411002v1.pdf', False), ('arxiv/acc-phys/pdf/9411/9411003v1.pdf', False), ('arxiv/acc-phys/pdf/9411/9411004v1.pdf', False), ('arxiv/acc-phys/pdf/9502/9502001v1.pdf', False), ('arxiv/acc-phys/pdf/9502/9502002v1.pdf', False), ('arxiv/acc-phys/pdf/9503/9503001v2.pdf', False), ('arxiv/acc-phys/pdf/9503/9503002v1.pdf', False), ('arxiv/acc-phys/pdf/9503/9503003v1.pdf', False), ('arxiv/acc-phys/pdf/9503/9503004v1.pdf', False)]
CPU times: total: 15.6 ms
Wall time: 3.5 s


In [5]:
# Split the PDF list into consecutive chunks of `chunk_size` files
# (the last chunk may be shorter); each chunk is later passed to extract().
chunk_size = 10
chunked_list = [
    pdf_files[start:start + chunk_size]
    for start in range(0, len(pdf_files), chunk_size)
]
print(chunked_list[0])
print(f"Number of chunks: {len(chunked_list)}")

['arxiv/acc-phys/pdf/9411/9411001v1.pdf', 'arxiv/acc-phys/pdf/9411/9411002v1.pdf', 'arxiv/acc-phys/pdf/9411/9411003v1.pdf', 'arxiv/acc-phys/pdf/9411/9411004v1.pdf', 'arxiv/acc-phys/pdf/9502/9502001v1.pdf', 'arxiv/acc-phys/pdf/9502/9502002v1.pdf', 'arxiv/acc-phys/pdf/9503/9503001v2.pdf', 'arxiv/acc-phys/pdf/9503/9503002v1.pdf', 'arxiv/acc-phys/pdf/9503/9503003v1.pdf', 'arxiv/acc-phys/pdf/9503/9503004v1.pdf']
Number of chunks: 38


In [6]:
%%time
# Import the Dask helpers first so `Client` and `wait` are defined even if
# cluster creation below fails.
from dask.distributed import Client, wait

cluster = coiled.Cluster(
    n_workers=5,
    name="arxiv",
    # Sync local packages to the workers. In the recorded run, the workers'
    # software build is what failed.
    package_sync=True,
    # Faster and cheaper, presumably because it is close to the
    # arxiv-dataset bucket. Confirm the bucket location.
    backend_options={"region": "us-east1"},
)

# Connect a Dask client to the cluster; later cells submit work through `client`.
client = Client(cluster)

Output()

Output()

ClusterCreationError: Cluster status is error (reason: Workers all had error -> Software build failed -> Unknown error during env install, we suggest you contact us at support@coiled.io. Full build logs are available here: https://cloud.coiled.io/software/alias/91401/build/84515?account=moco-makers-group&tab=logs ) (cluster_id: 528056)

In [7]:
# Run extract() on every chunk in parallel, one task per chunk, and always
# release the Dask client and the cluster afterwards, even if the work fails.
try:
    futures = client.map(extract, chunked_list)
    wait(futures)

    # Keep only chunks whose task finished. gather() would raise on an errored task.
    good = [future for future in futures if future.status == "finished"]
    n_failed = len(futures) - len(good)
    if n_failed:
        print(f"Skipping {n_failed} of {len(futures)} chunks whose task failed")

    # One list of (filename, contains_matplotlib) pairs per successful chunk.
    lists = client.gather(good)
    print(lists)
finally:
    print("Closing cluster")
    # Close the client too; previously only the cluster was closed.
    client.close()
    cluster.close()

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000021DFBC1A050>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x0000021DFBC63620>, 78448.5)]']
connector: <aiohttp.connector.TCPConnector object at 0x0000021DDC5B7590>


NameError: name 'client' is not defined