In [1]:
import logging
import os
import sqlite3

import nest_asyncio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler, MessageHandler, filters, CallbackContext
from telegram.ext import Application, ContextTypes

nest_asyncio.apply()

# The bot token is a secret and must not be stored in the notebook. Export
# TELEGRAM_BOT_TOKEN before starting the kernel. The token that used to be
# hardcoded here has been exposed and should be revoked via @BotFather.
# An empty value is used when the variable is unset.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
MANAGER = {"id":"ali_zeighamiyan"}  # presumably the manager's Telegram username - confirm against `managers`
ROLES = ["employee", "stockman"]  # role names offered by the bot and seeded into the `roles` table
WORKERS = []


In [2]:
class DataBaseFetch:
    """SQLite-backed store for workers, roles and worker/role assignments.

    Three tables are created on first use:

    * ``workers(worker_username, worker_name PRIMARY KEY)``
    * ``roles(role_name PRIMARY KEY)``
    * ``worker_roles(worker_name, role_name)`` - both columns are foreign keys
      with ``ON DELETE CASCADE``, so deleting a worker or a role also removes
      its assignments.

    The instance can be used as a context manager; the connection is closed
    when the ``with`` block exits. Otherwise call :meth:`close` explicitly.
    """

    def __init__(self, db_name) -> None:
        """Open ``db_name`` (a path or ``":memory:"``) and create missing tables."""
        self.conn = sqlite3.connect(db_name)
        # SQLite ignores foreign keys unless enabled per connection.
        self.conn.execute('PRAGMA foreign_keys = ON')

        self.cursor = self.conn.cursor()
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS workers (
                worker_username TEXT,
                worker_name TEXT PRIMARY KEY
            )
        ''')

        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS roles (
            role_name TEXT PRIMARY KEY
        )
        ''')

        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS worker_roles (
            worker_name TEXT,
            role_name TEXT,
            FOREIGN KEY (worker_name) REFERENCES workers(worker_name) ON DELETE CASCADE,
            FOREIGN KEY (role_name) REFERENCES roles(role_name) ON DELETE CASCADE,
            PRIMARY KEY (worker_name, role_name)
        )
        ''')
        self.conn.commit()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block; never suppresses errors."""
        self.close()
        return False

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def add_worker(self, worker_name, worker_username):
        """Insert a worker, or update the username if ``worker_name`` already exists."""
        self.cursor.execute('''
            INSERT INTO workers (worker_name, worker_username) VALUES (?, ?)
            ON CONFLICT(worker_name) DO UPDATE SET worker_username=excluded.worker_username
        ''', (worker_name, worker_username))
        self.conn.commit()

    def get_workers(self):
        """Return every worker as a list of ``(worker_name, worker_username)`` tuples."""
        self.cursor.execute('SELECT worker_name, worker_username FROM workers')
        return self.cursor.fetchall()

    def add_role(self, role_name):
        """Insert a role; an already existing role is left untouched."""
        self.cursor.execute('INSERT OR IGNORE INTO roles (role_name) VALUES (?)', (role_name,))
        self.conn.commit()

    def assign_role_to_worker(self, worker_name, role_name):
        """Link a role to a worker; an existing identical link is ignored.

        Raises:
            sqlite3.IntegrityError: if the worker or the role does not exist
                (``OR IGNORE`` does not cover foreign-key violations).
        """
        self.cursor.execute(
            'INSERT OR IGNORE INTO worker_roles (worker_name, role_name) VALUES (?, ?)',
            (worker_name, role_name),
        )
        self.conn.commit()

    def _select_assignments(self, columns, filter_column, value):
        """Select ``columns`` from ``worker_roles``, optionally filtered by one column.

        ``columns`` and ``filter_column`` are fixed identifiers supplied by the
        public methods below, never user input; ``value`` is bound as a parameter.
        A falsy ``value`` (``None`` or ``""``) means "no filter".
        """
        query = f'SELECT {columns} FROM worker_roles wr'
        if value:
            self.cursor.execute(f'{query} WHERE wr.{filter_column} = ?', (value, ))
        else:
            self.cursor.execute(query)
        return self.cursor.fetchall()

    # Function to get all workers with their roles
    def get_workers_with_roles(self, role=None):
        """Return ``(worker_name, role_name)`` tuples, limited to ``role`` when given."""
        return self._select_assignments('wr.worker_name, wr.role_name', 'role_name', role)

    # Function to get all roles with their workers
    def get_roles_with_workers(self, worker=None):
        """Return ``(role_name, worker_name)`` tuples, limited to ``worker`` when given."""
        return self._select_assignments('wr.role_name, wr.worker_name', 'worker_name', worker)

    def delete_role_from_worker(self, worker_name, role_name):
        """Remove one role from one worker and print the outcome.

        Returns:
            bool: ``True`` if an assignment was removed, ``False`` if there was none.
        """
        self.cursor.execute(
            'DELETE FROM worker_roles WHERE worker_name = ? AND role_name = ?',
            (worker_name, role_name),
        )
        removed = self.cursor.rowcount > 0
        self.conn.commit()
        if removed:
            print(f"Role '{role_name}' removed from worker '{worker_name}'.")
        else:
            print(f"Role '{role_name}' from worker '{worker_name}' doesn't exist!.")
        return removed

    def delete_worker(self, worker_name):
        """Delete a worker (assignments cascade) and print the outcome.

        Returns:
            bool: ``True`` if a worker row was deleted, ``False`` if no such worker exists.
        """
        self.cursor.execute('DELETE FROM workers WHERE worker_name = ?', (worker_name, ))
        removed = self.cursor.rowcount > 0
        self.conn.commit()
        if removed:
            print(f"worker: {worker_name} Removed!")
        else:
            print(f"worker: {worker_name} doesn't exist!")
        return removed



In [7]:
# Open (or create) the on-disk database, then seed it with the configured
# roles and one sample worker holding the first role.
db_fetcher = DataBaseFetch(db_name="user_data.db")
# A plain loop: add_role is called for its side effect, not for a result list.
for role in ROLES:
    db_fetcher.add_role(role)
db_fetcher.add_worker("ali", "z")
db_fetcher.assign_role_to_worker("ali", ROLES[0])

In [4]:
# Display every stored worker as (worker_name, worker_username) tuples.
db_fetcher.get_workers()

[('ali', 'z')]

In [5]:
# If "ali" is already stored, add_worker's ON CONFLICT clause updates the
# username in place instead of inserting a second row.
db_fetcher.add_worker("ali", "ali3515")

In [6]:
# Display every (worker_name, role_name) assignment.
# NOTE(review): the recorded output lists a 'stockman' assignment that no visible
# cell creates - presumably left in user_data.db by an earlier run; confirm.
db_fetcher.get_workers_with_roles()

[('ali', 'stockman'), ('ali', 'employee')]

In [8]:
# Database handle used by the bot handlers defined below.
db_fetcher = DataBaseFetch(db_name="user_data.db")
# In-memory storage for users, workers, and roles
# NOTE: "roles" holds the ROLES list object itself (not a copy), so a change
# made through either name is visible through the other.
users = {"roles":ROLES, "workers":[], "workers_roles":{}}

# Telegram usernames treated as managers; handlers test `username in managers`.
managers = {"ali_zeighamiyan": True}  # Replace with actual manager usernames

def get_worker_buttons_markup(workers:list, msg:str):
    """Build a one-column inline keyboard with one button per worker.

    Args:
        workers: rows whose first element is the worker name (for example the
            tuples returned by ``DataBaseFetch.get_workers``). The list is not
            modified.
        msg: action tag embedded in each button's callback data, which has the
            form ``worker-<msg>:<worker name>``.

    Returns:
        InlineKeyboardMarkup: the keyboard; when ``workers`` is empty it holds a
        single "No Worker Found!" placeholder button.
    """
    # Use a separate placeholder list instead of appending to the caller's list,
    # so the caller's data is never polluted with a fake worker row.
    rows = workers if workers else [("No Worker Found!", )]
    worker_buttons = [
        InlineKeyboardButton(row[0], callback_data=f"worker-{msg}:{row[0]}")
        for row in rows
    ]
    return InlineKeyboardMarkup.from_column(worker_buttons)

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /start with the menu that matches the sender's privileges.

    Managers (usernames present in ``managers``) get the add/assign/view/delete
    actions; everyone else only gets "View Workers".
    """
    sender = update.message.from_user.username

    # (button label, callback data) pairs, in display order.
    if sender in managers:
        options = [
            ("Add Worker", 'addworker'),
            ("Assign Role", 'assignrole'),
            ("View Workers", 'viewworkers'),
            ("Delete Worker", 'deleteworker'),
        ]
    else:
        options = [("View Workers", 'viewworkers')]

    # One button per row.
    keyboard = [[InlineKeyboardButton(label, callback_data=action)] for label, action in options]
    await update.message.reply_text(
        'Welcome to the Worker Manager Bot! Choose an option:',
        reply_markup=InlineKeyboardMarkup(keyboard),
    )

async def button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dispatch an inline-button press to the matching handler.

    Callback data is either a bare action (``addworker``, ``addrole``,
    ``assignrole``, ``viewworkers``, ``deleteworker``) or ``<action>:<argument>``
    (``worker-assign``, ``role-assign``, ``worker-delete``).

    Only ``viewworkers`` is open to everyone. Every other action changes data
    and is silently ignored unless the sender's username is in ``managers``.
    """
    query = update.callback_query
    await query.answer()

    data = query.data
    username = query.from_user.username

    if data == 'viewworkers':
        await view_workers(query)
        return

    # Everything below modifies state. Buttons can be pressed by anyone who can
    # see the message (e.g. in a group chat), so check the sender every time.
    if username not in managers:
        return

    # Split on the first ':' only, so a name containing ':' is kept intact.
    action, _, argument = data.partition(":")

    if data == 'addworker':
        await add_worker(query, context)
    elif data == 'addrole':
        context.user_data['adding_role'] = True
        await query.edit_message_text('Please enter the name of the role to add.')
    elif data == 'assignrole':
        await assign_role(query, context)
    elif data == "deleteworker":
        await delete_worker(query)
    elif action == "worker-assign":
        await handle_worker_role_assignment(query, context, argument)
    elif action == "role-assign":
        await handle_role_assignment(query, context, argument)
    elif action == "worker-delete":
        await delete_selected_worker(query, argument)

async def view_workers(query):
    """Replace the message with a keyboard listing the stored workers."""
    markup = get_worker_buttons_markup(db_fetcher.get_workers(), "view")
    await query.edit_message_text('Workers', reply_markup=markup)

async def assign_role(query, context):
    """Ask the manager to pick the worker who should receive a role.

    Args:
        query: the callback query whose message is edited.
        context: handler context (unused here; kept for a uniform handler signature).
    """
    workers = db_fetcher.get_workers()
    if not workers:
        # Only /start is registered as a command, so point the user at the
        # "Add Worker" button rather than a non-existent /addworker command.
        await query.edit_message_text('You need to add workers first using the "Add Worker" button (send /start).')
        return

    worker_markup = get_worker_buttons_markup(workers, "assign")
    await query.edit_message_text('Select a worker to assign a role:', reply_markup=worker_markup)

async def handle_worker_role_assignment(query, context, worker_name):
    """Remember the chosen worker and ask which role to assign.

    If ``worker_name`` is not stored in the database, the message is replaced
    with an error and nothing is remembered.
    """
    known_names = {row[0] for row in db_fetcher.get_workers()}
    if worker_name not in known_names:
        await query.edit_message_text(f"The Worker: {worker_name} doesn't exist! first add it")
        return

    # The selection is read back by the role-assignment step.
    context.user_data['selected_worker'] = worker_name
    role_markup = InlineKeyboardMarkup.from_column(
        [InlineKeyboardButton(role, callback_data=f"role-assign:{role}") for role in ROLES]
    )
    await query.edit_message_text(f'Select a role to assign to {worker_name}:', reply_markup=role_markup)

async def handle_role_assignment(query, context, role_name):
    """Assign ``role_name`` to the worker previously stored in ``user_data``.

    The worker is read from ``context.user_data['selected_worker']``. The user
    always gets feedback: success, "no worker selected", or a failure notice
    when the database rejects the assignment.
    """
    worker_name = context.user_data.get('selected_worker')
    if not worker_name:
        await query.edit_message_text('No worker selected. Choose "Assign Role" and pick a worker first.')
        return

    try:
        db_fetcher.assign_role_to_worker(worker_name=worker_name, role_name=role_name)
    except sqlite3.IntegrityError:
        # Foreign keys are enforced, and INSERT OR IGNORE does not suppress
        # foreign-key violations: this happens when the worker was deleted in
        # the meantime or the role is missing from the roles table.
        await query.edit_message_text(
            f'Could not assign role {role_name} to worker {worker_name}: the worker or role no longer exists.'
        )
        return

    # Mirror the assignment in memory only after the database accepted it.
    users["workers_roles"][worker_name] = role_name
    await query.edit_message_text(f'Assigned role {role_name} to worker {worker_name}.')
        

async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Consume free-text replies that belong to a pending dialogue.

    Two dialogues are tracked in ``context.user_data``:

    * ``adding_worker`` with ``state`` ``"AddWorkerName"`` then ``"AddWorkerID"``:
      the first message is the worker's name, the second the username; the
      worker is then stored in the database.
    * ``adding_role``: the message is the new role's name; it is stored in the
      database and appended to the in-memory role list.

    Messages received while no dialogue is pending are ignored.
    """
    message = update.message
    # update.message is None for updates that are not new messages (presumably
    # edited messages or channel posts); there is nothing to read then.
    if message is None or message.text is None:
        return
    text = message.text
    user_data = context.user_data

    if user_data.get('adding_worker'):
        state = user_data.get("state")

        if state == "AddWorkerName":
            worker_name = text
            user_data["worker_detail"] = {"name": worker_name}
            user_data["state"] = "AddWorkerID"
            await message.reply_text(f'Got it! Now enter the related username for : {worker_name}.')

        elif state == "AddWorkerID":
            worker_name = user_data["worker_detail"]["name"]
            worker_username = text
            user_data["worker_detail"]["username"] = worker_username
            # Dialogue finished: reset the flags before touching the database.
            user_data['adding_worker'] = False
            user_data["state"] = None
            db_fetcher.add_worker(worker_name, worker_username)
            await message.reply_text(f'Worker {worker_name} with username {worker_username} added.')

    elif user_data.get('adding_role'):
        role_name = text
        # Persist the role so it can be referenced by worker_roles.
        db_fetcher.add_role(role_name)
        # users["roles"] is a list, so the role is appended (indexing it with a
        # string raised TypeError); skip duplicates.
        if role_name not in users["roles"]:
            users["roles"].append(role_name)
        user_data['adding_role'] = False
        await message.reply_text(f'Role {role_name} added.')

async def add_worker(query: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Start the add-worker dialogue and ask for the worker's name.

    Despite the annotation, the caller in this file passes the callback query
    (``update.callback_query``) as ``query``.
    """
    # Flag the dialogue as active and expect the worker name next.
    context.user_data.update(adding_worker=True, state="AddWorkerName")
    await query.edit_message_text('Please enter the name of the worker to add.')

async def delete_worker(query):
    """Show one button per worker so the manager can pick whom to delete.

    When no workers are stored, a plain notice is shown instead of a keyboard,
    so the "No Worker Found!" placeholder can never be "deleted".
    """
    workers = db_fetcher.get_workers()
    if not workers:
        await query.edit_message_text('There are no workers to delete.')
        return

    worker_markup = get_worker_buttons_markup(workers, msg="delete")
    await query.edit_message_text('Select a worker to delete:', reply_markup=worker_markup)

async def delete_selected_worker(query, worker_name):
    """Delete ``worker_name`` from the database and confirm in the chat."""
    db_fetcher.delete_worker(worker_name)
    confirmation = f'Worker {worker_name} deleted'
    await query.edit_message_text(confirmation)

In [9]:
async def main() -> None:
    """Build the bot application, register its handlers and poll for updates.

    Blocks until polling is stopped.
    """
    # Create a new application instance
    application = Application.builder().token(TOKEN).build()

    # Register handlers: /start, inline-button presses, and plain text replies.
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CallbackQueryHandler(button))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    # Run the bot. run_polling() is a blocking, synchronous call that returns
    # None, so it must not be awaited (awaiting None raises TypeError once
    # polling stops). nest_asyncio.apply() in the import cell lets it drive the
    # already-running notebook loop; close_loop=False keeps that loop open for
    # the kernel afterwards.
    application.run_polling(close_loop=False)

# Top-level `await` is not valid in a plain Python script; this cell relies on
# the notebook/IPython kernel's support for awaiting at cell level.
if __name__ == '__main__':
    await main()