In [None]:
import requests
import xml.etree.ElementTree as ET

In [None]:
# Free-text search phrase; it is lower-cased and its words are joined with "+"
# to build the `all:` search query below.
search_term = "Quantum Gravity"

max_results = 10
query = "+".join(search_term.lower().split())
# Query-string parameters must be separated by "&". Without it, `max_results`
# is glued onto the search term and is not sent as its own parameter.
url = f"http://export.arxiv.org/api/query?search_query=all:{query}&max_results={max_results}"
# A bare `url` expression in the middle of a cell displays nothing, so print it.
print(url)
# Bound the wait so a stalled connection cannot hang the notebook.
resp = requests.get(url, timeout=30)

In [None]:
resp

In [None]:
resp.text

In [None]:
import xml.etree.ElementTree as ET
import json
from collections import defaultdict

def parse_arxiv_xml(xml_string):
    """Parse an arXiv API Atom response into a plain dictionary.

    Every text field is looked up once and falls back to "" when the element
    is absent or empty, for feed-level and entry-level fields alike. Entries
    that lack optional elements (for example an entry without `published`)
    are therefore returned instead of raising AttributeError.

    Args:
        xml_string: The Atom XML document, in any form accepted by
            `ET.fromstring`.

    Returns:
        A dict with the string fields "title", "id", "updated",
        "totalResults", "startIndex" and "itemsPerPage", plus "entries": a
        list with one dict per `<entry>` holding "id", "updated", "published",
        "title", "summary" (whitespace-stripped), "authors" (list of names),
        "comment", "journal_ref", "doi", "links" (link title -> href, with
        untitled links stored under "default"), "primary_category" and
        "categories" (list of category terms).

    Raises:
        xml.etree.ElementTree.ParseError: If `xml_string` is not well-formed XML.
    """
    # Prefix -> namespace URI map used by every find/findall call below.
    namespaces = {
        "atom": "http://www.w3.org/2005/Atom",
        "opensearch": "http://a9.com/-/spec/opensearch/1.1/",
        "arxiv": "http://arxiv.org/schemas/atom"
    }

    def text_of(parent, path):
        """Return the text of the first `path` match under `parent`, or ""."""
        node = parent.find(path, namespaces)
        if node is None or node.text is None:
            return ""
        return node.text

    root = ET.fromstring(xml_string)

    feed = {
        "title": text_of(root, "atom:title"),
        "id": text_of(root, "atom:id"),
        "updated": text_of(root, "atom:updated"),
        "totalResults": text_of(root, "opensearch:totalResults"),
        "startIndex": text_of(root, "opensearch:startIndex"),
        "itemsPerPage": text_of(root, "opensearch:itemsPerPage"),
        "entries": []
    }

    for entry in root.findall("atom:entry", namespaces):
        # The primary category carries its value in the `term` attribute, not in text.
        primary = entry.find("arxiv:primary_category", namespaces)

        feed["entries"].append({
            "id": text_of(entry, "atom:id"),
            "updated": text_of(entry, "atom:updated"),
            "published": text_of(entry, "atom:published"),
            "title": text_of(entry, "atom:title"),
            "summary": text_of(entry, "atom:summary").strip(),
            "authors": [
                text_of(author, "atom:name")
                for author in entry.findall("atom:author", namespaces)
            ],
            "comment": text_of(entry, "arxiv:comment"),
            "journal_ref": text_of(entry, "arxiv:journal_ref"),
            "doi": text_of(entry, "arxiv:doi"),
            # Keyed by the link's `title` attribute; untitled links share the
            # "default" key, so a later untitled link overwrites an earlier one.
            "links": {
                link.attrib.get("title", "default"): link.attrib.get("href", "")
                for link in entry.findall("atom:link", namespaces)
            },
            "primary_category": primary.attrib.get("term", "") if primary is not None else "",
            "categories": [
                category.attrib.get("term", "")
                for category in entry.findall("atom:category", namespaces)
            ]
        })

    return feed

