In [4]:
import json
import os
from datetime import datetime

from confluent_kafka import Consumer, KafkaError, KafkaException
from dotenv import load_dotenv
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

load_dotenv()

# Read config.json as UTF-8 explicitly: without `encoding`, open() falls back
# to the platform default, which is not guaranteed to decode non-ASCII content.
with open('config.json', encoding='utf-8') as f:
    config = json.load(f)

# Access values
slack_token = os.getenv('SLACK_BOT_TOKEN')  # the secret comes from the environment, not from config.json
channel_id = config['slack']['channel_id']



In [8]:
# Consumer settings dictionary. The broker list comes from REDPANDA_BROKERS and
# falls back to localhost:9092 when that variable is unset.
kafka_config = dict(
    [
        ('bootstrap.servers', os.getenv('REDPANDA_BROKERS', 'localhost:9092')),
        ('group.id', 'employee-activities-slack-notifier'),
        ('auto.offset.reset', 'earliest'),
        ('enable.auto.commit', False),
    ]
)

In [11]:
# Only report whether the token is configured: a bare os.getenv(...) as the
# last expression of a cell would echo the secret into the saved output.
os.getenv('SLACK_BOT_TOKEN') is not None

'<redacted: Slack bot token removed from saved output; rotate this credential>'

In [1]:
# slack_notifier_fp.py
import json
import os
import time
from datetime import timedelta
from confluent_kafka import Consumer
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from dotenv import load_dotenv

# --- Config Loaders ---
def load_env_config():
    """Collect the broker and Slack settings from environment variables.

    ``load_dotenv()`` is called first, then each setting is read with
    ``os.getenv``.

    Returns:
        dict with the keys ``kafka_brokers``, ``slack_token`` and
        ``slack_channel_id``; a value is ``None`` when its variable is unset.
    """
    load_dotenv()
    env_names = {
        "kafka_brokers": "REDPANDA_BROKERS",
        "slack_token": "SLACK_BOT_TOKEN",
        "slack_channel_id": "SLACK_CHANNEL_ID",
    }
    return {setting: os.getenv(variable) for setting, variable in env_names.items()}

def load_json_config(path="config.json"):
    """Parse a UTF-8 encoded JSON file and return the decoded object.

    Args:
        path: Location of the JSON file (defaults to ``config.json``).

    Returns:
        Whatever ``json.load`` produces for the file content.
    """
    with open(path, encoding='utf-8') as config_file:
        parsed = json.load(config_file)
    return parsed

