[![Buy Me A Coffee](https://img.shields.io/badge/Buy%20Me%20A%20Coffee-support%20my%20work-FFDD00?style=flat&labelColor=101010&logo=buy-me-a-coffee&logoColor=white)](https://www.buymeacoffee.com/r0mymendez)

---

# 🏷️ Data-Driven Project Analysis: Analyzing Trello Kanban Projects with AI on AWS Bedrock

## Introduction

Modern software projects often involve multiple distributed teams working on high-complexity initiatives, with frequent releases and ongoing production fixes. While tools like Kanban boards help organize tasks, epics, and workflows, they also generate large volumes of unstructured data in the form of comments, status changes, and timelines.
As the number of interdependent tasks and contributors grows, understanding the real state of a project, and identifying early risks or bottlenecks, becomes increasingly difficult. Manual analysis is time-consuming and often subjective.

<center>

![preview](img/trello-preview-title.png)

</center>

In this notebook, I present a practical use case that uses AWS services and generative AI to improve project analysis and interpretation. By analyzing task metadata and detecting semantic patterns in comments (such as ambiguity, implicit dependencies, missing definitions, or scope creep), the AI provides more objective insights, early warnings, and support for data-driven decision-making.


> Install additional Python modules

In [None]:
%additional_python_modules boto3==1.34.34,botocore==1.34.34,markdown==3.5.2,beautifulsoup4==4.12.3,reportlab==4.0.8

Additional python modules to be included:
boto3==1.34.34
botocore==1.34.34
markdown==3.5.2
beautifulsoup4==4.12.3
reportlab==4.0.8


## Reference Architecture
Before diving into the implementation details, it is useful to understand the overall architecture that supports this use case. The following reference architecture illustrates how project data flows from Trello through AWS services and into an AI-powered analysis pipeline.

The entire process is executed through an AWS Glue job implemented in Python, which orchestrates data extraction, transformation, AI inference, and report generation in a scalable and automated manner. 

At a high level, the architecture ingests Kanban project data from Trello, enriches it with temporal and contextual metadata, applies semantic analysis using generative AI models on AWS Bedrock, and produces structured, human-readable reports for project stakeholders.

<br>

![blueprint](img/trello-reference-architecture_.png)

----

### Core Components
#### 1. 📋 Trello Integration Class
- Connects to Trello boards via the Trello API
- Retrieves boards, lists, and cards with enriched metadata
- Calculates time-based metrics (e.g., days until due date)
- Exports the card data to Amazon S3 as CSV and serializes the open tasks to JSON for the model
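
The days-until-due metric can be sketched with the standard library alone. The helper name `days_until_due` and the sample timestamps are hypothetical; the sketch assumes that Trello delivers `due` as an ISO 8601 UTC string, or leaves it empty. As in the pandas version used later in this notebook, a missing due date falls back to 0.

```python
from datetime import datetime, timezone
from typing import Optional


def days_until_due(due: Optional[str], now: datetime) -> int:
    """Whole days from `now` until `due`; negative when the card is overdue."""
    if not due:
        # Mirror the notebook's fillna(0): no due date is reported as 0
        return 0
    # Older Python versions do not parse a trailing "Z", so normalise it first
    due_dt = datetime.fromisoformat(due.replace("Z", "+00:00"))
    return (due_dt - now).days


now = datetime(2025, 12, 23, 12, 0, tzinfo=timezone.utc)
upcoming = days_until_due("2025-12-31T12:00:00.000Z", now)
overdue = days_until_due("2025-12-22T12:00:00.000Z", now)
missing = days_until_due(None, now)
print(upcoming, overdue, missing)
```

Because 0 can also mean "due today", a stricter variant might keep missing dates as `None` so the two cases stay distinguishable in the prompt.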


#### 2. ✨ AWS Bedrock Integration
- Invokes the Amazon Nova model using custom prompts
- Processes project datasets to generate semantic insights
- Uses configurable inference parameters to balance cost and accuracy
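
To make the point about configurable inference parameters concrete, here is a minimal sketch of a request-body builder. The function name `build_nova_payload` and its defaults are illustrative. The field names (`messages`, `inferenceConfig`, `max_new_tokens`, `temperature`, `top_p`) are copied from the `AWSBedrock` class in this notebook and have not been checked against the Bedrock documentation.

```python
import json


def build_nova_payload(prompt: str, dataset: str,
                       max_new_tokens: int = 5000,
                       temperature: float = 0.4,
                       top_p: float = 0.9) -> dict:
    """Assemble a single-turn request body with tunable inference settings."""
    return {
        "messages": [
            {"role": "user", "content": [{"text": f"{prompt} {dataset}"}]}
        ],
        "inferenceConfig": {
            "max_new_tokens": max_new_tokens,
            "temperature": temperature,
            "top_p": top_p,
        },
    }


# A shorter, more deterministic configuration for a quick, cheaper run
payload = build_nova_payload("Summarize risks:", '[{"name": "Checkout"}]',
                             max_new_tokens=1000, temperature=0.1)
body = json.dumps(payload)  # string that would be passed as `body` to invoke_model
print(body)
```

Lowering `max_new_tokens` caps the length (and therefore the cost) of each response, while a lower `temperature` tends to make the wording more repeatable between runs.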


#### 3. 📊 Report Generation (MarkdownPDFReport)
- Converts AI-generated markdown into professional PDF reports
- Applies custom styling for readability and consistency
- Supports tables, lists, and structured summaries
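
One styling detail is worth a small illustration. Standard PDF fonts such as Helvetica typically have no emoji glyphs, which is presumably why the report class in this notebook swaps status emojis for colored text markers before rendering. The sketch below shows the idea in isolation; the name `replace_status_emojis` and the ASCII labels are illustrative choices, not the notebook's exact markers.

```python
# Status emojis mapped to colored text markers (labels are illustrative)
STATUS_MARKERS = {
    "\U0001F7E2": '<font color="green"><b>[OK]</b></font>',  # green circle
    "\U0001F7E1": '<font color="orange"><b>[!]</b></font>',  # yellow circle
    "\U0001F534": '<font color="red"><b>[X]</b></font>',     # red circle
}


def replace_status_emojis(html: str) -> str:
    """Swap each status emoji for a colored text marker."""
    for emoji, marker in STATUS_MARKERS.items():
        html = html.replace(emoji, marker)
    return html


row = "<td>\U0001F7E2 On track</td><td>\U0001F534 Blocked</td>"
converted = replace_status_emojis(row)
print(converted)
```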


#### 4. Supporting Services
- 🔐 AWS Secrets Manager: securely stores Trello API credentials
- 🪣 Amazon S3: stores datasets, prompts, and generated reports
- 📩 Amazon SES: distributes automated reports via email
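
The S3 layout used for datasets and reports can be sketched as a pure function. `build_s3_key` is a hypothetical helper that mirrors the date-partitioned key format of the `aws_s3` class in this notebook; the `today` parameter is added here only to make the example reproducible.

```python
from datetime import date
from typing import Optional


def build_s3_key(base_path: str, filename: str,
                 partition_date: bool = True,
                 today: Optional[date] = None) -> str:
    """Return `base/YYYY-MM-DD/filename`, or `base/filename` without partitioning."""
    base = base_path.strip("/")
    if not partition_date:
        return f"{base}/{filename}"
    day = (today or date.today()).strftime("%Y-%m-%d")
    return f"{base}/{day}/{filename}"


key = build_s3_key("output/", "trello_report_Kanban-ecommerce.pdf",
                   today=date(2025, 12, 23))
flat_key = build_s3_key("output", "aws_prompt.txt", partition_date=False)
print(key)
print(flat_key)
```

Partitioning by date keeps each daily run in its own prefix, so earlier reports are not overwritten by later ones.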


In [None]:

import pandas as pd
import os, requests, json,ast
from datetime import datetime,timezone
import boto3
from botocore.exceptions import ClientError
from io import BytesIO

# pdf report
from markdown import markdown
from bs4 import BeautifulSoup
from reportlab.platypus import (
    SimpleDocTemplate, Paragraph, Spacer, ListFlowable, 
    ListItem, Table, TableStyle, PageBreak
)

from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.units import inch
from reportlab.lib.enums import TA_CENTER, TA_LEFT
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import HRFlowable

import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication

Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 1bfe7d8c-9ffd-4001-b272-118975011667
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--additional-python-modules boto3==1.34.34,botocore==1.34.34,markdown==3.5.2,beautifulsoup4==4.12.3,reportlab==4.0.8
Waiting for session 1bfe7d8c-9ffd-4001-b272-118975011667 to get into ready status...
Session 1bfe7d8c-9ffd-4001-b272-118975011667 has been created.



In [None]:
class Trello:
    def __init__(self, BUCKET_NAME="", API_KEY="", API_TOKEN="", S3=None): 
        self.API_KEY = API_KEY
        self.API_TOKEN = API_TOKEN
        self.BASE_URL = "https://api.trello.com/1"
        self.DATAFRAME_COLUMNS = [
            'id', 'dueComplete', 'desc', 'listName', 'name', 
            'start', 'checkItems', 'checkItemsChecked', 'due', 'time_to_due'
        ]
        self.BUCKET_NAME = BUCKET_NAME
        self.s3_writer = S3
    
    def get_auth_params(self):
        return {
            'key': self.API_KEY,
            'token': self.API_TOKEN
        }
        
    def get_board_data(self, board_name: str):
        # Look up the board id by name among the boards visible to this token
        response = requests.get(f"{self.BASE_URL}/members/me/boards", params=self.get_auth_params())
        response.raise_for_status()
        matches = [board['id'] for board in response.json() if board.get('name') == board_name]
        if not matches:
            raise ValueError(f"Board not found: {board_name}")
        return matches[0]
    
    def get_lists(self, board_id: str):
        response = requests.get(f"{self.BASE_URL}/boards/{board_id}/lists", params=self.get_auth_params())
        response.raise_for_status()
        lists_data = pd.DataFrame(response.json())
        return lists_data
    
    def get_cards(self, board_id: str):
        response = requests.get(f"{self.BASE_URL}/boards/{board_id}/cards", params=self.get_auth_params())
        response.raise_for_status()
        cards_data = pd.DataFrame(response.json())
        return cards_data
    
    def get_enriched_cards(self, board_id: str, lists_data: pd.DataFrame):
        params_cards = self.get_auth_params().copy()
        params_cards.update({
            'customFieldItems': 'true',
            'checklists': 'all',
            'actions': 'all'
        })
        response = requests.get(f"{self.BASE_URL}/boards/{board_id}/cards", params=params_cards)
        response.raise_for_status()  # fail fast on authentication or rate-limit errors
        cards_raw = response.json()
        data_cards = pd.DataFrame(cards_raw)
        data_cards['checkItems'] = [card['badges']['checkItems'] for card in cards_raw]
        data_cards['checkItemsChecked'] = [card['badges']['checkItemsChecked'] for card in cards_raw]
        
        data = (
            data_cards
            .merge(
                lists_data[['id', 'name']].rename(columns={'id': 'idList', 'name': 'listName'}), 
                left_on='idList', 
                right_on='idList', 
                how='left'
            )
        )
        
        now_utc = datetime.now(timezone.utc)
        data['due'] = pd.to_datetime(data['due'], utc=True)
        data['time_to_due'] = data['due'] - now_utc
        data['time_to_due'] = data['time_to_due'].dt.days.fillna(0)
        return data
    
    def get_data(self, board_name: str, write_s3: bool = True):
        board_id = self.get_board_data(board_name)
        lists_data = self.get_lists(board_id)
        data = self.get_enriched_cards(board_id, lists_data)
        
        if self.s3_writer and write_s3:
            payload = data.to_csv(index=False)
            content_type = "text/csv"
            filename = f"trello_{board_name}_cards.csv"
            
            s3_path = self.s3_writer.write_file(
                filename=filename,
                content=payload,
                content_type=content_type
            )
            print(f"✅ {s3_path}")
        
        # Keep only the report columns and render missing due dates as empty strings
        data = data[self.DATAFRAME_COLUMNS].assign(
            due=lambda x: x['due'].astype(str).replace('NaT', '')
        )
        return data
    
    def get_data_json(self, board_name: str, task_status: list = None):
        if task_status is None:
            task_status = ['To Do', 'Doing']
        
        data = self.get_data(board_name, write_s3=False )
        data = data[data['listName'].isin(task_status)]
        # to_json already returns a JSON string; wrapping it in json.dumps would encode it twice
        json_str = data.to_json(orient='records', force_ascii=False)
        return json_str
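
When serializing the card records for the prompt, it helps to encode them exactly once. A value that is already a JSON string gets wrapped in an extra layer of quotes and escapes if it is passed through `json.dumps` again. The stdlib-only sketch below uses a hand-written `records` list as a stand-in for the filtered card data.

```python
import json

# Hypothetical stand-in for the filtered card records
records = [{"name": "Checkout flow", "listName": "Doing", "time_to_due": 3}]

encoded_once = json.dumps(records, ensure_ascii=False)
encoded_twice = json.dumps(encoded_once, ensure_ascii=False)

print(encoded_once)   # a JSON array the model can read directly
print(encoded_twice)  # the same array wrapped in an escaped string literal

# Decoding once recovers the records only from the single-encoded form
once_decoded = json.loads(encoded_once)
twice_decoded = json.loads(encoded_twice)
```

The double-encoded form still contains all the data, but the extra escaping adds tokens and noise to the prompt.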





In [None]:
class aws_SecretManager():
    def __init__(self,SECRET_NAME,REGION_NAME):
        self.SECRET_NAME=SECRET_NAME
        self.REGION_NAME=REGION_NAME
    
    def get_secret_value(self):
        session = boto3.session.Session()
        client = session.client(
            service_name='secretsmanager',
            region_name=self.REGION_NAME
        )
        # SecretString is assumed to hold the key/value pairs as a JSON document;
        # a ClientError from the call propagates to the caller unchanged
        response = client.get_secret_value(SecretId=self.SECRET_NAME)
        return json.loads(response['SecretString'])





In [None]:
class AWSBedrock():
    def __init__(self,PROMPT:str, DATASET: str, REGION:str="us-east-1",MODEL_ID:str ="amazon.nova-lite-v1:0" ):
        self.prompt = PROMPT
        self.dataset = DATASET
        self.prompt_final = f"{self.prompt} {self.dataset}"
        self.region = REGION
        self.model_id =MODEL_ID
        
    def create_bedrock_client(self):
        bedrock = boto3.client(
            service_name='bedrock-runtime',
            region_name=self.region
        )
        return bedrock

    def get_payload(self):
        payload = {
            "messages": [
                {
                    "role": "user",
                    "content": [{"text": self.prompt_final}]
                }
            ],
            "inferenceConfig": {
                "max_new_tokens": 5000,
                "temperature": 0.4,
                "top_p": 0.9
            }
        }
        return payload

    def invoke_model(self):
        try:
            bedrock = self.create_bedrock_client()
            payload = self.get_payload()
            
            response = bedrock.invoke_model(
                modelId=self.model_id,
                body=json.dumps(payload)
            )
            
            response_body = json.loads(response['body'].read())
            data =response_body['output']['message']['content'][0]['text']
            return data
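        except Exception as e:
            # Surface the failure instead of silently returning None to the PDF step
            print(f"Error invoking Bedrock model: {e}")
            raise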
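
If the model call fails or returns an unexpected body, the later PDF step receives nothing useful to render. A small, hypothetical helper such as `extract_text` can make that failure explicit. The nested keys are the ones this notebook reads from the Nova response, and the sample bodies are hand-written rather than real model output.

```python
def extract_text(response_body: dict) -> str:
    """Return the first text block of a response, or raise a clear error."""
    try:
        return response_body["output"]["message"]["content"][0]["text"]
    except (KeyError, IndexError, TypeError) as exc:
        raise ValueError(f"Unexpected response shape: {response_body!r}") from exc


# Hand-written sample bodies, not real model output
ok_body = {"output": {"message": {"content": [{"text": "# Project Status"}]}}}
bad_body = {"output": {"message": {"content": []}}}

report_md = extract_text(ok_body)
try:
    extract_text(bad_body)
    failed = False
except ValueError:
    failed = True
print(report_md, failed)
```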


In [None]:
class MarkdownPDFReport:
    def __init__(self, output_path=None):
        self.output_path = output_path or BytesIO()
        self.styles = self._build_styles()
        self.story = []

    def build(self, md_text):
        html = self._markdown_to_html(md_text)
        soup = BeautifulSoup(html, "html.parser")
        self._parse_html(soup)
        self._build_pdf()
        
        # If the output is a BytesIO buffer, return its contents
        if isinstance(self.output_path, BytesIO):
            self.output_path.seek(0)
            return self.output_path.getvalue()
        
        return None

    def _markdown_to_html(self, md_text):
        html = markdown(md_text, extensions=["tables", "nl2br"])

        emoji_map = {
            "🟢": '<font color="green"><b>[✓]</b></font>',
            "🟡": '<font color="orange"><b>[!]</b></font>',
            "🔴": '<font color="red"><b>[✗]</b></font>',
        }
        for emoji, replacement in emoji_map.items():
            html = html.replace(emoji, replacement)

        html = html.replace("<strong>", "<b>").replace("</strong>", "</b>")
        html = html.replace("<em>", "<i>").replace("</em>", "</i>")
        return html

    def _build_styles(self):
        styles = getSampleStyleSheet()

        styles.add(ParagraphStyle(
            name="CustomHeading1",
            fontSize=24,
            spaceAfter=16,
            spaceBefore=12,
            textColor=colors.HexColor("#1a1a1a"),
            fontName="Helvetica-Bold"
        ))

        styles.add(ParagraphStyle(
            name="CustomHeading2",
            fontSize=18,
            spaceAfter=12,
            spaceBefore=10,
            textColor=colors.HexColor("#2c3e50"),
            fontName="Helvetica-Bold"
        ))

        styles.add(ParagraphStyle(
            name="CustomHeading3",
            fontSize=14,
            spaceAfter=8,
            spaceBefore=8,
            textColor=colors.HexColor("#34495e"),
            fontName="Helvetica-Bold"
        ))

        styles.add(ParagraphStyle(
            name="CustomHeading4",
            fontSize=12,
            spaceAfter=6,
            spaceBefore=6,
            textColor=colors.HexColor("#7f8c8d"),
            fontName="Helvetica-Bold"
        ))

        styles.add(ParagraphStyle(
            name="CustomNormal",
            fontSize=10,
            leading=14,
            textColor=colors.HexColor("#2c3e50"),
            fontName="Helvetica"
        ))

        return styles

    def _inner_html(self, element):
        return "".join(str(child) for child in element.children)

    def _parse_html(self, soup):
        for el in soup.children:
            if el.name == "h1":
                self.story.append(Paragraph(self._inner_html(el), self.styles["CustomHeading1"]))
                self.story.append(Spacer(1, 0.15 * inch))

            elif el.name == "h2":
                self.story.append(Paragraph(self._inner_html(el), self.styles["CustomHeading2"]))
                self.story.append(Spacer(1, 0.12 * inch))

            elif el.name == "h3":
                self.story.append(Paragraph(self._inner_html(el), self.styles["CustomHeading3"]))
                self.story.append(Spacer(1, 0.1 * inch))

            elif el.name == "h4":
                self.story.append(Paragraph(self._inner_html(el), self.styles["CustomHeading4"]))
                self.story.append(Spacer(1, 0.08 * inch))

            elif el.name == "p":
                content = self._inner_html(el).strip()
                if content:
                    self.story.append(Paragraph(content, self.styles["CustomNormal"]))
                    self.story.append(Spacer(1, 0.08 * inch))

            elif el.name in ["ul", "ol"]:
                items = []
                for li in el.find_all("li", recursive=False):
                    items.append(
                        ListItem(
                            Paragraph(self._inner_html(li), self.styles["CustomNormal"]),
                            leftIndent=20
                        )
                    )
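                # Number ordered lists; keep bullets for unordered ones
                bullet_type = "1" if el.name == "ol" else "bullet"
                self.story.append(ListFlowable(items, bulletType=bullet_type))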
                self.story.append(Spacer(1, 0.1 * inch))

            elif el.name == "table":
                self._build_table(el)

            elif el.name == "hr":
                self.story.append(HRFlowable(width="100%", thickness=1,
                                             color=colors.HexColor("#bdc3c7")))
                self.story.append(Spacer(1, 0.1 * inch))

    def _build_table(self, table_el):
        table_data = []

        for row in table_el.find_all("tr"):
            cells = row.find_all(["th", "td"])
            table_data.append([
                Paragraph(self._inner_html(cell), self.styles["CustomNormal"])
                for cell in cells
            ])

        table = Table(table_data, hAlign="LEFT")
        table.setStyle(TableStyle([
            ("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
            ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#ea78d7")),
            ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
            ("FONTSIZE", (0, 0), (-1, 0), 11),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1),
             [colors.white, colors.HexColor("#f8f9fa")]),
            ("LEFTPADDING", (0, 0), (-1, -1), 8),
            ("RIGHTPADDING", (0, 0), (-1, -1), 8),
            ("TOPPADDING", (0, 0), (-1, -1), 8),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 8),
        ]))

        self.story.append(table)
        self.story.append(Spacer(1, 0.15 * inch))

    def _build_pdf(self):
        doc = SimpleDocTemplate(
            self.output_path,
            pagesize=A4,
            rightMargin=0.75 * inch,
            leftMargin=0.75 * inch,
            topMargin=0.75 * inch,
            bottomMargin=0.75 * inch
        )
        doc.build(self.story)


class aws_s3:
    def __init__(self, BUCKET, PROMPT_FILEPATH="", BASE_PATH='output'):
        self.BUCKET = BUCKET
        self.PROMPT_FILEPATH = PROMPT_FILEPATH
        self.BASE_PATH = BASE_PATH.strip("/")
        
    def read_file(self):
        s3 = boto3.client("s3")
        response = s3.get_object(Bucket=self.BUCKET, Key=self.PROMPT_FILEPATH)
        data = response["Body"].read().decode("utf-8")
        return data
    
    def write_file(self, content, content_type, filename, partition_date=True):
        today = datetime.today().strftime("%Y-%m-%d")
        s3 = boto3.client("s3")
        
        if partition_date: 
            s3_key = f"{self.BASE_PATH}/{today}/{filename}"
        else:
            s3_key = f"{self.BASE_PATH}/{filename}"
        
        if isinstance(content, str):
            content = content.encode("utf-8")
        
        s3.put_object(
            Bucket=self.BUCKET, 
            Key=s3_key,
            Body=content,
            ContentType=content_type
        )
        return s3_key
    
    def write_pdf_from_markdown(self, markdown_text, filename, partition_date=True):
        # Create pdf file in memory
        buffer = BytesIO()
        pdf_generator = MarkdownPDFReport(output_path=buffer)
        pdf_generator.build(markdown_text)
        
        # Get the content from pdf file
        buffer.seek(0)
        pdf_content = buffer.getvalue()
        
        # save file in s3
        return self.write_file(
            content=pdf_content,
            content_type="application/pdf",
            filename=filename,
            partition_date=partition_date
        )





In [None]:
class aws_SES:
    def __init__(self,
                   EMAIL_SENDER:str,
                   REGION_NAME:str,
                   EMAIL_RECIPIENTS:list,
                   BUCKET:str,
                   FILEPATH:str,
                   ATTACHMENT_NAME:str,
                   EMAIL_HTML_BODY:str,
                   EMAIL_SUBJECT_NAME:str="Project Status Report – Trello Analysis"):
        self.BUCKET=BUCKET
        self.REGION_NAME=REGION_NAME
        self.EMAIL_SENDER = EMAIL_SENDER
        self.EMAIL_RECIPIENTS = EMAIL_RECIPIENTS
        self.FILEPATH=FILEPATH
        self.ATTACHMENT_NAME=ATTACHMENT_NAME
        self.EMAIL_SUBJECT_NAME= EMAIL_SUBJECT_NAME
        self.EMAIL_HTML_BODY=EMAIL_HTML_BODY
        
    def get_attachment_file(self):
        s3 = boto3.client("s3")
        pdf_obj = s3.get_object(Bucket=self.BUCKET, Key=self.FILEPATH)
        pdf_bytes = pdf_obj["Body"].read()
        return pdf_bytes
    
    def send_email(self):
        ses = boto3.client("ses", region_name=self.REGION_NAME)
        msg = MIMEMultipart("mixed")
        msg["Subject"] = self.EMAIL_SUBJECT_NAME
        msg["From"] = self.EMAIL_SENDER
        msg["To"] = ", ".join(self.EMAIL_RECIPIENTS)

        # HTML part
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(self.EMAIL_HTML_BODY, "html", "utf-8"))
        msg.attach(body)

        # Attachment
        attachment = MIMEApplication(self.get_attachment_file())
        attachment.add_header(
            "Content-Disposition",
            "attachment",
            filename=self.ATTACHMENT_NAME
        )
        msg.attach(attachment)
        
        # -------- SEND EMAIL --------
        response = ses.send_raw_email(
            Source=self.EMAIL_SENDER,
            Destinations=self.EMAIL_RECIPIENTS,
            RawMessage={"Data": msg.as_string()}
        )
        print("Email sent! MessageId:", response["MessageId"])
        return response["MessageId"]




In [None]:
BUCKET = "trello-ai-tutorial"
PROMPT_FILEPATH = "prompt/aws_prompt.txt"
SECRET_NAME = "prod/trello-token"
REGION_NAME = "us-east-1"
BOARD_NAME = 'Kanban-ecommerce'
EMAIL_TEMPLATE_FILEPATH='email_template/email_daily_status_template.html'

# 1. Read prompt
print('1. Read prompt file')
s3 = aws_s3(BUCKET, PROMPT_FILEPATH)
prompt = s3.read_file()
print('✅ Prompt file read successfully')

# 2. Get credentials
secret_token = aws_SecretManager(SECRET_NAME, REGION_NAME).get_secret_value()
API_KEY = secret_token['trello_project_api_key']
API_TOKEN = secret_token['trello_project_token']

# 3. Get Trello data
print('2. Get Trello data')
trello = Trello(
    BUCKET_NAME=BUCKET,  
    API_KEY=API_KEY,
    API_TOKEN=API_TOKEN,
    S3=s3  
)
data = trello.get_data(board_name=BOARD_NAME)
data_json = trello.get_data_json(BOARD_NAME, ['To Do', 'Doing'])
print('✅ Trello data retrieved successfully')

print('3. Invoke AWS Bedrock model')
aws_bedrock = AWSBedrock(PROMPT=prompt,DATASET=data_json)
aws_bedrock_response = aws_bedrock.invoke_model()
print('✅ AWS Bedrock model invoked successfully')

print('4. Save PDF report to S3')
s3_pdf_path = s3.write_pdf_from_markdown(
    markdown_text=aws_bedrock_response,
    filename=f"trello_report_{BOARD_NAME}.pdf"
)
print(f'✅ {s3_pdf_path}')


print('5. Send Email')
s3 = aws_s3(BUCKET, EMAIL_TEMPLATE_FILEPATH)
html_body = s3.read_file()
ses=aws_SES(EMAIL_SENDER = "<email-sender>@<domain>.com",
        EMAIL_RECIPIENTS = ["<email-recipient>@<domain>.com"],
        REGION_NAME=REGION_NAME,
        BUCKET=BUCKET,
        FILEPATH=s3_pdf_path, 
        ATTACHMENT_NAME="Project_Status_Trello.pdf",
        EMAIL_HTML_BODY=html_body,
        EMAIL_SUBJECT_NAME="Project Status Report – Trello Analysis v1")

ses.send_email()
print('✅ Email was sent successfully')

1. Read prompt file
✅ Prompt file read successfully
2. Get Trello data
✅ output/2025-12-23/trello_Kanban-ecommerce_cards.csv
✅ Trello data retrieved successfully
3. Invoke AWS Bedrock model
✅ AWS Bedrock model invoked successfully
4. Save PDF report to S3
✅ output/2025-12-23/trello_report_Kanban-ecommerce.pdf
5. Send Email
Email sent! MessageId: 0100019b48a4c75e-4b8efb25-490a-478b-b161-390f2f197af4-000000
✅ Email was sent successfully


## Example Output: Email and Report Preview 📄✉️
Below is an example of the report generated by the solution. The complete output consists of a six-page PDF, but for illustration purposes, the following screenshots show the cover page and a selection of summary tables used to highlight key project insights.

![gmail](img/trello-gmail.png)

<center>

![gmail](img/trello-pdf-1.png)


</center>