In [1]:
from __future__ import annotations

import base64
import datetime as dt
import io
import json
import logging
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
from pydantic import BaseModel, Field, HttpUrl, ValidationError
from zoneinfo import ZoneInfo
from dotenv import load_dotenv

# LangChain / LangGraph
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

In [2]:
# ---------------------------
# Config & Utilities
# ---------------------------

load_dotenv()

LOG = logging.getLogger("festival_linkedin_bot")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)

INDIA_TZ = ZoneInfo("Asia/Kolkata")

DEFAULT_OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
DEFAULT_IMAGE_MODEL = os.getenv("IMAGE_MODEL", "gpt-image-1")
DOWNLOAD_DIR = Path(os.getenv("DOWNLOAD_DIR", "./downloads")).resolve()
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

In [3]:
# ---------------------------
# Festival Provider (Calendar-based)
# ---------------------------
from ics import Calendar
import requests

def fetch_festivals_from_calendar(url: str) -> List[FestivalInfo]:
    """
    Fetch events from an ICS calendar (public URL or local file) and convert to FestivalInfo list.
    """
    if url.startswith("http"):
        r = requests.get(url)
        r.raise_for_status()
        data = r.text
    else:
        with open(url, "r", encoding="utf-8") as f:
            data = f.read()

    cal = Calendar(data)
    festivals: List[FestivalInfo] = []
    for event in cal.events:
        if not event.begin:
            continue
        date = event.begin.date()
        festivals.append(FestivalInfo(
            name=event.name,
            date=date,
            region=None,
            emoji=None,
            colors=None,
        ))
    return festivals

def get_upcoming_festival(today: dt.date, calendar_url: str) -> Optional[FestivalInfo]:
    festivals = fetch_festivals_from_calendar(calendar_url)
    upcoming = [f for f in festivals if f.date >= today]
    if not upcoming:
        return None
    upcoming.sort(key=lambda f: f.date)
    return upcoming[1]

In [4]:
# ---------------------------
# Data Models
# ---------------------------

class FestivalInfo(BaseModel):
    name: str
    date: dt.date
    region: Optional[str] = None
    emoji: Optional[str] = None
    colors: Optional[List[str]] = None

class PostDraft(BaseModel):
    festival: FestivalInfo
    title: str
    body: str
    hashtags: List[str]

class BannerSpec(BaseModel):
    festival: FestivalInfo
    prompt: str
    width: int = 1600
    height: int = 900

class LinkedInPostResult(BaseModel):
    post_urn: Optional[str]
    asset_urn: Optional[str]
    share_url: Optional[str]

class AppConfig(BaseModel):
    brand_name: Optional[str] = Field(default=None, description="Brand name to include")
    brand_tone: Optional[str] = Field(default="warm, celebratory, professional")
    hashtags: List[str] = Field(default_factory=lambda: ["#Festival", "#Celebration", "#Community"]) 
    linkedin_author_urn: str = Field(..., description="urn:li:person:XXXX or urn:li:organization:XXXX")
    openai_model: str = Field(default=DEFAULT_OPENAI_MODEL)
    image_model: str = Field(default=DEFAULT_IMAGE_MODEL)

In [5]:
# ---------------------------
# LinkedIn API Client
# ---------------------------