# --- Data Transformers ---
def format_duration(seconds):
    """Render a duration given in seconds as a short French phrase.

    Days, hours and minutes are listed when non-zero and joined with " et "
    (for example ``"1 heure et 5 minutes"``); leftover seconds are dropped.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The French phrase, or ``"quelques secondes"`` when the duration is
        shorter than one minute or negative.
    """
    delta = timedelta(seconds=seconds)

    # A negative timedelta is normalised to a negative `days` plus a positive
    # `seconds` field, so -60 s used to be reported as
    # "23 heures et 59 minutes". Treat it like an empty duration instead.
    if delta < timedelta(0):
        return "quelques secondes"

    hours, leftover = divmod(delta.seconds, 3600)
    units = (
        (delta.days, "jour"),
        (hours, "heure"),
        (leftover // 60, "minute"),
    )
    parts = [
        f"{count} {label}{'s' if count > 1 else ''}"
        for count, label in units
        if count > 0
    ]

    if not parts:
        return "quelques secondes"
    return ' et '.join(parts)

def get_sport_name(sport_type):
    """Translate a one-letter sport code into its French label.

    Args:
        sport_type: Sport code such as ``"V"`` or ``"R"``.

    Returns:
        The French sport name, or ``"activité sportive"`` for a code that is
        not in the table.
    """
    labels_by_code = dict([
        ("V", "vélo"),
        ("R", "course à pied"),
        ("S", "natation"),
        ("M", "randonnée"),
        ("E", "musculation"),
        ("T", "tennis"),
        ("N", "yoga"),
    ])
    try:
        return labels_by_code[sport_type]
    except KeyError:
        return "activité sportive"

def _three_significant(value):
    """Format a float with 3 significant figures, never in scientific notation."""
    text = f"{value:.3g}"
    if "e" in text:
        # `.3g` falls back to an exponent once the rounded value needs more
        # digits than the precision (999.6 -> '1e+03'). Expand it back to
        # plain digits and drop the padding zeros.
        text = f"{float(text):.10f}".rstrip("0").rstrip(".")
    return text


def format_distance(distance_m):
    """Format a distance in metres with 3 significant figures and a unit.

    Values of 1000 m or more are shown in kilometres, smaller ones in metres.

    Args:
        distance_m: Distance in metres; anything ``float()`` accepts.

    Returns:
        A string such as ``"5.23 km"`` or ``"750 m"``. An empty string is
        returned when the value cannot be converted to a finite number.
    """
    try:
        distance_num = float(distance_m)
    except (ValueError, TypeError):
        return ""  # not a number at all

    # NaN (the only value unequal to itself) and +/-infinity are not
    # displayable distances either.
    if distance_num != distance_num or abs(distance_num) == float("inf"):
        return ""

    if distance_num >= 1000:  # 1 km or more
        return f"{_three_significant(distance_num / 1000)} km"
    return f"{_three_significant(distance_num)} m"

def format_slack_message(payload, json_config):
    """Turn a change-event payload into a French motivational Slack message.

    Only insert events (``op == 'c'``) that carry a non-empty ``after`` row
    produce a message; every other payload yields ``None``.

    Args:
        payload: The ``payload`` object of the change event. Its ``after``
            row is read for ``Sport_type``, ``Date_de_debut``,
            ``Date_de_fin`` and ``ID_salarie``, plus the optional
            ``Distance_m`` and ``Commentaire``.
        json_config: Parsed JSON configuration; ``sport_emojis`` and
            ``slack.channel_id`` are read from it.

    Returns:
        A dict with ``channel`` and ``text`` keys, or ``None`` when the event
        is skipped.

    Raises:
        KeyError: If a required row field or configuration key is missing.
    """
    # Only process inserts (op='c') that actually carry a row image
    if payload.get("op") != 'c':
        return None
    activity = payload.get("after")
    if not activity:
        return None

    # Get emoji and sport name
    sport_type = activity["Sport_type"]
    sport_emoji = json_config["sport_emojis"].get(sport_type, "🏃")
    sport_name = get_sport_name(sport_type)

    # The timestamp difference is divided by 1_000_000 to obtain seconds
    # (presumably microsecond timestamps; confirm against the connector setup).
    duration_seconds = (activity["Date_de_fin"] - activity["Date_de_debut"]) / 1_000_000
    duration_text = format_duration(duration_seconds)

    message_parts = []

    # Add the distance only when it is a positive number. The value is
    # converted with float() first so a numeric string does not raise on the
    # `> 0` comparison, and an empty formatted text never leaves a dangling
    # "sur " in the sentence.
    raw_distance = activity.get("Distance_m")
    try:
        has_distance = bool(raw_distance) and float(raw_distance) > 0
    except (TypeError, ValueError):
        has_distance = False
    distance_text = format_distance(raw_distance) if has_distance else ""
    if distance_text:
        message_parts.append(f"sur {distance_text}")

    # Add duration
    message_parts.append(f"en {duration_text}")

    # Create the final message
    base_message = f"{sport_emoji} Bravo {activity['ID_salarie']} ! Tu viens de faire une session de {sport_name} "
    base_message += ' '.join(message_parts) + " !"

    comment = activity.get("Commentaire")
    if comment:
        # Quote the comment in parentheses on its own paragraph
        base_message += f"\n\n(Merci pour ton commentaire : \"{comment}\")"

    return {
        "channel": json_config["slack"]["channel_id"],
        "text": base_message
    }

# --- Side-Effect Handlers ---
def send_slack_message(client, message):
    """Post one message through the Slack client and report the outcome.

    Args:
        client: Object exposing ``chat_postMessage``.
        message: Mapping expanded into keyword arguments of that call.

    Returns:
        ``{"success": True, "response": <call result>}`` when the call
        returns, or ``{"success": False, "error": <error field>}`` when it
        raises ``SlackApiError``. Any other exception propagates.
    """
    try:
        api_response = client.chat_postMessage(**message)
    except SlackApiError as slack_error:
        outcome = {"success": False, "error": slack_error.response["error"]}
    else:
        outcome = {"success": True, "response": api_response}
    return outcome

# --- Main Pipeline ---
def _post_and_report(slack_client, message):
    """Send one Slack message and print the reason when delivery fails.

    Returns True when ``send_slack_message`` reports success, False otherwise.
    """
    try:
        result = send_slack_message(slack_client, message)
    except Exception as e:
        # Anything send_slack_message does not convert into a result dict
        # ends up here; report it and keep the consumer loop alive.
        print(f"Error sending Slack message: {e}")
        return False
    if not result["success"]:
        print(f"Slack API error: {result['error']}")
    return result["success"]


def _flush_startup_backlog(slack_client, channel_id, recent_messages, total_count, max_shown, delay):
    """Announce start-up, then replay the retained backlog with a pause after each message."""
    skipped_count = total_count - max_shown

    _post_and_report(slack_client, {
        "channel": channel_id,
        "text": "Bip Boop, lancement du système !"
    })

    if skipped_count > 0:
        intro = f"{skipped_count} messages ont été omis pour ne pas surcharger le canal. Voici les {max_shown} derniers exploits :"
    else:
        intro = "Vite ! Voici les exploits accomplis pendant mon absence :"
    _post_and_report(slack_client, {"channel": channel_id, "text": intro})

    for message in recent_messages:
        _post_and_report(slack_client, message)
        time.sleep(delay)


def run_consumer(kafka_config, slack_client, json_config):
    """Poll the configured topic and forward activity notifications to Slack.

    Messages received before the first idle poll are buffered. On the first
    poll that returns nothing while the buffer is non-empty, a start-up
    announcement is posted, followed by at most the last 5 buffered messages
    with a 3 second pause after each. From then on every formatted message is
    posted as soon as it arrives. Failed Slack deliveries are printed instead
    of being ignored.

    Args:
        kafka_config: Mapping from which ``"kafka_brokers"`` is read.
        slack_client: Slack client handed to ``send_slack_message``.
        json_config: Parsed configuration; ``kafka_topic`` and
            ``slack.channel_id`` are read from it.

    The loop runs until ``KeyboardInterrupt``; the consumer is closed on the
    way out in every case.
    """
    consumer = Consumer({
        "bootstrap.servers": kafka_config["kafka_brokers"],
        "group.id": "slack-notifier-fp",
        "auto.offset.reset": "earliest"
    })

    consumer.subscribe([json_config["kafka_topic"]])

    # State tracking
    initial_buffer = True
    max_buffer_size = 5
    delay = 3  # seconds between buffered messages
    # Only the newest `max_buffer_size` messages are ever replayed, so only
    # those are retained; `buffered_total` remembers how many were seen.
    buffered_messages = []
    buffered_total = 0

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                if initial_buffer and buffered_messages:
                    # First idle poll after the backlog: replay it once
                    _flush_startup_backlog(
                        slack_client,
                        json_config["slack"]["channel_id"],
                        buffered_messages,
                        buffered_total,
                        max_buffer_size,
                        delay,
                    )
                    initial_buffer = False
                    buffered_messages.clear()
                continue

            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            try:
                message_value = msg.value()
                if not message_value:
                    continue

                payload = json.loads(message_value.decode('utf-8')).get("payload")
                if not payload:
                    continue

                if message := format_slack_message(payload, json_config):
                    if initial_buffer:
                        buffered_total += 1
                        buffered_messages.append(message)
                        if len(buffered_messages) > max_buffer_size:
                            del buffered_messages[0]
                    else:
                        _post_and_report(slack_client, message)

            except json.JSONDecodeError as e:
                print(f"Error decoding message: {e}")
            except Exception as e:
                print(f"Error processing message: {e}")

            consumer.commit(msg)

    except KeyboardInterrupt:
        print("Stopping consumer...")
    finally:
        consumer.close()

if __name__ == "__main__":
    # Load configurations
    print("> Loading configurations...")
    env_config = load_env_config()
    json_config = load_json_config()

    # Initialize Slack client
    print("> Connecting to Slack...")
    slack_client = WebClient(token=env_config["slack_token"])

    # Verify Slack connection before touching the broker
    try:
        auth_test = slack_client.auth_test()
        print(f"> Successfully connected to Slack bot: {auth_test['user']} (Team: {auth_test['team']})")
    except SlackApiError as e:
        print(f"> Failed to connect to Slack: {e.response['error']}")
        # `exit` is a helper meant for interactive sessions; raising
        # SystemExit directly is the portable way to stop with status 1.
        raise SystemExit(1)

    # Initialize Kafka consumer
    print(f"> Setting up Redpanda consumer for topic: {json_config['kafka_topic']}")
    print(f"> Broker(s): {env_config['kafka_brokers']}")

    # Start consumer
    print("> Starting consumer loop...")
    print("──────────────────────────────────────────")
    run_consumer(env_config, slack_client, json_config)


> Loading configurations...
> Connecting to Slack...
> Successfully connected to Slack bot: sporty_bot (Team: employee-activity-etl-poc)
> Setting up Redpanda consumer for topic: pg_cdc.public.employee_activities
> Broker(s): localhost:9092
> Starting consumer loop...
──────────────────────────────────────────
Stopping consumer...


In [18]:
# test_slack_auth.py
import os
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from dotenv import load_dotenv

def test_slack_auth():
    """Check that the configured Slack bot token authenticates.

    Calls ``load_dotenv()``, reads ``SLACK_BOT_TOKEN`` and issues
    ``auth_test`` with it, printing the outcome.

    Returns:
        True when authentication succeeds, False when the token is missing
        or the Slack API reports an error.
    """
    # Load environment variables
    load_dotenv()
    slack_token = os.getenv("SLACK_BOT_TOKEN")

    if not slack_token:
        # Name the variable that is actually read, so the hint is actionable
        print("❌ Error: SLACK_BOT_TOKEN not found in .env file")
        return False

    # Initialize client
    client = WebClient(token=slack_token)

    try:
        # Test authentication with a simple API call
        response = client.auth_test()
        print(f"✅ Successfully authenticated to Slack as {response['user']} (Team: {response['team']})")
        return True
    except SlackApiError as e:
        print(f"❌ Slack API error: {e.response['error']}")
        print("Possible causes:")
        print("- Invalid/expired token")
        print("- Token doesn't have required scopes")
        print("- Bot hasn't been added to channel")
        return False

# Run the Slack authentication check when this code is executed directly.
if __name__ == "__main__":
    test_slack_auth()

✅ Successfully authenticated to Slack as sporty_bot (Team: employee-activity-etl-poc)


In [None]:


class RedpandaToSlack:
    """Consume change events from a Redpanda topic and post them to Slack.

    Each message of ``pg_cdc.public.employee_activities`` is decoded as JSON,
    its ``payload`` is rendered as a Slack Block Kit message and sent to the
    configured channel; the offset is committed after each message.
    """

    def __init__(self):
        """Read settings from the environment and create the Slack and Kafka clients."""
        # Kafka/Redpanda configuration
        self.kafka_config = {
            'bootstrap.servers': os.getenv('REDPANDA_BROKERS', 'localhost:9092'),
            'group.id': 'employee-activities-slack-notifier',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False
        }

        # Slack configuration. SLACK_TOKEN keeps priority for backward
        # compatibility; SLACK_BOT_TOKEN (the name used by the other cells of
        # this notebook) is the fallback so one .env file serves every cell.
        self.slack_token = os.getenv('SLACK_TOKEN') or os.getenv('SLACK_BOT_TOKEN')
        self.slack_channel = os.getenv('SLACK_CHANNEL', '#employee-activities')
        self.slack_client = WebClient(token=self.slack_token)

        # Kafka consumer
        self.consumer = Consumer(self.kafka_config)
        self.topic = 'pg_cdc.public.employee_activities'

    @staticmethod
    def _mrkdwn(label, value):
        """Build one mrkdwn text object: bold label, newline, value."""
        return {"type": "mrkdwn", "text": f"*{label}:*\n{value}"}

    @staticmethod
    def _fields_section(*fields):
        """Wrap text objects into a Block Kit ``section`` with ``fields``."""
        return {"type": "section", "fields": list(fields)}

    def convert_microtimestamp(self, ts):
        """Convert a microsecond timestamp to a ``YYYY-MM-DD HH:MM:SS`` string.

        The conversion uses ``datetime.fromtimestamp`` without a timezone
        argument. ``None`` is rendered as ``'N/A'`` instead of raising.
        """
        if ts is None:
            return 'N/A'
        seconds = ts / 1_000_000
        return datetime.fromtimestamp(seconds).strftime('%Y-%m-%d %H:%M:%S')

    def format_slack_message(self, payload):
        """Format a change-event payload into a Slack Block Kit message.

        Args:
            payload: Dict with ``op``, ``ts_ms`` and an ``after`` and/or
                ``before`` row image. ``after`` is preferred, ``before`` is
                used when ``after`` is empty.

        Returns:
            ``{"blocks": [...]}``, or ``None`` when neither row image is
            available.

        Raises:
            KeyError: If ``op``, ``ts_ms`` or a required row field is missing.
        """
        op_map = {
            'c': 'Created',
            'r': 'Read (Snapshot)',
            'u': 'Updated',
            'd': 'Deleted'
        }

        operation = op_map.get(payload['op'], payload['op'])
        record = payload.get('after') or payload.get('before')

        if not record:
            return None

        # Convert timestamps
        start_us = record['Date_de_debut']
        end_us = record['Date_de_fin']
        start_time = self.convert_microtimestamp(start_us)
        end_time = self.convert_microtimestamp(end_us)

        # Duration in hours; a row image with null timestamps is shown as
        # N/A instead of failing on the subtraction.
        if start_us is None or end_us is None:
            duration_text = 'N/A'
        else:
            duration_hours = (end_us - start_us) / (1_000_000 * 60 * 60)
            duration_text = f"{duration_hours:.2f} hours"

        distance = record['Distance_m']
        distance_text = 'N/A' if distance is None else f"{distance} meters"

        # `.get(key, default)` only covers a missing key; a key present with
        # a null value must also fall back to the placeholder.
        comment = record.get('Commentaire') or 'No comment'

        processed_at = datetime.fromtimestamp(payload['ts_ms'] / 1000).strftime('%Y-%m-%d %H:%M:%S')

        message = {
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"New Employee Activity ({operation})"
                    }
                },
                self._fields_section(
                    self._mrkdwn("Employee ID", record['ID_salarie']),
                    self._mrkdwn("Activity ID", record['ID']),
                ),
                self._fields_section(
                    self._mrkdwn("Sport Type", record['Sport_type']),
                    self._mrkdwn("Distance", distance_text),
                ),
                self._fields_section(
                    self._mrkdwn("Start Time", start_time),
                    self._mrkdwn("End Time", end_time),
                ),
                self._fields_section(
                    self._mrkdwn("Duration", duration_text),
                ),
                {
                    "type": "section",
                    "text": self._mrkdwn("Comment", comment)
                },
                {
                    "type": "context",
                    "elements": [
                        {
                            "type": "mrkdwn",
                            "text": f"Processed at: {processed_at}"
                        }
                    ]
                }
            ]
        }

        return message

    def send_to_slack(self, message):
        """Post the message blocks to the configured channel and print the outcome."""
        try:
            response = self.slack_client.chat_postMessage(
                channel=self.slack_channel,
                blocks=message["blocks"]
            )
            print(f"Message sent to Slack: {response['ts']}")
        except SlackApiError as e:
            print(f"Error sending message to Slack: {e.response['error']}")

    def process_message(self, msg):
        """Decode one Kafka message and forward it to Slack.

        Messages without a value or without a payload are skipped. Decoding
        problems and malformed records are printed and do not raise, so one
        bad message cannot stop the consumer loop.
        """
        try:
            raw_value = msg.value()
            if raw_value is None:
                # A message with no value has nothing to decode
                print("Skipping tombstone message")
                return

            payload = json.loads(raw_value.decode('utf-8'))['payload']
            if not payload:
                print("Skipping message without payload")
                return

            print(f"Processing message with operation: {payload['op']}")

            # A delete without a previous row image has nothing to display
            if payload['op'] == 'd' and not payload.get('before'):
                print("Skipping tombstone message")
                return

            slack_message = self.format_slack_message(payload)
            if slack_message:
                self.send_to_slack(slack_message)

        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error decoding JSON: {e}")
        except KeyError as e:
            print(f"Missing expected field in message: {e}")
        except TypeError as e:
            print(f"Unexpected value type in message: {e}")

    def run(self):
        """Subscribe to the topic and process messages until interrupted.

        The loop stops on ``KeyboardInterrupt`` or on the first consumer
        error other than end-of-partition; the consumer is always closed.
        """
        try:
            self.consumer.subscribe([self.topic])

            while True:
                msg = self.consumer.poll(1.0)

                if msg is None:
                    continue
                if msg.error():
                    # The error-code constants live on KafkaError;
                    # KafkaException has no `_PARTITION_EOF` attribute.
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        continue
                    print(f"Consumer error: {msg.error()}")
                    break

                self.process_message(msg)
                self.consumer.commit(msg)

        except KeyboardInterrupt:
            print("Consumer interrupted")
        finally:
            self.consumer.close()

# Entry point: create the notifier (its __init__ builds the Slack client and
# the Kafka consumer) and block in its polling loop until that loop ends.
if __name__ == "__main__":
    notifier = RedpandaToSlack()
    notifier.run()