#Hexagonal Architecture (Ports and Adapters)

In [None]:
%%writefile core.py
# core.py

class User:
    def __init__(self, user_id, name, email):
        self.user_id = user_id
        self.name = name
        self.email = email


#Ports
class UserRepository:
    def __init__(self):
        self.users = {}

    def add_user(self, user):
        self.users[user.user_id] = user

    def get_user_by_id(self, user_id):
        return self.users.get(user_id)

class EmailService:
    def send_email(self, to_email, subject, body):
        raise NotImplementedError

Writing core.py


#SQLite Adapter


In [None]:
%%writefile sqlite_adapter.py
# sqlite_adapter.py
import sqlite3
from core import UserRepository, User

class SQLiteUserRepository(UserRepository):
    def __init__(self):
        self.connection = sqlite3.connect(":memory__")
        self._create_table()

    def _create_table(self):
        cursor = self.connection.cursor()
        cursor.execute("""
            CREATE TABLE users (
                user_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT NOT NULL
            )
        """)
        self.connection.commit()

    def add_user(self, user):
        cursor = self.connection.cursor()
        cursor.execute("INSERT INTO users VALUES (?, ?, ?)", (user.user_id, user.name, user.email))
        self.connection.commit()

    def get_user_by_id(self, user_id):
        cursor = self.connection.cursor()
        cursor.execute("SELECT * FROM users WHERE user_id = ?", (user_id,))
        row = cursor.fetchone()
        if row:
            return User(row[0], row[1], row[2])
        return None

Writing sqlite_adapter.py


#Email Service Adapter


In [None]:
%%writefile email_service_adapter.py
# email_service_adapter.py
import sqlite3
from core import EmailService

class ConsoleEmailService(EmailService):
    def send_email(self, to_email, subject, body):
        print(f"Sending email to: {to_email}")
        print(f"Subject: {subject}")
        print(f"Body: {body}")
        print("Email sent successfully.")

Writing email_service_adapter.py


#Application Services


In [None]:
%%writefile user_service.py

# user_service.py
from core import User

class UserService:
    def __init__(self, user_repository, email_service):
        self.user_repository = user_repository
        self.email_service = email_service

    def register_user(self, user_id, name, email):
        user = User(user_id, name, email)
        self.user_repository.add_user(user)
        self.email_service.send_email(email, "Welcome!", f"Hello {name}, welcome to our platform!")

Writing user_service.py


In [None]:
# app.py
from core import User
from sqlite_adapter import SQLiteUserRepository
from email_service_adapter import ConsoleEmailService
from user_service import UserService

def main():
    # Create repositories and services
    user_repo = SQLiteUserRepository()
    email_service = ConsoleEmailService()
    user_service = UserService(user_repo, email_service)

    # Register new user
    user_id = input("Enter user ID: ")
    name = input("Enter name: ")
    email = input("Enter email: ")

    user_service.register_user(user_id, name, email)
    print("User registered successfully!")

if __name__ == "__main__":
    main()

Enter user ID: 1
Enter name: Aaron
Enter email: aaron@gmail.com
Sending email to: aaron@gmail.com
Subject: Welcome!
Body: Hello Aaron, welcome to our platform!
Email sent successfully.
User registered successfully!
