In [1]:
import os
import time
import json
import requests
import logging
import pickle
from dataclasses import dataclass
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ---------------------- Core Data Structures ----------------------
@dataclass
class LinkedInPost:
    text: str
    hashtags: List[str]
    media_urn: Optional[str] = None
    link_preview: Optional[str] = None
    engagement_score: float = 0.0
    error: bool = False
    error_details: str = ""

# ---------------------- Ollama Integration Layer ----------------------
class DeepSeekClient:
    """Optimized client for DeepSeek-R1 1.5B model"""
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url
        self.model_name = "deepseek-r1:1.5b"
        self.timeout = 150
        self._verify_model()

    def _verify_model(self):
        """Ensure model is available and loaded"""
        try:
            models = requests.get(f"{self.base_url}/api/tags", timeout=10).json()
            if not any(self.model_name in m["name"] for m in models.get("models", [])):
                self._pull_model()
        except Exception as e:
            logger.error(f"Model verification failed: {e}")
            raise

    def _pull_model(self):
        """Handle model download with progress tracking"""
        try:
            response = requests.post(
                f"{self.base_url}/api/pull",
                json={"name": self.model_name},
                stream=True,
                timeout=600
            )
            for line in response.iter_lines():
                if line:
                    logger.info(f"Download: {json.loads(line).get('status', '')}")
        except Exception as e:
            logger.error(f"Model pull failed: {e}")
            raise

    def generate_post(self, topic: str, key_points: List[str]) -> LinkedInPost:
        """Generate optimized LinkedIn content with metadata extraction"""
        prompt = self._build_prompt(topic, key_points)
        
        try:
            response = requests.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model_name,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.7,
                        "top_p": 0.9,
                        "max_tokens": 500
                    }
                },
                timeout=self.timeout
            )
            return self._parse_response(response.json())
        except Exception as e:
            return LinkedInPost(
                text="",
                hashtags=[],
                error=True,
                error_details=str(e)
            )

    def _build_prompt(self, topic: str, points: List[str]) -> str:
        """Construct professional content generation prompt"""
        return f"""You are a LinkedIn content expert. Create a post about {topic} that:
- Targets tech professionals and executives
- Uses an inspirational yet professional tone
- Includes 3-5 industry-specific hashtags
- Incorporates these key points: {", ".join(points)}
- Limits to 3 short paragraphs maximum
- Adds emojis sparingly for visual appeal

Output format (JSON):
{{
    "content": "post text",
    "hashtags": ["#list", "#of", "#tags"],
    "engagement_strategies": ["list of strategies used"]
}}"""

    def _parse_response(self, data: dict) -> LinkedInPost:
        """Parse and validate model output"""
        try:
            response = json.loads(data["response"])
            return LinkedInPost(
                text=response["content"],
                hashtags=response["hashtags"],
                engagement_score=self._calculate_score(response)
            )
        except Exception as e:
            return LinkedInPost(
                text="",
                hashtags=[],
                error=True,
                error_details=f"Parsing error: {str(e)}"
            )

    def _calculate_score(self, response: dict) -> float:
        """Calculate quality score based on content features"""
        strategies = len(response["engagement_strategies"])
        hashtags = len(response["hashtags"])
        content_len = len(response["content"])
        return min((strategies * 0.2) + (hashtags * 0.15) + (content_len/500 * 0.65), 1.0)

