In [None]:
import os.path
import base64
import datetime

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# OAuth scope requested during authorization: read-only Gmail access.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

# Directory holding the OAuth client secrets and the cached user token.
DATA_PATH = "../../.data"
CREDENTIALS_JSON_PATH = "/".join((DATA_PATH, "credentials.json"))
TOKEN_JSON_PATH = "/".join((DATA_PATH, "token.json"))

# MIME types used to pick the XML and PDF attachments out of a message.
XML_MIME_TYPE = "text/xml"
PDF_MIME_TYPE = "application/pdf"

In [None]:
class GmailAttachment:
    """Identifying metadata of one part of a Gmail message payload.

    Only the MIME type, file name, part id and attachment id are kept; the
    attachment content itself is not read here.
    """

    def __init__(self, part):
        """Copy the identifying fields out of a message part.

        Args:
            part: Mapping with the keys ``mimeType``, ``filename`` and
                ``partId``, and optionally ``body``.
        """
        self.mimetype = part["mimeType"]
        self.filename = part["filename"]
        self.partId = part["partId"]
        # The Gmail API omits "attachmentId" when the part's content is
        # inlined in the body, so a missing id is stored as None instead of
        # raising KeyError.
        self.attachmentId = (part.get("body") or {}).get("attachmentId")

    def __str__(self):
        """Return a multi-line summary with an abbreviated attachment id."""
        attachment_id = self.attachmentId
        if attachment_id is None:
            id_line = "(no attachmentId)"
        elif len(attachment_id) > 32:
            # Long ids are shortened to their first and last 16 characters.
            id_line = f"{attachment_id[:16]}...{attachment_id[-16:]}"
        else:
            # Abbreviating a short id would print overlapping halves.
            id_line = attachment_id

        return "\n    ".join(
            [
                id_line,
                f"filename: {self.filename}",
                f"mimeType: {self.mimetype}",
                f"partId: {self.partId}",
            ]
        )


class GmailMessage:
    """Summary of a Gmail message: key headers, dates and XML/PDF attachments.

    Only the top-level entries of the payload's ``parts`` list are inspected
    when looking for attachments.
    """

    def __init__(self, message, messageId: str):
        """Extract headers, dates and attachments from a message resource.

        Messages without ``payload``, ``headers`` or ``parts`` (for example
        non-multipart messages) are accepted; the corresponding attributes
        are then ``None``.

        Args:
            message: Message mapping; must contain ``internalDate``
                (epoch milliseconds) and may contain ``payload``.
            messageId: Identifier stored on the instance as ``messageId``.
        """
        self.__full_message = message
        payload = message.get("payload") or {}
        headers = payload.get("headers") or []

        def first_header(name):
            # Header names are compared case-insensitively; first match wins.
            return next(
                (h["value"] for h in headers if h["name"].lower() == name), None
            )

        self.messageId = messageId
        self.sender = first_header("from")
        self.subject = first_header("subject")
        # internalDate is in milliseconds; the result is a naive local-time
        # datetime because no tz is passed to fromtimestamp.
        self.internalDate = datetime.datetime.fromtimestamp(
            int(message["internalDate"]) / 1000
        )
        # Misspelled name kept as an alias so existing readers keep working.
        self.intarnalDate = self.internalDate
        self.messageDate = first_header("date")
        # A payload without "parts" is treated as having no attachments.
        self.__parts = payload.get("parts") or []

        self.xml_attachment = next(
            (
                GmailAttachment(p)
                for p in self.__parts
                if p["mimeType"].lower() == XML_MIME_TYPE
            ),
            None,
        )

        self.pdf_attachment = next(
            (
                GmailAttachment(p)
                for p in self.__parts
                if p["mimeType"].lower() == PDF_MIME_TYPE
            ),
            None,
        )

    def __str__(self):
        """Return a multi-line summary of the message and its attachments."""
        return "\n  ".join(
            [
                f"id: {self.messageId}",
                f"from: {self.sender}",
                f"subject: {self.subject}",
                f"xml: {self.xml_attachment}",
                f"pdf: {self.pdf_attachment}",
            ]
        )