In [None]:
data = parse_arxiv_xml(resp.text)

In [None]:
data

In [None]:
list(data.keys())

In [None]:
len(data["entries"])

In [None]:
data["entries"][0]

In [None]:
# Category terms of every parsed entry, one list per paper.
[paper["categories"] for paper in data["entries"]]

In [None]:
def get_arxiv_papers(search_term: str, max_results: int = 10) -> "tuple[dict, requests.Response]":
    """Search arXiv for `search_term` and return the parsed feed with the raw response.

    The search term is lower-cased and its whitespace-separated words are
    joined with "+" to form an `all:` query. The HTTP response object and a
    JSON summary (title and categories of each entry) are printed as a side
    effect.

    Args:
        search_term: Free-text search phrase. Must not contain parentheses or
            double quotes.
        max_results: Maximum number of entries to request. Defaults to 10.

    Returns:
        A `(data, resp)` tuple: the dict produced by `parse_arxiv_xml` and the
        `requests` response it was parsed from.

    Raises:
        ValueError: If the query contains one of the characters `(`, `)`, `"`
            or a space.
    """
    query = "+".join(search_term.lower().split())
    # These characters are rejected outright rather than URL-encoded.
    for char in '()" ':
        if char in query:
            raise ValueError(f"Cannot have character: '{char}' in query: {query}")
    url = f"http://export.arxiv.org/api/query?search_query=all:{query}&max_results={max_results}"
    # Bound the wait so a stalled connection cannot hang the notebook.
    resp = requests.get(url, timeout=30)
    print(resp)
    data = parse_arxiv_xml(resp.text)
    print(json.dumps([{"title": paper["title"], "categories": paper["categories"]} for paper in data["entries"]], indent=2))
    return data, resp

In [None]:
data, resp = get_arxiv_papers("BEC BCS")

In [None]:
data["entries"][0]

In [None]:
print(data["entries"][0]["summary"])

In [None]:
# Download paper as tex file

import requests
import tarfile
import io

# URL of the .tar.gz file
url = "https://arxiv.org/src/1003.4735v1"

# Text of the last file that decoded as UTF-8. Bound up front so the cell that
# displays `content` never hits a NameError when nothing could be decoded.
content = None

# The whole body is read into memory right away, so streaming adds nothing;
# the timeout keeps a stalled download from hanging the notebook.
resp = requests.get(url, timeout=60)
if resp.status_code == 200:
    tar_gz_data = io.BytesIO(resp.content)  # Load resp content into memory

    try:
        # Open the tar.gz file in memory
        with tarfile.open(fileobj=tar_gz_data, mode="r:gz") as tar:
            # Iterate over each file in the archive
            for member in tar.getmembers():
                if not member.isfile():  # Skip directories
                    continue
                file_obj = tar.extractfile(member)
                if file_obj is None:
                    continue
                try:
                    content = file_obj.read().decode("utf-8")  # Read and decode file
                except UnicodeDecodeError:
                    print(f"Skipping binary file: {member.name}")
                    continue
                print(f"\n--- {member.name} ---\n")
                print(content[:500])  # Print first 500 characters
    except tarfile.ReadError as exc:
        # Raised when the payload is not a readable gzipped tar archive.
        # NOTE(review): some papers may be served in another format - confirm.
        print(f"Downloaded file is not a gzipped tar archive: {exc}")
else:
    print(f"Failed to download file. Status code: {resp.status_code}")

In [None]:
content

In [None]:
# Link dictionaries (link title -> href) of every parsed entry, one per paper.
[paper["links"] for paper in data["entries"]]

In [None]:
import anthropic
import base64
import httpx

# Fetch the PDF and base64-encode it into a UTF-8 string for the request body
pdf_url = "https://assets.anthropic.com/m/1cd9d098ac3e6467/original/Claude-3-Model-Card-October-Addendum.pdf"
pdf_bytes = httpx.get(pdf_url).content
pdf_data = base64.standard_b64encode(pdf_bytes).decode("utf-8")