# ---------------------- LinkedIn API Integration ----------------------
class LinkedInManager:
    """Handles full LinkedIn integration lifecycle"""
    def __init__(self, client_id: str, client_secret: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expiry = 0
        self.headers = {
            "X-Restli-Protocol-Version": "2.0.0",
            "Content-Type": "application/json"
        }

    def authenticate(self):
        """Handle OAuth 2.0 authentication flow"""
        auth_url = f"https://www.linkedin.com/oauth/v2/authorization?response_type=code&client_id={self.client_id}&redirect_uri=http://localhost:8000/callback&scope=w_member_social"
        logger.info(f"Authenticate via: {auth_url}")
        # In production: Implement full OAuth flow with callback handler
        
        # For demo purposes using env token
        self.access_token = os.getenv("LINKEDIN_TOKEN")
        self.headers["Authorization"] = f"Bearer {self.access_token}"

    def create_post(self, post: LinkedInPost):
        """Execute UGC Post creation with error handling"""
        if post.error:
            raise ValueError("Cannot post content with errors")

        post_data = {
            "author": "urn:li:person:{os.getenv('LINKEDIN_USER_URN')}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {
                "com.linkedin.ugc.ShareContent": {
                    "shareCommentary": {
                        "text": f"{post.text}\n\n{' '.join(post.hashtags)}"
                    },
                    "shareMediaCategory": "NONE"
                }
            },
            "visibility": {
                "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
            }
        }

        if post.media_urn:
            post_data["specificContent"]["com.linkedin.ugc.ShareContent"]["media"] = [
                {"status": "READY", "media": post.media_urn}
            ]
            post_data["specificContent"]["com.linkedin.ugc.ShareContent"]["shareMediaCategory"] = "IMAGE"

        try:
            response = requests.post(
                "https://api.linkedin.com/v2/ugcPosts",
                headers=self.headers,
                json=post_data
            )
            response.raise_for_status()
            return response.headers.get("X-Restli-Id")
        except Exception as e:
            logger.error(f"Post failed: {str(e)}")
            return None

    def upload_media(self, file_path: str) -> Optional[str]:
        """Handle media upload workflow"""
        try:
            # Register upload
            register_payload = {
                "registerUploadRequest": {
                    "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
                    "owner": f"urn:li:person:{os.getenv('LINKEDIN_USER_URN')}",
                    "serviceRelationships": [{
                        "relationshipType": "OWNER",
                        "identifier": "urn:li:userGeneratedContent"
                    }]
                }
            }
            
            register_response = requests.post(
                "https://api.linkedin.com/v2/assets?action=registerUpload",
                headers=self.headers,
                json=register_payload
            )
            register_response.raise_for_status()
            
            upload_url = register_response.json()["value"]["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"]
            asset_urn = register_response.json()["value"]["asset"]

            # Perform actual upload
            with open(file_path, "rb") as f:
                upload_response = requests.put(upload_url, data=f)
                upload_response.raise_for_status()

            return asset_urn
        except Exception as e:
            logger.error(f"Media upload failed: {str(e)}")
            return None

# ---------------------- Pipeline Orchestrator ----------------------
class ContentPipeline:
    """End-to-end content generation and posting system"""
    def __init__(self):
        self.llm = DeepSeekClient()
        self.linkedin = LinkedInManager(
            client_id=os.getenv("LINKEDIN_CLIENT_ID"),
            client_secret=os.getenv("LINKEDIN_CLIENT_SECRET")
        )
        self.linkedin.authenticate()

    def execute(self, topic: str, key_points: List[str], image_path: Optional[str] = None):
        """Full pipeline execution flow"""
        try:
            # Generate content
            post = self.llm.generate_post(topic, key_points)
            if post.error:
                raise RuntimeError(f"Generation failed: {post.error_details}")

            # Handle media
            if image_path:
                media_urn = self.linkedin.upload_media(image_path)
                if media_urn:
                    post.media_urn = media_urn
                else:
                    logger.warning("Proceeding without media due to upload failure")

            # Post to LinkedIn
            post_id = self.linkedin.create_post(post)
            if post_id:
                logger.info(f"Successfully posted content with ID: {post_id}")
                self._save_artifacts(post, post_id)
                return post_id
            return None
            
        except Exception as e:
            logger.error(f"Pipeline failed: {str(e)}")
            return None

    def _save_artifacts(self, post: LinkedInPost, post_id: str):
        """Persist pipeline artifacts for analysis"""
        artifact = {
            "timestamp": time.time(),
            "post_id": post_id,
            "content": post.text,
            "metadata": {
                "hashtags": post.hashtags,
                "engagement_score": post.engagement_score,
                "media_used": bool(post.media_urn)
            }
        }
        
        with open(f"post_{post_id}.pkl", "wb") as f:
            pickle.dump(artifact, f)

# ---------------------- Example Usage ----------------------
if __name__ == "__main__":
    # Environment setup
    os.environ["LINKEDIN_CLIENT_ID"] = "your_client_id"
    os.environ["LINKEDIN_CLIENT_SECRET"] = "your_client_secret"
    os.environ["LINKEDIN_USER_URN"] = "your_user_urn"
    os.environ["LINKEDIN_TOKEN"] = "your_access_token"

    # Execute pipeline
    pipeline = ContentPipeline()
    post_id = pipeline.execute(
        topic="AI-Powered Content Generation Pipelines",
        key_points=[
            "Hybrid human-AI workflow integration",
            "Quality control through RAG architecture",
            "Ethical considerations in automated content"
        ],
        image_path="ai_pipeline_diagram.png"  # Optional
    )
    
    if post_id:
        logger.info(f"Successfully created post: {post_id}")
    else:
        logger.error("Post creation failed")

