In [None]:
import json
import pymongo
import stomp
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB connection
# mongodb://localhost:27017 
# mongodb+srv://nghials2s:nghia123@cluster0.ewqia.mongodb.net/reddit_db?retryWrites=true&w=majority
mongo_client = pymongo.MongoClient("mongodb+srv://nghials2s:nghia123@cluster0.ewqia.mongodb.net/reddit_db?retryWrites=true&w=majority")
db = mongo_client["reddit_db"]
threads_collection = db["threads"]
comments_collection = db["comments"]

# ActiveMQ server
ACTIVE_MQ_HOST = 'localhost'
ACTIVE_MQ_PORT = 61613
QUEUE_NAME_THREADS = '/queue/reddit_threads'
QUEUE_NAME_COMMENTS = '/queue/reddit_comments'

# Listener for ActiveMQ messages
class RedditListener(stomp.ConnectionListener):
    def on_error(self, frame):
        logger.error(f'Error received: {frame.body}')

    def on_disconnected(self):
        logger.warning('Disconnected from ActiveMQ')
        # Attempt to reconnect
        connect_to_activemq()

    def on_message(self, frame):
        try:
            logger.info(f'Received message from destination: {frame.headers.get("destination")}')
            message = json.loads(frame.body)

            logger.debug(f'Raw message: {message}')

            if frame.headers.get('destination') == QUEUE_NAME_THREADS:
                # Handle thread message
                result = threads_collection.update_one(
                    {"thread_id": message["thread_id"]},
                    {"$set": message},
                    upsert=True
                )
                logger.info(f'{"Inserted" if result.upserted_id else "Updated"} thread: {message["thread_id"]}')

            elif frame.headers.get('destination') == QUEUE_NAME_COMMENTS:
                # Handle comment message
                result = comments_collection.update_one(
                    {"comment_id": message["comment_id"]},
                    {"$set": message},
                    upsert=True
                )
                logger.info(f'{"Inserted" if result.upserted_id else "Updated"} comment: {message["comment_id"]}')

        except json.JSONDecodeError as e:
            logger.error(f'Failed to parse message: {e}')
        except KeyError as e:
            logger.error(f'Missing required field: {e}')
        except Exception as e:
            logger.error(f'Error processing message: {e}')

def connect_to_activemq():
    try:
        conn = stomp.Connection([(ACTIVE_MQ_HOST, ACTIVE_MQ_PORT)])

        # Set up listener
        listener = RedditListener()
        conn.set_listener('', listener)

        conn.connect(wait=True)

        # Subscribe to both queues
        conn.subscribe(destination=QUEUE_NAME_THREADS, id=1, ack='auto')
        conn.subscribe(destination=QUEUE_NAME_COMMENTS, id=2, ack='auto')

        logger.info('Successfully connected to ActiveMQ')
        return conn
    except Exception as e:
        logger.error(f'Failed to connect to ActiveMQ: {e}')
        return None

if __name__ == '__main__':
    # Initial connection
    conn = connect_to_activemq()

    if conn:
        logger.info("Listening for messages from ActiveMQ...")

        try:
            while True:
                if not conn.is_connected():
                    logger.warning("Connection lost, attempting to reconnect...")
                    conn = connect_to_activemq()
                    if not conn:
                        import time
                        time.sleep(5)  # Wait before retry
        except KeyboardInterrupt:
            logger.info("Shutting down...")
            if conn.is_connected():
                conn.disconnect()

INFO:__main__:Updated comment: meuxg39
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meuxbhm
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meux6z8
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meux6vs
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meux2p0
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meux00m
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meuwzrc
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meuwt44
INFO:__main__:Received message from destination: /queue/reddit_comments
INFO:__main__:Updated comment: meuwt0h
INFO:__main__:Received message from destination: /queue/reddit_comments
I