<a href="https://colab.research.google.com/github/ArjunDandagi/cs145/blob/main/Distributed_Systems.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Section 1: Distributed Message Queues and Communication

In [None]:
!pip install ratelimit

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ratelimit
  Downloading ratelimit-2.2.1.tar.gz (5.3 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ratelimit
  Building wheel for ratelimit (setup.py) ... [?25l[?25hdone
  Created wheel for ratelimit: filename=ratelimit-2.2.1-py3-none-any.whl size=5893 sha256=06b157281092f235413847a56d589f3b69be827e77e236243d8f6391aadcb2eb
  Stored in directory: /root/.cache/pip/wheels/27/5f/ba/e972a56dcbf5de9f2b7d2b2a710113970bd173c4dcd3d2c902
Successfully built ratelimit
Installing collected packages: ratelimit
Successfully installed ratelimit-2.2.1


In [None]:
import sqlite3
import json

class MessageQueue:
    """A trivial SQLite-backed message queue.

    The main operations are ``enqueue`` and ``dequeue``; messages are
    delivered in insertion (FIFO) order per topic. In practice, we'd use
    popular, scalable packages (Kafka, RabbitMQ) or services (e.g., AWS's
    Simple Queue Service).

    Every method opens its own short-lived connection to ``db_name`` and
    always closes it (even on error), so several instances -- e.g. a
    producer and a consumer -- can share one database file.
    """

    def __init__(self, db_name='message_queue.db'):
        """Bind the queue to the SQLite file ``db_name`` and ensure its table exists."""
        self.db_name = db_name
        self.create_queue_table()

    def _connect(self):
        """Open a new connection to the queue database."""
        return sqlite3.connect(self.db_name)

    def create_queue_table(self):
        """Create the ``message_queue`` table if it does not already exist."""
        conn = self._connect()
        try:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS message_queue "
                "(id INTEGER PRIMARY KEY, topic TEXT, message TEXT)"
            )
            conn.commit()
        finally:
            conn.close()

    def enqueue(self, topic, message):
        """Append ``message`` to the end of ``topic``.

        Args:
            topic: Topic (queue) name.
            message: Any JSON-serializable value; it is stored as a JSON string.

        Raises:
            TypeError: if ``message`` is not JSON-serializable (from ``json.dumps``).
        """
        # Serialize before opening the connection so a bad payload cannot
        # leave a connection behind.
        payload = json.dumps(message)
        conn = self._connect()
        try:
            conn.execute(
                "INSERT INTO message_queue (topic, message) VALUES (?, ?)",
                (topic, payload),
            )
            conn.commit()
        finally:
            conn.close()

    def dequeue(self, topic):
        """Remove and return the oldest message on ``topic``.

        Args:
            topic: Topic (queue) name.

        Returns:
            The decoded message (``json.loads`` of the stored payload), or
            ``None`` if the topic has no messages.
        """
        conn = self._connect()
        try:
            # Take SQLite's write lock before reading, so the SELECT and the
            # DELETE are atomic: two consumers can never both receive the
            # same message.
            conn.execute("BEGIN IMMEDIATE")
            row = conn.execute(
                "SELECT id, message FROM message_queue WHERE topic=? "
                "ORDER BY id ASC LIMIT 1",
                (topic,),
            ).fetchone()
            if row is None:
                return None
            message_id, message = row
            conn.execute("DELETE FROM message_queue WHERE id=?", (message_id,))
            conn.commit()
        finally:
            # Release the lock on the empty/error paths, then always close.
            if conn.in_transaction:
                conn.rollback()
            conn.close()
        return json.loads(message)

In [None]:
"""
Example Producer for Concert Ticket purchases
"""
import time
from datetime import datetime, timedelta

import random

# Helper function to generate random IP addresses
def random_ip():
    """Return a random dotted-quad IPv4 address string such as '10.0.255.3'.

    Each of the four octets is drawn, in order, with ``random.randint(0, 255)``
    from the module-level ``random`` generator.
    """
    octets = [str(random.randint(0, 255)) for _ in range(4)]
    return ".".join(octets)

# Seed the RNG so the generated user/IP/event values -- and therefore the
# consumer's "suspicious user" output -- are reproducible across runs.
RANDOM_SEED = 42
NUM_PURCHASES = 100
random.seed(RANDOM_SEED)

# Initialize the MessageQueue
mq = MessageQueue()

# Generate ticket purchase events and add them to the message queue.
# NOTE: the queue lives in a SQLite file, so re-running this cell without
# draining it in the consumer cell appends another NUM_PURCHASES events.
for _ in range(NUM_PURCHASES):
    purchase = {
        "user_id": random.randint(1, 50),
        "ip_address": random_ip(),
        "event_id": random.randint(1, 10),
        "num_tickets": random.randint(1, 6),
        "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
    mq.enqueue("ticket_purchase", purchase)

In [None]:
import json, time
from datetime import datetime, timedelta
from ratelimit import limits, sleep_and_retry

"""
Example consumer for Concert tickets
"""
# Parameters for fraud detection
purchase_threshold = 5  # Flag a user once they have made this many purchases (counted per purchase event, not per ticket)
ip_threshold = 3  # Flag once this many purchases have come from the same IP address
# NOTE(review): intended look-back window for the counts below, but nothing in
# the consumer loop reads it yet, so the counts are cumulative over the run.
time_window = timedelta(minutes=10)

# Running purchase counts, keyed by user_id and by ip_address respectively
user_purchase_count = {}
ip_purchase_count = {}

# Opens the same SQLite-backed queue the producer cell wrote to
# (both cells use MessageQueue's default db_name).
mq = MessageQueue()

@sleep_and_retry
@limits(calls=10, period=1)  # Throttle this consumer to at most 10 purchases per second
def process_ticket_purchase(purchase):
    """Handle a single ticket-purchase event (placeholder implementation).

    For now the event is only logged. The ``ratelimit`` decorators cap how fast
    this consumer processes queued events (10 calls per 1-second period), with
    ``sleep_and_retry`` intended to wait out the limit rather than fail. This
    throttles processing only; it does not limit inbound traffic.

    TODO: persist the event to the distributed database, and replace the
    threshold heuristic with a real fraud-detection model (clustering,
    anomaly detection, or a supervised classifier).
    """
    print(f'    Processing: {purchase=}')

# Drain the queue, updating counts and flagging suspicious activity.
while True:
    purchase = mq.dequeue("ticket_purchase")
    if purchase is None:
        # Queue is empty. A long-running consumer would sleep briefly and poll
        # again here instead of stopping.
        break

    # Update the per-user and per-IP purchase counts
    user_id = purchase["user_id"]
    ip_address = purchase["ip_address"]
    user_purchase_count[user_id] = user_purchase_count.get(user_id, 0) + 1
    ip_purchase_count[ip_address] = ip_purchase_count.get(ip_address, 0) + 1

    # Process the ticket purchase and perform fraud detection
    process_ticket_purchase(purchase)

    # Flag 'outlier' users, e.g. too many purchases by one user or from the
    # same IP address. Report only the counts that are relevant to this event.
    if (user_purchase_count[user_id] >= purchase_threshold
            or ip_purchase_count[ip_address] >= ip_threshold):
        print(f'====> Suspicious user:{user_id=}, '
              f'user_purchases={user_purchase_count[user_id]}, '
              f'{ip_address=}, ip_purchases={ip_purchase_count[ip_address]}')


    Processing: purchase={'user_id': 12, 'ip_address': '70.85.66.59', 'event_id': 1, 'num_tickets': 2, 'timestamp': '2023-05-16 16:47:29'}
    Processing: purchase={'user_id': 35, 'ip_address': '245.50.190.23', 'event_id': 4, 'num_tickets': 6, 'timestamp': '2023-05-16 16:47:29'}
    Processing: purchase={'user_id': 22, 'ip_address': '193.90.33.152', 'event_id': 5, 'num_tickets': 2, 'timestamp': '2023-05-16 16:47:29'}
    Processing: purchase={'user_id': 9, 'ip_address': '160.56.135.8', 'event_id': 1, 'num_tickets': 4, 'timestamp': '2023-05-16 16:47:29'}
    Processing: purchase={'user_id': 33, 'ip_address': '246.54.177.253', 'event_id': 5, 'num_tickets': 1, 'timestamp': '2023-05-16 16:47:29'}
    Processing: purchase={'user_id': 49, 'ip_address': '75.220.31.173', 'event_id': 3, 'num_tickets': 2, 'timestamp': '2023-05-16 16:47:29'}
    Processing: purchase={'user_id': 33, 'ip_address': '108.8.176.215', 'event_id': 6, 'num_tickets': 5, 'timestamp': '2023-05-16 16:47:29'}
    Processing: 

# Section 2: Discord Messages in Cassandra/ScyllaDB

In [None]:
!pip install cassandra-driver

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting cassandra-driver
  Downloading cassandra_driver-3.27.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.0/19.0 MB[0m [31m43.6 MB/s[0m eta [36m0:00:00[0m
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.27.0 geomet-0.2.1.post1


In [None]:
import uuid
from datetime import datetime
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

import os

# Connect to a ScyllaDB or Cassandra server.
# TODO: To run this, start a Cassandra or ScyllaDB server first. The contact
# points and credentials are read from environment variables so that real
# credentials never have to be hardcoded in the notebook; the fallbacks are
# the local-development values this cell originally used.
CASSANDRA_HOSTS = [
    host.strip()
    for host in os.environ.get('CASSANDRA_HOSTS', '127.0.0.1').split(',')
    if host.strip()
]
auth_provider = PlainTextAuthProvider(
    username=os.environ.get('CASSANDRA_USERNAME', 'cassandra'),
    password=os.environ.get('CASSANDRA_PASSWORD', 'cassandra'),
)
cluster = Cluster(CASSANDRA_HOSTS, auth_provider=auth_provider)
session = cluster.connect()

# Create a keyspace and table if they do not exist.
# replication_factor 1 keeps a single copy of each row -- fine for a local
# development node; a real cluster would need a higher factor.
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS messaging_app
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
""")

# Messages are partitioned by (channel_id, bucket) and, within a partition,
# clustered by message_id descending, so reads return newest messages first.
session.execute("""
    CREATE TABLE IF NOT EXISTS messaging_app.messages (
        channel_id bigint,
        bucket int,
        message_id bigint,
        author_id bigint,
        content text,
        PRIMARY KEY ((channel_id, bucket), message_id)
    ) WITH CLUSTERING ORDER BY (message_id DESC);
""")

# Function to store a new message
def store_message(channel_id, author_id, content):
    """Insert one message into ``messaging_app.messages`` and print a confirmation.

    Args:
        channel_id: Channel the message belongs to (partition key, with ``bucket``).
        author_id: ID of the message author.
        content: Message text.

    The message ID is the current time in Unix epoch milliseconds.
    NOTE(review): two messages written to the same channel within the same
    millisecond get the same primary key; a Cassandra INSERT on an existing key
    overwrites that row, so one message would be lost. Snowflake-style IDs
    (timestamp plus sequence bits) would avoid this.
    """
    from datetime import timezone

    bucket = 0  # For simplicity, we use a single bucket
    # Use an aware UTC datetime. Calling .timestamp() on the naive value from
    # datetime.utcnow() interprets it as *local* time, which skews the ID by
    # the host's UTC offset on any non-UTC machine.
    message_id = int(datetime.now(timezone.utc).timestamp() * 1000)

    query = """
        INSERT INTO messaging_app.messages
              (channel_id, bucket, message_id, author_id, content)
        VALUES (%s, %s, %s, %s, %s);
    """
    session.execute(query, (channel_id, bucket, message_id, author_id, content))
    print("Message stored successfully!")

# Function to retrieve the last 'n' messages for a given channel
def retrieve_messages(channel_id, n=5):
    """Print up to ``n`` messages of ``channel_id``, one ``id|author: text`` per line.

    Reads the single bucket 0 partition of the channel. If no rows come back,
    prints a "No messages found" notice instead. Returns None.
    """
    query = """
        SELECT * FROM messaging_app.messages
        WHERE channel_id=%s AND bucket=%s
        LIMIT %s;
    """
    single_bucket = 0  # For simplicity, we use a single bucket
    result = session.execute(query, (channel_id, single_bucket, n))

    lines = []
    for row in result:
        lines.append(f"{row.message_id}|{row.author_id}: {row.content}")

    if not lines:
        print(f"No messages found for channel {channel_id}")
        return
    print('\n'.join(lines))

# Example usage
# Store one message in channel 1, then print that channel's latest 5 messages.
store_message(channel_id=1, author_id=1001, content='Hello, world!')
retrieve_messages(channel_id=1, n=5)



Traceback (most recent call last):
  File "cassandra/cluster.py", line 3538, in cassandra.cluster.ControlConnection._reconnect_internal
  File "cassandra/cluster.py", line 3560, in cassandra.cluster.ControlConnection._try_connect
  File "cassandra/cluster.py", line 1630, in cassandra.cluster.Cluster.connection_factory
  File "cassandra/connection.py", line 850, in cassandra.connection.Connection.factory
  File "/usr/local/lib/python3.10/dist-packages/cassandra/io/asyncorereactor.py", line 347, in __init__
    self._connect_socket()
  File "cassandra/connection.py", line 917, in cassandra.connection.Connection._connect_socket
ConnectionRefusedError: [Errno 111] Tried connecting to [('127.0.0.1', 9042)]. Last error: Connection refused
ERROR:cassandra.cluster:Control connection failed to connect, shutting down Cluster:
Traceback (most recent call last):
  File "cassandra/cluster.py", line 1700, in cassandra.cluster.Cluster.connect
  File "cassandra/cluster.py", line 3504, in cassandra.clu

NoHostAvailable: ignored