2025-04-01 18:18:24,924 - INFO - Authenticate via: https://www.linkedin.com/oauth/v2/authorization?response_type=code&client_id=your_client_id&redirect_uri=http://localhost:8000/callback&scope=w_member_social
2025-04-01 18:18:35,609 - ERROR - Pipeline failed: Generation failed: Parsing error: Expecting value: line 1 column 1 (char 0)
2025-04-01 18:18:35,621 - ERROR - Post creation failed


In [None]:
# ---------------------- RAG Components ----------------------
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from langchain.vectorstores import FAISS
from typing import List, Tuple

class CSVProcessor:
    """Handles CSV-based knowledge base for RAG"""
    def __init__(self, csv_path: str, embedding_model: str = 'all-MiniLM-L6-v2'):
        self.df = pd.read_csv(csv_path)
        self.encoder = SentenceTransformer(embedding_model)
        self._preprocess_data()
        self._create_vector_store()
        
    def _preprocess_data(self):
        """Clean and prepare CSV data"""
        self.df['combined_text'] = self.df.apply(
            lambda row: f"Title: {row.get('title', '')}\nContent: {row.get('content', '')}",
            axis=1
        )
        # Add any additional preprocessing here
        
    def _create_vector_store(self):
        """Generate embeddings and create FAISS index"""
        texts = self.df['combined_text'].tolist()
        embeddings = self.encoder.encode(texts, show_progress_bar=True)
        self.vector_store = FAISS.from_embeddings(
            text_embeddings=list(zip(texts, embeddings)),
            embedding=self.encoder
        )
    
    def retrieve_context(self, query: str, k: int = 3) -> List[str]:
        """Retrieve relevant context from CSV data"""
        results = self.vector_store.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

In [None]:
# ---------------------- Enhanced Generation ----------------------
class DeepSeekRAGClient(DeepSeekClient):
    """RAG-enhanced content generator"""
    def __init__(self, csv_processor: CSVProcessor, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.csv_processor = csv_processor
        
    def generate_post(self, topic: str, key_points: List[str]) -> LinkedInPost:
        """Generate post with RAG context"""
        # Retrieve relevant context
        context = self.csv_processor.retrieve_context(topic)
        
        # Generate with context-enhanced prompt
        prompt = self._build_rag_prompt(topic, key_points, context)
        
        try:
            response = requests.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model_name,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"temperature": 0.7, "top_p": 0.9, "max_tokens": 600}
                },
                timeout=self.timeout
            )
            return self._parse_response(response.json())
        except Exception as e:
            return LinkedInPost(
                text="", hashtags=[], error=True, error_details=str(e)
    
    def _build_rag_prompt(self, topic: str, points: List[str], context: List[str]) -> str:
        """Create context-aware prompt"""
        context_str = "\n".join([f"- {c}" for c in context])
        return f"""Using the following context:
{context_str}

Create a LinkedIn post about {topic} that:
- Synthesizes key insights from the context
- Incorporates these points: {", ".join(points)}
- Maintains professional tone with 3-5 hashtags
- Includes data-driven insights where possible
- Limits to 300-400 characters

Format (JSON):
{{
    "content": "post text",
    "hashtags": ["#list"],
    "data_points_used": ["list of used statistics/references"]
}}"""

In [None]:
# ---------------------- Integrated Pipeline ----------------------
class RAGContentPipeline(ContentPipeline):
    """End-to-end pipeline with RAG integration"""
    def __init__(self, csv_path: str):
        super().__init__()
        self.csv_processor = CSVProcessor(csv_path)
        self.llm = DeepSeekRAGClient(self.csv_processor)
        
    def execute_with_rag(self, topic: str, key_points: List[str], image_path: Optional[str] = None):
        """Execute full RAG-enhanced pipeline"""
        return super().execute(topic, key_points, image_path)

# Example Execution
if __name__ == "__main__":
    # Initialize with your CSV knowledge base
    pipeline = RAGContentPipeline("industry_reports.csv")
    
    # Execute RAG-enhanced post creation
    post_id = pipeline.execute_with_rag(
        topic="AI in Manufacturing",
        key_points=[
            "Predictive maintenance adoption",
            "Quality control improvements",
            "Supply chain optimization"
        ],
        image_path="factory_ai.png"
    )
    
    if post_id:
        logger.info(f"RAG-enhanced post created: {post_id}")
    else:
        logger.error("RAG pipeline failed")