In [None]:
def authenticate_gmail_api():
    """Authenticate with the Gmail API and return a service object.

    Cached credentials are loaded from TOKEN_JSON_PATH when that file exists.
    If they are missing or invalid, an expired access token is refreshed with
    its refresh token; when that is not possible, or the refresh is rejected,
    a new OAuth 2.0 flow is run in the user's browser using the client
    secrets in CREDENTIALS_JSON_PATH. Newly obtained or refreshed credentials
    are written back to TOKEN_JSON_PATH.

    Returns:
        The Gmail API service object, or None if building it raised
        HttpError.
    """
    # Imported locally so this function carries its own dependency.
    from google.auth.exceptions import RefreshError

    creds = None
    # The token file stores the user's access and refresh tokens and is
    # written below once an authorization completes.
    if os.path.exists(TOKEN_JSON_PATH):
        creds = Credentials.from_authorized_user_file(TOKEN_JSON_PATH, SCOPES)

    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            print("Access token expired, attempting to refresh...")
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError as error:
                # A revoked or expired refresh token cannot be recovered
                # silently; fall through to a fresh interactive login.
                print(f"Token refresh failed: {error}")

        if not refreshed:
            print("No valid credentials found, initiating new OAuth flow...")
            # The client secrets file is the one downloaded from Google Cloud.
            flow = InstalledAppFlow.from_client_secrets_file(
                CREDENTIALS_JSON_PATH, SCOPES
            )
            # Run a local server to receive the OAuth 2.0 redirect; port=0
            # lets the OS pick a free port.
            creds = flow.run_local_server(port=0)

        # Save the credentials for the next run, including the refresh token.
        with open(TOKEN_JSON_PATH, "w", encoding="utf-8") as token:
            token.write(creds.to_json())

    # Build and return the Gmail API service object.
    try:
        service = build("gmail", "v1", credentials=creds)
        print("Gmail API service successfully built.")
        return service
    except HttpError as error:
        print(f"An error occurred while building the Gmail API service: {error}")
        return None

In [None]:
def _format_query_date(value):
    """Render a date bound for a Gmail search query.

    date/datetime values are rendered as YYYY/MM/DD (the time of day is
    dropped); any other value is passed through as its string form.
    """
    if isinstance(value, datetime.date):
        return value.strftime("%Y/%m/%d")
    return str(value)


def get_messages_list(
    gmail_service,
    userId: str = "me",
    maxResults: int = 5,
    start: datetime.date = None,
    end: datetime.date = None,
):
    """List messages that match the invoice search query.

    The query selects messages with attachments whose file names match both
    "pdf" and "xml" and whose subject matches "factura", optionally limited
    by date with the ``after:`` and ``before:`` operators.

    Args:
        gmail_service: Gmail API service object; a falsy value skips the
            request.
        userId: Mailbox to search.
        maxResults: Value passed as ``maxResults`` to the list request.
        start: Optional lower date bound, added as ``after:``.
        end: Optional upper date bound, added as ``before:``.

    Returns:
        The ``messages`` list of the response (empty when the key is absent),
        or None when ``gmail_service`` is falsy or the request raised
        HttpError.
    """
    if not gmail_service:
        return None

    query_terms = ["has:attachment filename:pdf filename:xml subject:factura"]
    if start is not None:
        query_terms.append(f"after:{_format_query_date(start)}")
    if end is not None:
        query_terms.append(f"before:{_format_query_date(end)}")
    query = " ".join(query_terms)

    try:
        results = (
            gmail_service.users()
            .messages()
            .list(
                userId=userId,
                maxResults=maxResults,
                q=query,
            )
            .execute()
        )
    except HttpError as error:
        print(f"An error occurred while listing messages: {error}")
        return None

    return results.get("messages", [])

In [None]:
def get_message_details(
    gmail_service, id: str, userId: str = "me", format: str = "minimal"
):
    """Fetch a single message by id.

    Args:
        gmail_service: Gmail API service object; a falsy value skips the
            request.
        id: Identifier of the message to fetch.
        userId: Mailbox that owns the message.
        format: Value passed as ``format`` to the get request.

    Returns:
        The response of the get request, or None when ``gmail_service`` is
        falsy or the request raised HttpError.
    """
    if not gmail_service:
        return None

    try:
        return (
            gmail_service.users()
            .messages()
            .get(userId=userId, id=id, format=format)
            .execute()
        )
    except HttpError as error:
        # Name the operation and the message so failures can be told apart
        # from list errors.
        print(f"An error occurred while fetching message {id}: {error}")
        return None

In [None]:
# Authenticate once and reuse the service for every request in this cell.
gmail_service = authenticate_gmail_api()

messages = get_messages_list(
    gmail_service,
    maxResults=50,
    start=datetime.date(2025, 5, 1),
    end=datetime.date(2025, 6, 1),
)

# Full message resources keyed by message id.
messagesDetails = {}

if not messages:
    print("No messages found.")
else:
    for message in messages:
        messageId = message["id"]
        print(f"Fetching details for message {messageId}")
        details = get_message_details(gmail_service, messageId, format="full")
        if details is None:
            # get_message_details returns None when the request failed;
            # storing it would break later code that indexes into the
            # message.
            print(f"Skipping message {messageId}: details could not be fetched.")
            continue
        messagesDetails[messageId] = details

In [None]:
# Parsed GmailMessage objects keyed by message id.
gmailMessages = {}

for message_id, full_message in messagesDetails.items():
    gmailMessage = GmailMessage(full_message, message_id)
    # Keep the parsed message; previously the dict was created but never
    # filled.
    gmailMessages[message_id] = gmailMessage
    print(gmailMessage)
    print("-" * 40)