class LinkedInClient:
    def __init__(self, access_token: str):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {access_token}",
            "X-Restli-Protocol-Version": "2.0.0",
            "Content-Type": "application/json",
        })
        self.api_base = "https://api.linkedin.com/v2"

    def register_image_upload(self, owner_urn: str) -> Tuple[str, str]:
        url = f"{self.api_base}/assets?action=registerUpload"
        payload = {
            "registerUploadRequest": {
                "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
                "owner": owner_urn,
                "serviceRelationships": [
                    {"relationshipType": "OWNER", "identifier": "urn:li:userGeneratedContent"}
                ],
            }
        }
        r = self.session.post(url, json=payload)
        if r.status_code >= 300:
            raise RuntimeError(f"LinkedIn registerUpload failed: {r.status_code} {r.text}")
        data = r.json()
        asset_urn = data.get("value", {}).get("asset")
        upload_url = data.get("value", {}).get("uploadMechanism", {}) \
            .get("com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest", {}) \
            .get("uploadUrl")
        if not asset_urn or not upload_url:
            raise RuntimeError(f"Invalid registerUpload response: {data}")
        return asset_urn, upload_url

    def upload_binary(self, upload_url: str, binary: bytes, content_type: str = "image/png") -> None:
        headers = {"Content-Type": content_type}
        r = requests.put(upload_url, data=binary, headers=headers)
        if r.status_code >= 300:
            raise RuntimeError(f"LinkedIn binary upload failed: {r.status_code} {r.text}")

    def create_post_with_image(self, owner_urn: str, asset_urn: str, text: str) -> LinkedInPostResult:
        url = f"{self.api_base}/ugcPosts"
        payload = {
            "author": owner_urn,
            "lifecycleState": "PUBLISHED",
            "specificContent": {
                "com.linkedin.ugc.ShareContent": {
                    "shareCommentary": {"text": text},
                    "shareMediaCategory": "IMAGE",
                    "media": [
                        {
                            "status": "READY",
                            "description": {"text": ""},
                            "media": asset_urn,
                            "title": {"text": ""},
                        }
                    ],
                }
            },
            "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"},
        }
        r = self.session.post(url, json=payload)
        if r.status_code >= 300:
            raise RuntimeError(f"LinkedIn post failed: {r.status_code} {r.text}")
        post_urn = r.headers.get("x-restli-id")
        return LinkedInPostResult(post_urn=post_urn, asset_urn=asset_urn, share_url=None)


In [None]:
# ---------------------------
# LangChain LLM helpers
# ---------------------------

def make_llm(model: str) -> ChatOpenAI:
    return ChatOpenAI(model=model, temperature=0.7)

POST_PROMPT = ChatPromptTemplate.from_messages([
    ("system",
     """You are a professional LinkedIn copywriter specialising in culturally aware, original content for an Indian audience.
Craft posts that are:
- Ethically responsible, plagiarism-free, and professionally written.
- Positive, inclusive, and free from stereotypes or clichés.
- Crisp and engaging (80–160 words), using British/Indian English.
- Structured with a strong opening hook, one practical insight, and a soft call-to-action.
- Tastefully enhanced with 0–3 emojis and 3–6 relevant hashtags.

Always reflect the brand’s tone and values. Prioritise authenticity, cultural sensitivity, and relevance to working professionals across India."""),
    
    ("human",
     """Write a LinkedIn post for the upcoming festival below.
Festival: {festival_name}
Date: {festival_date}
Emoji: {festival_emoji}
Brand: {brand_name}
Tone: {brand_tone}
Suggested hashtags (comma-separated): {hashtags}

Rules:
- 80–160 words.
- Start with a crisp 1-liner hook about the festival.
- Include one practical insight on how teams/customers can celebrate or reflect.
- End with a soft CTA (e.g., share how you celebrate, follow, or visit website).
Ensure the content is original, culturally respectful, and aligned with professional values."""),
])


BANNER_PROMPT_TMPL = (
    "Design a clean, modern LinkedIn banner celebrating {festival_name} for an Indian audience. "
    "Reflect the spirit of {festival_name} with warmth, elegance, and cultural sensitivity—avoid clichés or stereotypes. "
    "Use brand identity: {brand_name}, with tone: {brand_tone}. Prioritise a minimal, typography-led layout with tasteful festive motifs "
    "(e.g., lamps, rangoli, diyas, florals—depending on the festival). "
    "If colour hints are provided, lean into palette: {palette}. "
    "Include space for a headline: 'Happy {festival_name}!' and optionally a subtle brand logo placement. "
    "Ensure the design aligns with ethical creative principles—respect cultural symbols, avoid religious overtones unless explicitly requested, "
    "and maintain visual harmony for professional audiences."
)