# Content blocks of the single user turn: the document first, then the question
document_block = {
    "type": "document",
    "source": {
        "type": "base64",
        "media_type": "application/pdf",
        "data": pdf_data,
    },
}
question_block = {
    "type": "text",
    "text": "What are the key findings in this document?",
}

# Send both blocks to Claude as one user message
client = anthropic.Anthropic()
message = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": [document_block, question_block]}],
)

print(message.content)

In [None]:
# How to print state?

from typing import Annotated

from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict

from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """Schema of the state passed to `StateGraph`: a single `messages` list."""

    # `add_messages` is attached as annotation metadata for this key.
    # NOTE(review): presumably it makes node updates append to the list
    # instead of replacing it - confirm against the LangGraph docs.
    messages: Annotated[list, add_messages]

# Builder for a graph whose state follows the `State` schema.
graph_builder = StateGraph(State)

# Run configuration passed to `graph.stream` and `graph.get_state` below;
# the thread id is fixed to "1" for the whole notebook.
config = {"configurable": {"thread_id": "1"}}
# Checkpointer handed to `compile` further down.
memory = MemorySaver()

# Chat model invoked by the `chatbot` node.
llm = ChatOpenAI(model="gpt-4o-mini")


def chatbot(state: State):
    """Graph node: send the current messages to the LLM and return its reply.

    Returns a state update whose "messages" list holds only the new reply.
    """
    reply = llm.invoke(state["messages"])
    return {"messages": [reply]}


# The first argument is the unique node name
# The second argument is the function or object that will be called whenever
# the node is used.
graph_builder.add_node("chatbot", chatbot)
# "chatbot" is registered as both the entry point and the finish point.
graph_builder.set_entry_point("chatbot")
graph_builder.set_finish_point("chatbot")
# `memory` is passed as the checkpointer; `stream_graph_updates` later reads
# the state back with `graph.get_state(config)`.
graph = graph_builder.compile(checkpointer=memory)


def stream_graph_updates(user_input: str):
    """Run one user turn through the graph and return the resulting state snapshot.

    The last message of every streamed update is printed with an
    "Assistant:" prefix. Afterwards the state for the module-level `config`
    is fetched with `graph.get_state` and returned.
    """
    user_turn = {"role": "user", "content": user_input}
    for event in graph.stream({"messages": [user_turn]}, config):
        for node_output in event.values():
            latest = node_output["messages"][-1]
            print("Assistant:", latest.content)

    return graph.get_state(config)

# State snapshot of the most recent turn. Reset on every run of this cell so
# the inspection cells below never see a stale value from an earlier run, and
# so the name exists even if the user quits before sending anything.
snapshot = None

while True:
    try:
        user_input = input("User: ")
    except (EOFError, KeyboardInterrupt):
        # Input stream closed or the prompt was interrupted: leave the loop
        # cleanly instead of ending the cell with a traceback.
        print("Goodbye!")
        break
    # Surrounding whitespace is ignored when matching the exit commands.
    if user_input.strip().lower() in ["quit", "exit", "q"]:
        print("Goodbye!")
        break

    snapshot = stream_graph_updates(user_input)

In [None]:
snapshot

In [None]:
snapshot.*?

In [None]:
snapshot.values

In [None]:
import json
def _to_jsonable(obj):
    """Fallback encoder for values `json` cannot serialize natively.

    The stored messages are objects (their `.content` attribute is read
    elsewhere in this notebook), so a bare `json.dumps` raises TypeError on
    them. Prefer the object's own dict export when it offers one, otherwise
    fall back to its string form.
    """
    for method_name in ("model_dump", "dict"):
        export = getattr(obj, method_name, None)
        if callable(export):
            return export()
    return str(obj)


json.dumps(snapshot.values, default=_to_jsonable)

In [None]:
snapshot.values["messages"][-1].content