Skip to content
This repository has been archived by the owner on May 5, 2024. It is now read-only.

Commit

Permalink
new bebug and many updates!
Browse files Browse the repository at this point in the history
  • Loading branch information
pixeldeee committed Aug 4, 2023
1 parent caccaf6 commit 8f6c386
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 59 deletions.
130 changes: 83 additions & 47 deletions pytecord/guild.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
from typing import Any
from typing import Any, Literal

from .interfaces import Object
from .user import User
from .role import Role
from .utils import MessagePayload, apost, rget
from .annotations import hash_str
from .reaction import Emoji, Sticker
from .utils import MessagePayload, get_snowflake, apost, rget
from .annotations import hash_str, permissions_set


class WelcomeScreenChannel:
def __init__(self, data: dict[str, Any], token: str) -> None:
self.description: str = data.get('description')
self.emoji_id: int | None = get_snowflake('emoji_id')
self.emoji_name: str | None = data.get('emoji_name')

self.__channel_id = int(data.get('channel_id'))
self.__token = token

@property
def channel(self) -> 'GuildChannel':
data = rget(f'/channels/{self.__channel_id}', self.__token).json()
return GuildChannel(data, self.__token)

class WelcomeScreen:
def __init__(self, data: dict[str, Any], token: str) -> None:
self.description: str | None = data.get('description')
self.channels: list[WelcomeScreenChannel] = [WelcomeScreenChannel(i, token) for i in data.get('welcome_channels')]

class Guild(Object):
def __init__(self, data: dict[str, Any], token: str):
self.id: int = int(data.get('id'))
self.id: int = get_snowflake(data.get('id'))
self.name: str = data.get('name')
self.icon: hash_str | None = data.get('icon')
self.icon_hash: hash_str | None = data.get('icon_hash')
Expand All @@ -17,39 +38,39 @@ def __init__(self, data: dict[str, Any], token: str):
self.is_owner: bool | None = data.get('owner')
self.permissions: str | None = data.get('permissions')
self.region: str | None = data.get('region') # deprecated
self.afk_channel_id: int | None = int(x) if (x := data.get('afk_channel_id')) else None
self.afk_channel_id: int | None = get_snowflake(data.get('afk_channel_id'))
self.afk_timeout: int = data.get('afk_timeout')
self.widget_enabled: bool | None = data.get('widget_enabled')
self.widget_channel_id: int | None = int(x) if (x := data.get('widget_channel_id')) else None
self.widget_channel_id: int | None = get_snowflake(x := data.get('widget_channel_id'))
self.verification_level: int = data.get('verification_level')
self.default_message_notifications: int = data.get('default_message_notifications')
self.explicit_content_filter: int = data.get('explicit_content_filter')
self.roles: list[Role] = [Role(i) for i in data.get('roles')]
self.emojis = data.get('emojis')
self.features = data.get('features')
self.mfa_level = data.get('mfa_level')
self.application_id = int(x) if (x := data.get('application_id')) else None
self.system_channel_id = int(x) if (x := data.get('application_id')) else None
self.system_channel_flags = data.get('system_channel_flags')
self.rules_channel_id = int(x) if (x := data.get('rules_channel_id')) else None
self.max_presences = data.get('max_presences')
self.max_members = data.get('max_members')
self.vanity_url_code = data.get('vanity_url_code')
self.description = data.get('description')
self.banner = data.get('banner')
self.premium_tier = data.get('premium_tier')
self.premium_subscription_count = data.get('premium_subscription_count')
self.preferred_locale = data.get('preferred_locale')
self.public_updates_channel_id = int(data.get('public_updates_channel_id'))
self.max_video_channel_users = data.get('max_video_channel_users')
self.max_stage_video_channel_users = data.get('max_stage_video_channel_users')
self.approximate_member_count = data.get('approximate_member_count')
self.approximate_presence_count = data.get('approximate_presence_count')
self.welcome_screen = data.get('welcome_screen')
self.nsfw_level = data.get('nsfw_level')
self.stickers = data.get('stickers')
self.premium_progress_bar_enabled = data.get('premium_progress_bar_enabled')
self.safety_alerts_channel_id = int(x) if (x := data.get('safety_alerts_channel_id')) else None
self.roles: list[Role] = [Role(i) for i in data.get('roles', [])]
self.emojis: list[Emoji] = [Emoji(i) for i in data.get('emojis', [])]
self.features: list[str] = data.get('features')
self.mfa_level: int = data.get('mfa_level')
self.application_id: int | None = get_snowflake('application_id')
self.system_channel_id: int | None = get_snowflake('system_channel_id')
self.system_channel_flags: int = data.get('system_channel_flags')
self.rules_channel_id: int | None = get_snowflake('rules_channel_id')
self.max_presences: int | None = data.get('max_presences')
self.max_members: int | None = data.get('max_members')
self.vanity_url_code: str | None = data.get('vanity_url_code')
self.description: str | None = data.get('description')
self.banner: hash_str | None = data.get('banner')
self.premium_tier: int = data.get('premium_tier')
self.premium_subscription_count: int | None = data.get('premium_subscription_count')
self.preferred_locale: str = data.get('preferred_locale')
self.public_updates_channel_id: int | None = int(data.get('public_updates_channel_id'))
self.max_video_channel_users: int | None = data.get('max_video_channel_users')
self.max_stage_video_channel_users: int | None = data.get('max_stage_video_channel_users')
self.approximate_member_count: int | None = data.get('approximate_member_count')
self.approximate_presence_count: int | None = data.get('approximate_presence_count')
self.welcome_screen: WelcomeScreen | None = WelcomeScreen(x, token) if (x := data.get('welcome_screen')) else None
self.nsfw_level: int = data.get('nsfw_level')
self.stickers: list[Sticker] = [Sticker(i, token) for i in x] if (x := data.get('stickers')) else None
self.premium_progress_bar_enabled: bool = data.get('premium_progress_bar_enabled')
self.safety_alerts_channel_id: int | None = get_snowflake('safety_alerts_channel_id')