In [7]:
# ---------------------------
# Image Generation (OpenAI Images API)
# ---------------------------

def generate_image_bytes(prompt: str, width: int, height: int, image_model: str) -> bytes:
    """Calls OpenAI Image API and returns raw PNG bytes."""
    try:
        from openai import OpenAI
    except Exception as e:
        raise RuntimeError("openai Python package is required. Install with `pip install openai`. ") from e

    client = OpenAI()
    LOG.info("Generating image via %s ...", image_model)
    img = client.images.generate(
        model="gpt-image-1",
        prompt=prompt,
        size="1536x1024",
        n=1
    )
    
    b64 = img.data[0].b64_json
    return base64.b64decode(b64)


def save_png(png_bytes: bytes, path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        f.write(png_bytes)

In [8]:
# ---------------------------
# LangGraph State
# ---------------------------

class GraphState(TypedDict):
    config: AppConfig
    festival: Optional[FestivalInfo]
    post: Optional[PostDraft]
    banner: Optional[BannerSpec]
    banner_path: Optional[str]
    linkedin_result: Optional[LinkedInPostResult]

In [9]:
# ---------------------------
# Graph Nodes
# ---------------------------

def node_select_festival(state: GraphState) -> GraphState:
    LOG.info("Selecting upcoming festival from client calendar ...")
    today = dt.datetime.now(tz=INDIA_TZ).date()

    calendar_url = os.getenv("CALENDAR_URL")
    if not calendar_url:
        raise RuntimeError("CALENDAR_URL env var required (ICS file or feed URL).")

    fest = get_upcoming_festival(today, calendar_url)
    if not fest:
        raise RuntimeError("No upcoming festivals found in client calendar.")

    state["festival"] = fest
    return state



def node_write_post(state: GraphState) -> GraphState:
    LOG.info("Generating LinkedIn post copy via LLM ...")
    cfg = state["config"]
    fest = state["festival"]
    assert fest is not None

    hashtags_input = os.getenv("HASHTAGS")
    hashtags = (
        [h.strip() for h in hashtags_input.split(",") if h.strip()]
        if hashtags_input else cfg.hashtags
    )

    llm = make_llm(cfg.openai_model)
    prompt = POST_PROMPT.format_messages(
        festival_name=f"{fest.emoji + ' ' if fest.emoji else ''}{fest.name}",
        festival_date=fest.date.isoformat(),
        festival_emoji=fest.emoji or "",
        brand_name=os.getenv("BRAND_NAME", cfg.brand_name or ""),
        brand_tone=os.getenv("BRAND_TONE", cfg.brand_tone or "warm, celebratory, professional"),
        hashtags=", ".join(hashtags),
    )
    msg = llm.invoke(prompt)
    body = msg.content.strip()

    title = f"{fest.emoji + ' ' if fest.emoji else ''}{fest.name}: Celebrating Together"
    state["post"] = PostDraft(festival=fest, title=title, body=body, hashtags=hashtags)
    return state


def node_make_banner(state: GraphState) -> GraphState:
    LOG.info("Generating banner image ...")
    cfg = state["config"]
    fest = state["festival"]
    assert fest is not None

    palette = ", ".join(fest.colors or ["brand palette"])
    prompt = BANNER_PROMPT_TMPL.format(
        festival_name=fest.name,
        brand_tone=cfg.brand_tone,
        palette=palette,
        brand_name=cfg.brand_name
    )

    banner = BannerSpec(festival=fest, prompt=prompt)
    png = generate_image_bytes(prompt=banner.prompt, width=banner.width, height=banner.height, image_model=cfg.image_model)

    filename = f"banner_{fest.date.isoformat()}_{fest.name.lower().replace(' ', '_')}.png"
    out_path = DOWNLOAD_DIR / filename
    save_png(png, out_path)

    state["banner"] = banner
    state["banner_path"] = str(out_path)
    return state


def node_post_linkedin(state: GraphState) -> GraphState:
    LOG.info("Posting to LinkedIn ...")
    cfg = state["config"]
    post = state["post"]
    banner_path = state["banner_path"]
    assert post is not None and banner_path is not None

    access_token = os.getenv("LINKEDIN_ACCESS_TOKEN")
    if not access_token:
        raise RuntimeError("LINKEDIN_ACCESS_TOKEN not set.")

    client = LinkedInClient(access_token)
    asset_urn, upload_url = client.register_image_upload(cfg.linkedin_author_urn)
    print(upload_url)
    with open(banner_path, "rb") as f:
        binary = f.read()
    client.upload_binary(upload_url, binary, content_type="image/png")

    # Compose text: title + body (LinkedIn ignores titles for UGC; keep in text)
    text = f"{post.title}\n\n{post.body}"
    result = client.create_post_with_image(cfg.linkedin_author_urn, asset_urn, text)

    state["linkedin_result"] = result
    return state

In [10]:
# ---------------------------
# Build Graph
# ---------------------------

def build_graph() -> StateGraph:
    graph = StateGraph(GraphState)
    graph.add_node("select_festival", node_select_festival)
    graph.add_node("write_post", node_write_post)
    graph.add_node("make_banner", node_make_banner)
    graph.add_node("post_linkedin", node_post_linkedin)

    graph.set_entry_point("select_festival")
    graph.add_edge("select_festival", "write_post")
    graph.add_edge("write_post", "make_banner")
    graph.add_edge("make_banner", "post_linkedin")
    graph.add_edge("post_linkedin", END)
    return graph

In [11]:
# ---------------------------
# CLI Entrypoint
# ---------------------------

def load_config_from_env() -> AppConfig:
    hashtags_input = os.getenv("HASHTAGS")
    hashtags = [h.strip() for h in hashtags_input.split(",") if h.strip()] if hashtags_input else None

    cfg = AppConfig(
        brand_name=os.getenv("BRAND_NAME"),
        brand_tone=os.getenv("BRAND_TONE", "warm, celebratory, professional"),
        hashtags=hashtags or ["#Festival", "#Celebration", "#Community"],
        linkedin_author_urn=os.getenv("LINKEDIN_AUTHOR_URN", "").strip() or None,
        openai_model=os.getenv("OPENAI_MODEL", DEFAULT_OPENAI_MODEL),
        image_model=os.getenv("IMAGE_MODEL", DEFAULT_IMAGE_MODEL),
    )
    if not cfg.linkedin_author_urn:
        raise RuntimeError("LINKEDIN_AUTHOR_URN is required. Example: urn:li:person:XXXX or urn:li:organization:XXXX")
    return cfg

In [None]:
def main() -> None:
    try:
        cfg = load_config_from_env()
        initial_state: GraphState = {
            "config": cfg,
            "festival": None,
            "post": None,
            "banner": None,
            "banner_path": None,
            "linkedin_result": None,
        }
        app = build_graph().compile()
        final_state = app.invoke(initial_state)

        fest = final_state["festival"]
        post = final_state["post"]
        result = final_state["linkedin_result"]

        LOG.info("Done. Festival: %s | Banner: %s | Post URN: %s", 
                 fest.name if fest else None, final_state.get("banner_path"), result.post_urn if result else None)
        print("\n=== Summary ===")
        print(f"Festival: {fest.name} on {fest.date.isoformat()}" if fest else "Festival: n/a")
        print(f"Banner saved to: {final_state.get('banner_path')}")
        print(f"LinkedIn Post URN: {result.post_urn if result else 'n/a'}")
    except ValidationError as ve:
        LOG.error("Validation error: %s", ve)
        sys.exit(2)
    except Exception as e:
        LOG.exception("Fatal error: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()