In [2]:


from googleapiclient.discovery import build
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import os
import base64
import email

SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

def authenticate_gmail():
    creds = None
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)
    if not creds or not creds.valid:
        from google_auth_oauthlib.flow import InstalledAppFlow
        flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
        creds = flow.run_local_server(port=0)
        with open("token.json", "w") as token:
            token.write(creds.to_json())

    return build("gmail", "v1", credentials=creds)



In [3]:
def search_emails(service, email_address, word, after_date):
    query = f"from:{email_address} subject:{word} has:attachment after:{after_date}"
    results = service.users().messages().list(userId="me", q=query).execute()
    messages = results.get("messages", [])
    return messages


In [4]:
def download_attachments(service, messages, save_dir="attachments"):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    for msg in messages:
        msg_id = msg["id"]
        message = service.users().messages().get(userId="me", id=msg_id).execute()
        payload = message["payload"]

        if "parts" in payload:
            for part in payload["parts"]:
                if part["filename"]:  # Check if it's an attachment
                    attachment_id = part["body"]["attachmentId"]
                    attachment = service.users().messages().attachments().get(
                        userId="me", messageId=msg_id, id=attachment_id
                    ).execute()

                    file_data = base64.urlsafe_b64decode(attachment["data"].encode("UTF-8"))
                    file_path = os.path.join(save_dir, part["filename"])

                    with open(file_path, "wb") as f:
                        f.write(file_data)
                        print(f"Downloaded: {file_path}")

In [23]:
def search_emails_bysender(service, email_address, word):
    query = f"from:{email_address} subject:{word} has:attachment"
    results = service.users().messages().list(userId="me", q=query).execute()
    messages = results.get("messages", [])
    return messages
def download_attachments_sender(service, messages, save_dir="attachments"):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    for msg in messages:
        msg_id = msg["id"]
        message = service.users().messages().get(userId="me", id=msg_id).execute()
        payload = message["payload"]

        if "parts" in payload:
            for part in payload["parts"]:
                if part["filename"]:  # Check if it's an attachment
                    attachment_id = part["body"]["attachmentId"]
                    attachment = service.users().messages().attachments().get(
                        userId="me", messageId=msg_id, id=attachment_id
                    ).execute()

                    file_data = base64.urlsafe_b64decode(attachment["data"].encode("UTF-8"))
                    file_path = os.path.join(save_dir, part["filename"])

                    with open(file_path, "wb") as f:
                        f.write(file_data)
                        print(f"Downloaded: {file_path}")

import os
import base64
import re
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

def search_emails(service, sender_email, subject_word, after_date):
    """
    Search for emails from a specific sender with a given word in the subject after a certain date.

    :param service: Authenticated Gmail API service instance.
    :param sender_email: Sender's email address.
    :param subject_word: Word to search in the subject.
    :param after_date: Date in YYYY/MM/DD format.
    :return: List of matching email messages.
    """
    query = f"from:{sender_email} subject:{subject_word} has:attachment after:{after_date}"
    results = service.users().messages().list(userId="me", q=query).execute()
    messages = results.get("messages", [])
    return messages

def extract_urls(text):
    """
    Extract URLs from email text using regex.

    :param text: Email body text.
    :return: List of extracted URLs.
    """
    url_pattern = re.compile(r'https?://\S+')
    urls = url_pattern.findall(text)
    return urls

def process_emails(service, messages, save_dir="attachments"):
    """
    Process emails by extracting URLs, downloading attachments, and marking emails as read.

    :param service: Authenticated Gmail API service instance.
    :param messages: List of email messages.
    :param save_dir: Directory to save the attachments.
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    for msg in messages:
        msg_id = msg["id"]
        message = service.users().messages().get(userId="me", id=msg_id).execute()
        payload = message.get("payload", {})
        headers = message.get("payload", {}).get("headers", [])

        # Extract email body text
        body = ""
        if "parts" in payload:
            for part in payload["parts"]:
                if part.get("mimeType") == "text/plain" and "data" in part.get("body", {}):
                    body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8")

        # Extract URLs from email body
        urls = extract_urls(body)
        if urls:
            print(f"🔗 URLs found in email {msg_id}:")
            for url in urls:
                print(url)

        # Download attachments
        for part in payload.get("parts", []):
            if part.get("filename"):  # Check if it's an attachment
                attachment_id = part["body"].get("attachmentId")
                if attachment_id:
                    attachment = service.users().messages().attachments().get(
                        userId="me", messageId=msg_id, id=attachment_id
                    ).execute()

                    file_data = base64.urlsafe_b64decode(attachment["data"].encode("UTF-8"))
                    file_path = os.path.join(save_dir, part["filename"])

                    with open(file_path, "wb") as f:
                        f.write(file_data)
                        print(f"✅ Downloaded: {file_path}")

        # Mark email as read
        try:
            service.users().messages().modify(
                userId="me",
                id=msg_id,
                body={"removeLabelIds": ["UNREAD"]}
            ).execute()
            print(f"📩 Email {msg_id} marked as read.")
        except HttpError as error:
            print("")



In [None]:
if __name__ == "__main__":
    gmail_service = authenticate_gmail()
    # m_anane@esi.dz
    # nh_kadri@esi.dz
    # my_haouas@esi.dz
    # b_khelouat@esi.dz 
    # w_hidouci@esi.dz
    # aa_chader@esi.dz
    sender_email = "nh_kadri@esi.dz"  # Change to the sender's email
    keyword = ""  # Change to the keyword in the subject
    after_date = ""  # Change to the desired date (YYYY/MM/DD)

    messages = search_emails(gmail_service, sender_email, keyword, after_date)

    if messages:
        print(f"📩 Found {len(messages)} email(s) from {sender_email} with '{keyword}' in the subject after {after_date}.")
        process_emails(gmail_service, messages)
    else:
        print("❌ No matching emails found.")


📩 Found 5 email(s) from nh_kadri@esi.dz with '' in the subject after .
✅ Downloaded: attachments\My_C.pdf

✅ Downloaded: attachments\[Ass_Course]_Overview_History.pdf
✅ Downloaded: attachments\[Ass_Course]_Memory_Management.pdf
✅ Downloaded: attachments\[Ass_Course]_Addressing_Modes.pdf

✅ Downloaded: attachments\3.evalGroup
✅ Downloaded: attachments\script_evaluation
✅ Downloaded: attachments\2.evalOneStd
✅ Downloaded: attachments\1.Create_Seanario
✅ Downloaded: attachments\Interrogation  (3).pdf

✅ Downloaded: attachments\Modules.zip
✅ Downloaded: attachments\Fiche de lecture.pdf
✅ Downloaded: attachments\TP CPI1 MOUHOUN_LYDIA G10 2020 2021.pdf

✅ Downloaded: attachments\TD_TP 01 intiation (soltion).pdf



In [1]:
if __name__ == "__main__":
    gmail_service = authenticate_gmail()

    sender_email = "nh_kadri@esi.dz"  # Change this to the sender's email
    keyword = ""  # Change this to the keyword in the subject

    messages = search_emails_bysender(gmail_service, sender_email, keyword)

    if messages:
        print(f"Found {len(messages)} email(s) from {sender_email} with '{keyword}' in the subject.")
        download_attachments_sender(gmail_service, messages)
    else:
        print("No matching emails found.")




NameError: name 'authenticate_gmail' is not defined