self.__owner_id = data.get('owner_id')
self.__token = token
Expand Down Expand Up @@ -93,25 +114,40 @@ def eval(self) -> dict[str, Any]:
"""
return self.__data

class Overwrite:
def __init__(self, data: dict[str, Any]) -> None:
self.id = get_snowflake('id')
self.type: Literal[0, 1] = data.get('type')
self.str_type: Literal['role', 'member'] = 'role' if self.type == 0 else 'member'
self.allow: permissions_set = data.get('allow')
self.deny: permissions_set = data.get('deny')

def __int__(self) -> int:
"""
Returns a role or member id (see .type and .str_type)
"""
return self.id


class GuildChannel(Object):
def __init__(self, data: dict[str, Any], token: str):
self.id = int(data.get('id'))
self.type = data.get('type')
self.position = data.get('position')
self.permission_overwrites = data.get('permission_overwrites')
self.id: int = get_snowflake('id')
self.type: int = data.get('type')
self.position: int | None = data.get('position')
self.permission_overwrites: list[Overwrite] = [Overwrite(i) for i in data.get('permission_overwrites', [])]
self.name = data.get('name')
self.topic = data.get('topic')
self.nsfw = data.get('nsfw')
self.last_message_id = int(x) if (x := data.get('last_message_id')) else None
self.last_message_id = get_snowflake('last_message_id')
self.bitrate = data.get('bitrate')
self.user_limit = data.get('user_limit')
self.rate_limit_per_user = data.get('rate_limit_per_user')
self.recipients = [User(i, token) for i in x] if (x := data.get('recipients')) else None
self.icon = data.get('icon')
self.owner_id = int(x) if (x := data.get('owner_id')) else None
self.application_id = int(x) if (x := data.get('application_id')) else None
self.owner_id = get_snowflake('owner_id')
self.application_id = get_snowflake('application_id')
self.managed = data.get('managed')
self.parent_id = int(x) if (x := data.get('parent_id')) else None
self.parent_id = get_snowflake('parent_id')
self.last_pin_timestamp = data.get('last_pin_timestamp')
self.rtc_region = data.get('rtc_region')
self.video_quality_mode = data.get('video_quality_mode')
Expand All @@ -130,7 +166,7 @@ def __init__(self, data: dict[str, Any], token: str):
self.default_sort_order = data.get('default_sort_order')
self.default_forum_layout = data.get('default_forum_layout')

self.__guild_id = int(x) if (x := data.get('guild_id')) else None
self.__guild_id = get_snowflake('guild_id')
self.__token = token
self.__data = data

Expand Down Expand Up @@ -204,7 +240,7 @@ async def send(self, content: str) -> 'Message':

class Message:
def __init__(self, data: dict[str, Any], token: str) -> None:
self.id = int(data.get('id'))
self.id = get_snowflake('id')
self.author = User(data.get('author'), token)
self.content = data.get('content')
self.timestamp = data.get('timestamp')
Expand All @@ -219,11 +255,11 @@ def __init__(self, data: dict[str, Any], token: str) -> None:
self.reactions = data.get('reactions')
self.nonce = data.get('nonce')
self.pinned = data.get('pinned')
self.webhook_id = int(x) if (x := data.get('webhook_id')) else None
self.webhook_id = get_snowflake('webhook_id')
self.type = data.get('type')
self.activity = data.get('activity')
self.application = data.get('application')
self.application_id = int(x) if (x := data.get('application_id')) else None
self.application_id = get_snowflake('application_id')
self.message_reference = data.get('message_reference')
self.flags = data.get('flags')
self.referenced_message = data.get('referenced_message')
Expand All @@ -236,7 +272,7 @@ def __init__(self, data: dict[str, Any], token: str) -> None:
self.role_subscription_data = data.get('role_subscription_data')

# Extra fields for message create
self.guild_id = int(x) if (x := data.get('guild_id')) else None
self.guild_id = get_snowflake('guild_id')
self.member = data.get('member')

self.__channel_id = data.get('channel_id')
Expand Down Expand Up @@ -289,10 +325,10 @@ async def reply(self, content: str) -> 'Message':

class MessageDeleteEvent:
def __init__(self, data: dict[str, Any], token: str) -> None:
self.id = int(data.get('id'))
self.id = get_snowflake('id')

self.__channel_id = int(data.get('channel_id'))
self.__guild_id = int(x) if (x := data.get('guild_id')) else None
self.__guild_id = get_snowflake('guild_id')
self.__token = token

@property
Expand Down
31 changes: 31 additions & 0 deletions pytecord/reaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Any
from .role import Role
from .user import User
from .utils import get_snowflake

class Emoji:
def __init__(self, data: dict[str, Any], token: str) -> None:
self.id: int = get_snowflake('id')
self.name: str | None = data.get('name')
self.roles: list[Role] | None = [Role(i) for i in x] if (x := data.get('roles')) else None
self.user: User = User(data.get('user'), token)
self.require_colons: bool | None = data.get('require_colons')
self.managed: bool | None = data.get('managed')
self.animated: bool | None = data.get('animated')
self.available: bool | None = data.get('available')


class Sticker:
def __init__(self, data: dict[str, Any], token: str) -> None:
self.id: int = get_snowflake('id')
self.pack_id = get_snowflake('pack_id')
self.name: str = data.get('name')
self.description: str | None = data.get('description')
self.tags: str = data.get('tags')
self.asset: str = data.get('asset') # deprecated
self.type: int = data.get('type')
self.format_type: int = data.get('format_type')
self.available: bool | None = data.get('available')
self.guild_id: int | None = get_snowflake('guild_id')
self.user: User | None = User(x, token) if (x := data.get('user')) else None
self.sort_value: int | None = data.get('sort_value')
7 changes: 4 additions & 3 deletions pytecord/role.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Any

from .annotations import hash_str, permissions_set
from .utils import get_snowflake

class RoleTags:
def __init__(self, data: dict[str, Any]) -> None:
self.bot_id = int(x) if (x := data.get('bot_id')) else None
self.integration_id = int(x) if (x := data.get('integration_id')) else None
self.bot_id = get_snowflake('bot_id')
self.integration_id = get_snowflake('integration_id')
self.premium_subscriber = data.get('premium_subscriber')
self.subscription_listing_id = int(x) if (x := data.get('subscription_listing_id')) else None
self.subscription_listing_id = get_snowflake('subscription_listing_id')
self.available_for_purchase = data.get('available_for_purchase')
self.guild_connections = data.get('guild_connections')

Expand Down
3 changes: 2 additions & 1 deletion pytecord/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from .interfaces import Object
from .annotations import hash_str
from .utils import get_snowflake

class User(Object):
def __init__(self, data: dict[str, Any], token: str) -> None:
self.id: int = int(data.get('id'))
self.id: int = get_snowflake('id')
self.username: str = data.get('username')
self.discriminator: str | None = data.get('discriminator')
self.global_name: str | None = data.get('global_name')
Expand Down
20 changes: 19 additions & 1 deletion pytecord/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
from typing import Any, Literal
from typing import Any

from aiohttp import ClientSession
from requests import get

from .config import API_VERSION

# API/GATEWAY

class MessagePayload:
def __init__(self, content: str) -> None:
Expand Down Expand Up @@ -38,3 +39,20 @@ async def apost(endpoint: str, token: str = None, headers: dict[str, Any] = None
if r.status == 200:
return await r.json()
raise Exception(await r.json())

# OTHER

def get_snowflake(__snowflake: str, __default: Any = None) -> int | Any:
"""
Get snowflake from string. `__snowflake` argument is string,
`__default` is a default which is using in situations when
`bool(__snowflake)` is `False`
"""
return int(x) if (x := __snowflake) else __default

def check_module(module: str):
try:
__import__(module)
except ImportError:
return False
return True
44 changes: 37 additions & 7 deletions pytecord/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from asyncio import sleep as asleep
from datetime import datetime
from time import mktime
from typing import TYPE_CHECKING, Any, Callable, Coroutine
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Literal

from aiohttp import ClientSession

from .config import GATEWAY_VERSION
from .interfaces import BaseDataStreamListener
from .utils import get_headers, rget
from .utils import get_headers, rget, check_module

if TYPE_CHECKING:
from .guild import Guild, GuildChannel
Expand Down Expand Up @@ -81,17 +81,49 @@ def __init__(
self.afk = afk
self.status = status
self.activities = activities

def __debug(self, data: GatewayRequest | GatewayOutput | None, type: Literal['send', 'receive']):
if self.debug and data:
have_colorama = check_module('colorama')
if have_colorama:
import colorama
colorama.init()

x = f'{colorama.Fore.YELLOW}DEBUG {type.upper()} '
if data.op is not None:
x += f'{colorama.Fore.GREEN}op:{data.op} '
if data.s is not None:
x += f'{colorama.Fore.RED}s:{data.s} '
if data.d is not None:
x += f'{colorama.Fore.BLUE}d:{data.d} '
if data.t is not None:
x += f'{colorama.Fore.MAGENTA}t:{data.t}'
else:
x = f'DEBUG {type.upper()} '
if data.op:
x += f'op:{data.op} '
if data.s:
x += f's:{data.s} '
if data.d:
x += f'd:{data.d} '
if data.t:
x += f't:{data.t}'
print(x, end='\n' * 2)

async def receive_response(self) -> GatewayOutput | None:
try:
j = await self._ws.receive_json()
return GatewayOutput(data=j) if j else None
data = GatewayOutput(data=j) if j else None
self.__debug(data, 'receive')
return data
except TypeError:
return None

async def send_request(self, data: GatewayRequest) -> dict:
async def send_request(self, data: GatewayRequest) -> GatewayRequest:
await self._ws.send_json(data.eval())
return GatewayRequest(data)
request = GatewayRequest(data=data)
self.__debug(request, 'send')
return request

async def identify(self):
await self.send_request(GatewayRequest(
Expand Down Expand Up @@ -125,8 +157,6 @@ async def check_events(self):
...
else:
continue
if self.debug:
print(f'DEBUG op:{data.op} s:{data.s} t:{data.t} d:{data.d}', end='\n' * 2)
await self.listener.listen(data)

async def run(self, intents: int = None):
Expand Down

0 comments on commit 8f6c386

Please sign in to comment.