# Python-Notebook zum Zusammenfügen von Weihnachtsbriefen



## Setup

In [None]:
import pathlib

# Directory layout of the project, relative to the parent of the current
# working directory.
ROOT_DIR_PATH = pathlib.Path("../")
DATA_DIR_PATH = ROOT_DIR_PATH.joinpath("data", "Weihnachtsbrief")
TEMPLATE_DIR_PATH = ROOT_DIR_PATH.joinpath("templates")
OUT_DIR_PATH = ROOT_DIR_PATH.joinpath("out", "Weihnachtsbrief")

In [None]:
from dotenv import load_dotenv

# Load the ".env" file located in ROOT_DIR_PATH. The EMAIL_* settings that
# later cells read via os.getenv() are presumably defined there.
load_dotenv(dotenv_path=(ROOT_DIR_PATH / ".env"))

## Import der Empfänger



In [None]:
# Workbook from which the recipients of the letter are loaded.
RECIPIENT_FILE_PATH = DATA_DIR_PATH.joinpath('Empfänger.xlsx')

In [None]:
import warnings
from typing import Optional, Union

import pandas

def read_excel(
    workbook_path: Union[str, pathlib.Path],
    sheet_name: str,
    header_map: Optional[dict] = None,
    skip_rows: int = 0
) -> pandas.DataFrame:
    """Read one worksheet and keep only the mapped columns, renamed.

    Args:
        workbook_path: Location of the Excel workbook.
        sheet_name: Name of the worksheet to read.
        header_map: Maps column headers of the worksheet to the column names
            of the returned frame. Columns whose header is not a key of the
            mapping are dropped, so omitting the mapping returns a frame
            without columns.
        skip_rows: ``skip_rows - 1`` (never less than 0) is passed to pandas
            as ``skiprows``; presumably this is the 1-based row number of the
            header row.

    Returns:
        The worksheet restricted to the mapped columns, with the mapped names.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    header_map = {} if header_map is None else header_map
    # catch_warnings restores the previous filter state on exit, even when
    # reading fails, so the openpyxl suppression never leaks to other cells.
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', module='openpyxl')
        df = pandas.read_excel(
            workbook_path,
            sheet_name=sheet_name,
            skiprows=max(0, skip_rows - 1),
        )
    unmapped_columns = [col for col in df if col not in header_map]
    return df.drop(columns=unmapped_columns).rename(columns=header_map)

In [None]:
# Read the "Automatisch" worksheet. The mapping selects the columns to keep
# and assigns the names used throughout the rest of the notebook.
recipients = read_excel(
    RECIPIENT_FILE_PATH,
    "Automatisch",
    header_map={
        'ID': 'id',
        'Voller Name': 'full_name',
        'Volle Einleitung': 'salutation',
        'Rolle': 'role',
        'Volle Form': 'form',
        'E-Mail': 'email',
        'Sendeart': 'send_type',
        'Erstellt': 'created',
        'Versendet': 'sent'
    },
    skip_rows=1,
)
# Turn 'created' and 'sent' into flags: True where the cell holds a value,
# False where it is empty.
recipients = recipients.assign(
    created=recipients['created'].notna(),
    sent=recipients['sent'].notna(),
)
# Display the resulting table
recipients

## Zusammenfügen der Briefe

In [None]:
# Inputs of the merge: the salutation PDFs (one per recipient id), the shared
# main part and the ending PDFs (one per form).
SALUTATION_DIR_PATH = DATA_DIR_PATH.joinpath('Anreden')
MAIN_FILE_PATH = DATA_DIR_PATH.joinpath('Hauptteil.pdf')
ENDING_DIR_PATH = DATA_DIR_PATH.joinpath('Enden')

In [None]:
import os

# Create one output directory per send type. mkdir(parents=True,
# exist_ok=True) creates missing parent directories and does nothing for a
# directory that already exists, which avoids the check-then-create race.
for send_type in ['Brief', 'Email']:
    path = OUT_DIR_PATH / send_type
    path.mkdir(parents=True, exist_ok=True)

In [None]:
from pypdf import PdfWriter
from datetime import datetime

def merge_pdfs(*file_paths: pathlib.Path) -> PdfWriter:
    """Concatenate PDF files into a new ``PdfWriter``, in argument order.

    Args:
        *file_paths: Paths of the PDF files to append.

    Returns:
        The writer holding the merged document. The caller writes it out and
        closes it.

    Raises:
        FileNotFoundError: If one of the paths is not an existing file.
        ValueError: If one of the paths does not have a ".pdf" suffix.
    """
    paths = [pathlib.Path(file_path) for file_path in file_paths]
    # Validate every input up front with real exceptions: asserts are removed
    # when Python runs with -O, and failing before the writer exists means
    # there is nothing to clean up.
    for path in paths:
        if not path.is_file():
            raise FileNotFoundError(f'File "{path}" does not exist.')
        if path.suffix.lower() != '.pdf':
            raise ValueError(f'File "{path}" is not a pdf file.')
    writer = PdfWriter()
    try:
        for path in paths:
            with open(path, 'rb') as file:
                writer.append(file)
    except Exception:
        # Do not hand a half-filled writer to nobody: release it, then let
        # the original error propagate.
        writer.close()
        raise
    return writer

In [None]:
# Create the letters
# The year is looked up once so that all letters of one run carry the same
# year in their file name.
letter_year = datetime.now().year
for recipient in recipients.itertuples():
    # Merge the correct salutation, main part and ending
    writer = merge_pdfs(
        SALUTATION_DIR_PATH / f'Anrede-{recipient.id}.pdf',
        MAIN_FILE_PATH,
        ENDING_DIR_PATH / f'Ende-{recipient.form}.pdf'
    )
    # Export the merged pdf
    file_path = OUT_DIR_PATH / recipient.send_type / f'JuBO-Weihnachtsbrief-{letter_year}-{recipient.id}.pdf'
    try:
        with open(file_path, 'wb') as file:
            writer.write(file)
    finally:
        # Release the writer even when writing the file fails
        writer.close()
    # Store the file path of the letter in the data frame
    recipients.loc[recipient.Index, 'file_path'] = file_path

recipients[['full_name', 'file_path']]

## Vorbereitung der E-Mails

In [None]:
import os

# Name of the Jinja template that is rendered into the e-mail body.
TEMPLATE_NAME = 'weihnachtsbrief.html.jinja'

# Sender account and signature fields from the environment; each value is
# None when its variable is not set.
EMAIL_USER = os.environ.get('EMAIL_USER')
EMAIL_SIGNATURE_NAME = os.environ.get('EMAIL_SIGNATURE_NAME')
EMAIL_SIGNATURE_ROLE = os.environ.get('EMAIL_SIGNATURE_ROLE')
EMAIL_SIGNATURE_EMAIL = os.environ.get('EMAIL_SIGNATURE_EMAIL')
EMAIL_SIGNATURE_PHONE = os.environ.get('EMAIL_SIGNATURE_PHONE')

In [None]:
import pathlib
from dataclasses import dataclass
from email.message import EmailMessage
from typing import List, Optional


class Attachment:
    """A file to attach to an e-mail, together with its MIME type.

    Attributes:
        file_path: Location of the file; it must exist when the attachment is
            created.
        mime_type: MIME type in the form "<maintype>/<subtype>".
        name: File name used for the attachment; defaults to the name of the
            file itself.
    """

    file_path: pathlib.Path
    mime_type: str
    name: str

    def __init__(
        self, file_path: pathlib.Path, mime_type: str, name: Optional[str] = None
    ):
        """Validate and store the attachment data.

        Raises:
            FileNotFoundError: If ``file_path`` is not an existing file.
            ValueError: If ``mime_type`` is not "<maintype>/<subtype>" with
                both parts non-empty.
        """
        # Accept plain strings as well; the default name needs a Path.
        file_path = pathlib.Path(file_path)
        # Real exceptions instead of asserts: asserts are removed when Python
        # runs with -O, which would silently disable the validation.
        if not file_path.is_file():
            raise FileNotFoundError(
                f'Attachment file does not exist: "{file_path}"'
            )
        maintype, separator, subtype = mime_type.partition("/")
        if not (maintype and separator and subtype) or "/" in subtype:
            raise ValueError(
                f'Invalid mime type: "{mime_type}". Expected "<maintype>/<subtype>".'
            )
        self.file_path = file_path
        self.mime_type = mime_type
        self.name = name if name else file_path.name

    @property
    def mime_maintype(self) -> str:
        """The part of the MIME type before the slash."""
        return self.mime_type.split("/")[0]

    @property
    def mime_subtype(self) -> str:
        """The part of the MIME type after the slash."""
        return self.mime_type.split("/")[1]


@dataclass
class Email:
    """An HTML e-mail with optional Cc/Bcc recipients and file attachments."""

    sender: str
    to: List[str]
    subject: str
    content: str
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None
    attachments: Optional[List[Attachment]] = None

    def as_message(self) -> EmailMessage:
        """Build the ``EmailMessage`` for this e-mail.

        Address lists are joined with ", ". The Cc and Bcc headers are only
        set when the respective list is non-empty. ``content`` becomes the
        HTML body and every attachment file is read and attached.
        """
        msg = EmailMessage()
        msg["From"] = self.sender
        msg["To"] = ", ".join(self.to)
        for header, addresses in (("Cc", self.cc), ("Bcc", self.bcc)):
            if addresses:
                msg[header] = ", ".join(addresses)
        msg["Subject"] = self.subject
        msg.set_content(self.content, subtype="html")
        for item in self.attachments or ():
            with open(item.file_path, "rb") as stream:
                payload = stream.read()
            msg.add_attachment(
                payload,
                maintype=item.mime_maintype,
                subtype=item.mime_subtype,
                filename=item.name,
            )
        return msg

    def __str__(self) -> str:
        """Return the message serialized as a string."""
        message = self.as_message()
        return message.as_string()

In [None]:
from jinja2 import Environment, FileSystemLoader, Template


def read_template(name: str) -> Template:
    """Load the Jinja template ``name`` from ``TEMPLATE_DIR_PATH``.

    The environment is created with ``trim_blocks`` and ``lstrip_blocks``
    enabled.
    """
    loader = FileSystemLoader(TEMPLATE_DIR_PATH)
    environment = Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
    return environment.get_template(name)


# Load the e-mail body template once; it is rendered per recipient further
# down.
template = read_template(TEMPLATE_NAME)

In [None]:
# Select the recipients that receive an e-mail and have not been sent one yet,
# then separate those without an e-mail address (they are skipped later).
is_pending_email = (recipients["send_type"] == "Email") & ~recipients["sent"]
email_recipients = recipients[is_pending_email]
has_address = email_recipients["email"].notna()
missing_email_recipients = email_recipients[~has_address]
email_recipients = email_recipients[has_address]
# Show the recipients that will actually get an e-mail
email_recipients[["full_name", "email"]]

In [None]:
from datetime import datetime
from IPython.display import display, HTML

print(
    f"Creating e-mails for {len(email_recipients)} recipients. (Skipped {len(missing_email_recipients)} recipients with missing e-mail addresses)"
)

# Prepare the e-mails
# The year is looked up once so that every subject line of one run agrees.
subject_year = datetime.now().year
emails: List[Email] = []
for recipient in email_recipients.itertuples():
    content = template.render(
        salutation=recipient.salutation,
        form=recipient.form,
        signature_name=EMAIL_SIGNATURE_NAME,
        signature_role=EMAIL_SIGNATURE_ROLE,
        signature_email=EMAIL_SIGNATURE_EMAIL,
        signature_phone=EMAIL_SIGNATURE_PHONE,
    )
    email = Email(
        sender=EMAIL_USER,
        to=[recipient.email],
        subject=f"JuBO e.V. | Weihnachtsbrief {subject_year} | {recipient.full_name}",
        content=content,
        attachments=[Attachment(recipient.file_path, "application/pdf")],
    )
    emails.append(email)

# Preview the first e-mail. The list is empty when no recipient is pending
# (e.g. when the notebook is re-run after everything was sent), so guard the
# index access instead of failing with an IndexError.
emails[0] if emails else None

In [None]:
# Render the body of the first e-mail as a preview; there is nothing to show
# when no e-mail was prepared.
if emails:
    display(HTML(emails[0].content))

## Erstellen der E-Mails im Postfach

In [None]:
import os

# Account credentials and server endpoints, taken from the environment.
EMAIL_USER      = os.getenv('EMAIL_USER')
EMAIL_PASSWORD  = os.getenv('EMAIL_PASSWORD')
EMAIL_IMAP_HOST = os.getenv('EMAIL_IMAP_HOST')
# Environment values are strings, but the e-mail client declares its ports as
# integers. Convert them here so a malformed value fails in this cell, and
# fall back to the standard implicit-TLS ports (IMAPS 993, SMTPS 465) when a
# variable is unset or empty.
EMAIL_IMAP_PORT = int(os.getenv('EMAIL_IMAP_PORT') or 993)
EMAIL_SMTP_HOST = os.getenv('EMAIL_SMTP_HOST')
EMAIL_SMTP_PORT = int(os.getenv('EMAIL_SMTP_PORT') or 465)

# Number of attempts made per e-mail before giving up.
EMAIL_SEND_TRIES = 3

In [None]:
import imaplib
import smtplib
import ssl
import time


class EmailClientWithSSL:
    """IMAP and SMTP client for one mail account, both over implicit TLS.

    The constructor connects and logs in to both servers. Call ``close()``
    when the client is no longer needed.
    """

    # None until the respective connection has been established.
    _imap: Optional[imaplib.IMAP4_SSL] = None
    _smtp: Optional[smtplib.SMTP_SSL] = None
    # Mailboxes that receive copies of sent messages and drafts.
    _sent_mailbox: str = "Sent"
    _drafts_mailbox: str = "Drafts"

    def __init__(
        self,
        user: str,
        password: str,
        imap_host: str,
        imap_port: int,
        smtp_host: str,
        smtp_port: int,
        sent_mailbox: str = "Sent",
        drafts_mailbox: str = "Drafts",
    ) -> None:
        """Connect and log in to the IMAP and the SMTP server.

        Args:
            user: Account name used for both logins.
            password: Password used for both logins.
            imap_host: Host name of the IMAP server.
            imap_port: Port of the IMAP server. Numeric strings are accepted;
                None or an empty value selects the default IMAPS port.
            smtp_host: Host name of the SMTP server.
            smtp_port: Port of the SMTP server, handled like ``imap_port``.
            sent_mailbox: Mailbox that receives a copy of every sent message.
            drafts_mailbox: Mailbox in which drafts are stored.

        Raises:
            Exception: Whatever the connection or login attempt raised; any
                connection opened so far is closed first.
        """
        self._sent_mailbox = sent_mailbox
        self._drafts_mailbox = drafts_mailbox
        # One verifying context for both protocols. Without an explicit
        # context imaplib falls back to a context that does not verify the
        # server certificate.
        tls_context = ssl.create_default_context()
        try:
            self._imap = imaplib.IMAP4_SSL(
                imap_host,
                self._resolve_port(imap_port, imaplib.IMAP4_SSL_PORT),
                ssl_context=tls_context,
            )
            self._imap.login(user, password)
            self._smtp = smtplib.SMTP_SSL(
                smtp_host,
                self._resolve_port(smtp_port, smtplib.SMTP_SSL_PORT),
                context=tls_context,
            )
            self._smtp.login(user, password)
        except Exception:
            # Release whatever was opened, then re-raise the original error
            # with its traceback intact.
            self.close()
            raise

    @staticmethod
    def _resolve_port(port, default: int) -> int:
        """Return ``port`` as an int, or ``default`` when it is None or empty."""
        return int(port) if port else default

    def _append(self, mailbox: str, flags: str, raw_message: bytes) -> None:
        """Store ``raw_message`` in ``mailbox`` with the current time stamp.

        Raises:
            imaplib.IMAP4.error: If the server does not answer with "OK".
        """
        status, response = self._imap.append(
            mailbox=mailbox,
            flags=flags,
            date_time=imaplib.Time2Internaldate(time.time()),
            message=raw_message,
        )
        # imaplib only raises on "BAD"; a "NO" answer (for example a missing
        # mailbox) is returned as a status and has to be checked explicitly.
        if status != "OK":
            raise imaplib.IMAP4.error(
                f'Appending to mailbox "{mailbox}" failed: {status} {response!r}'
            )

    def send(self, email: "Email") -> None:
        """Send ``email`` and store a copy in the sent mailbox.

        ``send_message`` delivers to the To, Cc and Bcc addresses and does
        not transmit the Bcc header; the stored copy keeps it.
        """
        message = email.as_message()
        self._smtp.send_message(message)
        self._append(
            self._sent_mailbox, "\\Seen", message.as_string().encode("utf8")
        )

    def draft(self, email: "Email") -> None:
        """Store ``email`` as a draft without sending it."""
        self._append(self._drafts_mailbox, "\\Draft", str(email).encode("utf8"))

    def close(self) -> None:
        """Close both connections; safe to call repeatedly or when unconnected."""
        imap, self._imap = self._imap, None
        if imap is not None:
            try:
                # LOGOUT ends the session; IMAP4.close() would only close the
                # currently selected mailbox.
                imap.logout()
            except (imaplib.IMAP4.error, OSError):
                # Best effort: the connection may already be broken, so just
                # make sure the socket is released.
                try:
                    imap.shutdown()
                except OSError:
                    pass
        smtp, self._smtp = self._smtp, None
        if smtp is not None:
            try:
                smtp.quit()
            except (smtplib.SMTPException, OSError):
                # The server is gone or was never reached; drop the socket.
                smtp.close()


# Connect and log in to the IMAP and SMTP servers right away (the constructor
# does both); this raises when a connection or login fails.
email_client = EmailClientWithSSL(
    user=EMAIL_USER,
    password=EMAIL_PASSWORD,
    imap_host=EMAIL_IMAP_HOST,
    imap_port=EMAIL_IMAP_PORT,
    smtp_host=EMAIL_SMTP_HOST,
    smtp_port=EMAIL_SMTP_PORT,
)

In [None]:
# This flag is used to prevent accidentally creating e-mails. Set it to True
# before running the cell to create the e-mail drafts.
# NOTE(review): the flag is currently True, so running the next cell creates
# the drafts right away — confirm this is intended before sharing the notebook.
CREATE_REAL_EMAILS = True

In [None]:
# Create the drafts
if CREATE_REAL_EMAILS:
    for email in emails:
        print(f'Creating e-mail draft for "{",".join(email.to)}"')
        for attempt in range(1, EMAIL_SEND_TRIES + 1):
            try:
                email_client.draft(email)
                break
            except Exception as ex:
                print(ex)
                # Out of attempts: give up immediately instead of sleeping
                # first, and keep the original traceback.
                if attempt == EMAIL_SEND_TRIES:
                    print(f'Failed to create draft after {EMAIL_SEND_TRIES} attempts.')
                    raise
                # Linear backoff that also waits before the first retry.
                delay = attempt * 5
                print(f'Failed to create draft (attempt {attempt}/{EMAIL_SEND_TRIES}). Retrying after {delay} seconds...')
                time.sleep(delay)
    print(f'{len(emails)} e-mail drafts created successfully.')
else:
    print('Not creating e-mail drafts. Set the "CREATE_REAL_EMAILS" flag to True to create the e-mail drafts.')