diff --git a/CatCore/CatCore.csproj b/CatCore/CatCore.csproj index aafcb66e..6c5d4f8d 100644 --- a/CatCore/CatCore.csproj +++ b/CatCore/CatCore.csproj @@ -39,7 +39,7 @@ - + diff --git a/CatCore/CatCoreInstance.cs b/CatCore/CatCoreInstance.cs index 75fff3e8..5344f448 100644 --- a/CatCore/CatCoreInstance.cs +++ b/CatCore/CatCoreInstance.cs @@ -136,16 +136,16 @@ private void CreateContainer() // Register Twitch-specific services container.Register(Reuse.Singleton); + container.RegisterInitializer((service, _) => service.Initialize()); container.Register(Reuse.Singleton, Made.Of(FactoryMethod.ConstructorWithResolvableArgumentsIncludingNonPublic)); - container.Register(Reuse.Singleton, Made.Of(FactoryMethod.ConstructorWithResolvableArgumentsIncludingNonPublic)); - container.Register(Reuse.Singleton); + container.RegisterMany(new[] { typeof(ITwitchHelixApiService), typeof(TwitchHelixApiService) }, typeof(TwitchHelixApiService), Reuse.Singleton, Made.Of(FactoryMethod.ConstructorWithResolvableArgumentsIncludingNonPublic)); + container.RegisterMany(new[] { typeof(ITwitchIrcService), typeof(ITwitchPubSubServiceManager) }, typeof(TwitchEventSubChatService), Reuse.Singleton); container.Register(Reuse.Singleton, Made.Of(FactoryMethod.ConstructorWithResolvableArgumentsIncludingNonPublic)); container.Register(Reuse.Singleton, Made.Of(FactoryMethod.ConstructorWithResolvableArgumentsIncludingNonPublic)); container.Register(Reuse.Singleton); container.Register(Reuse.Singleton); container.Register(Reuse.Singleton); container.Register(Reuse.Singleton); - container.Register(Reuse.Singleton); container.RegisterMany(new[] { typeof(IPlatformService), typeof(ITwitchService) }, typeof(TwitchService), Reuse.Singleton, Made.Of(FactoryMethod.ConstructorWithResolvableArgumentsIncludingNonPublic)); diff --git a/CatCore/Helpers/IsExternalInit.cs b/CatCore/Helpers/IsExternalInit.cs new file mode 100644 index 00000000..05df8035 --- /dev/null +++ b/CatCore/Helpers/IsExternalInit.cs @@ -0,0 +1,11 @@ +namespace System.Runtime.CompilerServices +{ + /// + /// Polyfill for IsExternalInit to support init accessors in .NET Standard 2.0+ + /// + [System.AttributeUsage(System.AttributeTargets.Class | System.AttributeTargets.Struct | System.AttributeTargets.Field | System.AttributeTargets.Property, AllowMultiple = false, Inherited = false)] + internal sealed class IsExternalInit : System.Attribute + { + } +} + diff --git a/CatCore/Helpers/JSON/TwitchEventSubSerializerContext.cs b/CatCore/Helpers/JSON/TwitchEventSubSerializerContext.cs new file mode 100644 index 00000000..da7f0a2a --- /dev/null +++ b/CatCore/Helpers/JSON/TwitchEventSubSerializerContext.cs @@ -0,0 +1,44 @@ +using System.Text.Json.Serialization; +using CatCore.Models.Twitch.EventSub; + +namespace CatCore.Helpers.JSON +{ + [JsonSerializable(typeof(EventSubWebSocketMessage))] + [JsonSerializable(typeof(EventSubSessionPayload))] + [JsonSerializable(typeof(EventSubSessionInfo))] + [JsonSerializable(typeof(EventSubWebSocketMessage))] + [JsonSerializable(typeof(EventSubChatMessagePayload))] + [JsonSerializable(typeof(EventSubSubscription))] + [JsonSerializable(typeof(EventSubChatMessageEvent))] + [JsonSerializable(typeof(EventSubChatMessageContent))] + [JsonSerializable(typeof(EventSubFragment))] + [JsonSerializable(typeof(EventSubEmoteFragment))] + [JsonSerializable(typeof(EventSubCheermoteFragment))] + [JsonSerializable(typeof(EventSubMentionFragment))] + [JsonSerializable(typeof(EventSubBadge))] + [JsonSerializable(typeof(EventSubCheer))] + [JsonSerializable(typeof(EventSubReply))] + [JsonSerializable(typeof(EventSubWebSocketMessage))] + [JsonSerializable(typeof(EventSubChatNotificationPayload))] + [JsonSerializable(typeof(EventSubChatNotificationEvent))] + [JsonSerializable(typeof(EventSubNotificationSub))] + [JsonSerializable(typeof(EventSubNotificationResub))] + [JsonSerializable(typeof(EventSubNotificationGiftSub))] + [JsonSerializable(typeof(EventSubNotificationRaid))] + [JsonSerializable(typeof(EventSubNotificationUnraid))] + [JsonSerializable(typeof(EventSubNotificationPayItForward))] + [JsonSerializable(typeof(EventSubNotificationAnnouncement))] + [JsonSerializable(typeof(EventSubNotificationBitsBadgeTier))] + [JsonSerializable(typeof(EventSubWebSocketMessage))] + [JsonSerializable(typeof(EventSubChatMessageDeletePayload))] + [JsonSerializable(typeof(EventSubChatMessageDeleteEvent))] + [JsonSerializable(typeof(EventSubWebSocketMessage))] + [JsonSerializable(typeof(EventSubChatClearPayload))] + [JsonSerializable(typeof(EventSubChatClearEvent))] + [JsonSerializable(typeof(EventSubWebSocketMessage))] + [JsonSerializable(typeof(EventSubChatSettingsPayload))] + [JsonSerializable(typeof(EventSubChatSettingsEvent))] + internal partial class TwitchEventSubSerializerContext : JsonSerializerContext + { + } +} diff --git a/CatCore/Helpers/JSON/TwitchHelixSerializerContext.cs b/CatCore/Helpers/JSON/TwitchHelixSerializerContext.cs index a2a289b1..db738bad 100644 --- a/CatCore/Helpers/JSON/TwitchHelixSerializerContext.cs +++ b/CatCore/Helpers/JSON/TwitchHelixSerializerContext.cs @@ -33,6 +33,11 @@ namespace CatCore.Helpers.JSON [JsonSerializable(typeof(HelixRequests.SendChatAnnouncementRequestDto))] [JsonSerializable(typeof(HelixResponses.ResponseBase))] [JsonSerializable(typeof(HelixResponses.ResponseBase))] + [JsonSerializable(typeof(HelixRequests.SendChatMessageRequestDto))] + [JsonSerializable(typeof(HelixRequests.EventSubSubscriptionRequestDto))] + [JsonSerializable(typeof(HelixRequests.EventSubTransportDto))] + [JsonSerializable(typeof(HelixResponses.EventSubSubscriptionResponseDto))] + [JsonSerializable(typeof(HelixResponses.EventSubSubscriptionInfoDto))] internal partial class TwitchHelixSerializerContext : JsonSerializerContext { } diff --git a/CatCore/Models/Twitch/EventSub/EventSubChatClearPayload.cs b/CatCore/Models/Twitch/EventSub/EventSubChatClearPayload.cs new file mode 100644 index 00000000..679a1a23 --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubChatClearPayload.cs @@ -0,0 +1,34 @@ +using System.Text.Json.Serialization; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubChatClearPayload + { + [JsonPropertyName("subscription")] + public EventSubSubscription Subscription { get; init; } + + [JsonPropertyName("event")] + public EventSubChatClearEvent Event { get; init; } + } + + public struct EventSubChatClearEvent + { + [JsonPropertyName("broadcaster_user_id")] + public string BroadcasterUserId { get; init; } + + [JsonPropertyName("broadcaster_user_login")] + public string BroadcasterUserLogin { get; init; } + + [JsonPropertyName("broadcaster_user_name")] + public string BroadcasterUserName { get; init; } + + [JsonPropertyName("moderator_user_id")] + public string ModeratorUserId { get; init; } + + [JsonPropertyName("moderator_user_login")] + public string ModeratorUserLogin { get; init; } + + [JsonPropertyName("moderator_user_name")] + public string ModeratorUserName { get; init; } + } +} diff --git a/CatCore/Models/Twitch/EventSub/EventSubChatMessageDeletePayload.cs b/CatCore/Models/Twitch/EventSub/EventSubChatMessageDeletePayload.cs new file mode 100644 index 00000000..8f8594ee --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubChatMessageDeletePayload.cs @@ -0,0 +1,37 @@ +using System.Text.Json.Serialization; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubChatMessageDeletePayload + { + [JsonPropertyName("subscription")] + public EventSubSubscription Subscription { get; init; } + + [JsonPropertyName("event")] + public EventSubChatMessageDeleteEvent Event { get; init; } + } + + public struct EventSubChatMessageDeleteEvent + { + [JsonPropertyName("broadcaster_user_id")] + public string BroadcasterUserId { get; init; } + + [JsonPropertyName("broadcaster_user_login")] + public string BroadcasterUserLogin { get; init; } + + [JsonPropertyName("broadcaster_user_name")] + public string BroadcasterUserName { get; init; } + + [JsonPropertyName("target_user_id")] + public string TargetUserId { get; init; } + + [JsonPropertyName("target_user_login")] + public string TargetUserLogin { get; init; } + + [JsonPropertyName("target_user_name")] + public string TargetUserName { get; init; } + + [JsonPropertyName("message_id")] + public string MessageId { get; init; } + } +} diff --git a/CatCore/Models/Twitch/EventSub/EventSubChatMessagePayload.cs b/CatCore/Models/Twitch/EventSub/EventSubChatMessagePayload.cs new file mode 100644 index 00000000..bb0e7539 --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubChatMessagePayload.cs @@ -0,0 +1,196 @@ +using System.Collections.Generic; +using System.Text.Json.Serialization; +using JetBrains.Annotations; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubChatMessagePayload + { + [JsonPropertyName("subscription")] + public EventSubSubscription Subscription { get; init; } + + [JsonPropertyName("event")] + public EventSubChatMessageEvent Event { get; init; } + } + + [PublicAPI] + public struct EventSubSubscription + { + [JsonPropertyName("id")] + public string Id { get; init; } + + [JsonPropertyName("type")] + public string Type { get; init; } + + [JsonPropertyName("version")] + public string Version { get; init; } + + [JsonPropertyName("status")] + public string Status { get; init; } + + [JsonPropertyName("condition")] + public Dictionary Condition { get; init; } + + [JsonPropertyName("transport")] + public Dictionary Transport { get; init; } + + [JsonPropertyName("created_at")] + public string CreatedAt { get; init; } + + [JsonPropertyName("cost")] + public int Cost { get; init; } + } + + [PublicAPI] + public struct EventSubChatMessageEvent + { + [JsonPropertyName("broadcaster_user_id")] + public string BroadcasterUserId { get; init; } + + [JsonPropertyName("broadcaster_user_login")] + public string BroadcasterUserLogin { get; init; } + + [JsonPropertyName("broadcaster_user_name")] + public string BroadcasterUserName { get; init; } + + [JsonPropertyName("chatter_user_id")] + public string ChatterUserId { get; init; } + + [JsonPropertyName("chatter_user_login")] + public string ChatterUserLogin { get; init; } + + [JsonPropertyName("chatter_user_name")] + public string ChatterUserName { get; init; } + + [JsonPropertyName("message_id")] + public string MessageId { get; init; } + + [JsonPropertyName("message")] + public EventSubChatMessageContent Message { get; init; } + + [JsonPropertyName("color")] + public string Color { get; init; } + + [JsonPropertyName("badges")] + public List Badges { get; init; } + + [JsonPropertyName("message_type")] + public string MessageType { get; init; } + + [JsonPropertyName("cheer")] + public EventSubCheer? Cheer { get; init; } + + [JsonPropertyName("reply")] + public EventSubReply? Reply { get; init; } + + [JsonPropertyName("channel_points_custom_reward_id")] + public string? ChannelPointsCustomRewardId { get; init; } + } + + [PublicAPI] + public struct EventSubChatMessageContent + { + [JsonPropertyName("text")] + public string Text { get; init; } + + [JsonPropertyName("fragments")] + public List Fragments { get; init; } + } + + [PublicAPI] + public struct EventSubFragment + { + [JsonPropertyName("type")] + public string Type { get; init; } + + [JsonPropertyName("text")] + public string Text { get; init; } + + [JsonPropertyName("emote")] + public EventSubEmoteFragment? Emote { get; init; } + + [JsonPropertyName("cheermote")] + public EventSubCheermoteFragment? Cheermote { get; init; } + + [JsonPropertyName("mention")] + public EventSubMentionFragment? Mention { get; init; } + } + + [PublicAPI] + public struct EventSubEmoteFragment + { + [JsonPropertyName("id")] + public string Id { get; init; } + + [JsonPropertyName("emote_set_id")] + public string EmoteSetId { get; init; } + + [JsonPropertyName("owner_id")] + public string OwnerId { get; init; } + + [JsonPropertyName("format")] + public List Format { get; init; } + } + + [PublicAPI] + public struct EventSubCheermoteFragment + { + [JsonPropertyName("bits")] + public int Bits { get; init; } + + [JsonPropertyName("tier")] + public int Tier { get; init; } + } + + [PublicAPI] + public struct EventSubMentionFragment + { + [JsonPropertyName("user_id")] + public string UserId { get; init; } + + [JsonPropertyName("user_name")] + public string UserName { get; init; } + + [JsonPropertyName("user_login")] + public string UserLogin { get; init; } + } + + [PublicAPI] + public struct EventSubBadge + { + [JsonPropertyName("set_id")] + public string SetId { get; init; } + + [JsonPropertyName("id")] + public string Id { get; init; } + + [JsonPropertyName("info")] + public string Info { get; init; } + } + + [PublicAPI] + public struct EventSubCheer + { + [JsonPropertyName("bits")] + public int Bits { get; init; } + } + + [PublicAPI] + public struct EventSubReply + { + [JsonPropertyName("parent_message_id")] + public string ParentMessageId { get; init; } + + [JsonPropertyName("parent_message_body")] + public string ParentMessageBody { get; init; } + + [JsonPropertyName("parent_user_id")] + public string ParentUserId { get; init; } + + [JsonPropertyName("parent_user_login")] + public string ParentUserLogin { get; init; } + + [JsonPropertyName("parent_user_name")] + public string ParentUserName { get; init; } + } +} diff --git a/CatCore/Models/Twitch/EventSub/EventSubChatNotificationPayload.cs b/CatCore/Models/Twitch/EventSub/EventSubChatNotificationPayload.cs new file mode 100644 index 00000000..dc882a9b --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubChatNotificationPayload.cs @@ -0,0 +1,175 @@ +using System.Collections.Generic; +using System.Text.Json.Serialization; +using JetBrains.Annotations; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubChatNotificationPayload + { + [JsonPropertyName("subscription")] + public EventSubSubscription Subscription { get; init; } + + [JsonPropertyName("event")] + public EventSubChatNotificationEvent Event { get; init; } + } + + [PublicAPI] + public struct EventSubChatNotificationEvent + { + [JsonPropertyName("broadcaster_user_id")] + public string BroadcasterUserId { get; init; } + + [JsonPropertyName("broadcaster_user_login")] + public string BroadcasterUserLogin { get; init; } + + [JsonPropertyName("broadcaster_user_name")] + public string BroadcasterUserName { get; init; } + + [JsonPropertyName("chatter_user_id")] + public string ChatterUserId { get; init; } + + [JsonPropertyName("chatter_user_login")] + public string ChatterUserLogin { get; init; } + + [JsonPropertyName("chatter_user_name")] + public string ChatterUserName { get; init; } + + [JsonPropertyName("notice_type")] + public string NoticeType { get; init; } + + [JsonPropertyName("message")] + public EventSubChatMessageContent Message { get; init; } + + [JsonPropertyName("system_message")] + public string? SystemMessage { get; init; } + + [JsonPropertyName("sub")] + public EventSubNotificationSub? Sub { get; init; } + + [JsonPropertyName("resub")] + public EventSubNotificationResub? Resub { get; init; } + + [JsonPropertyName("gift_sub")] + public EventSubNotificationGiftSub? GiftSub { get; init; } + + [JsonPropertyName("raid")] + public EventSubNotificationRaid? Raid { get; init; } + + [JsonPropertyName("unraid")] + public EventSubNotificationUnraid? Unraid { get; init; } + + [JsonPropertyName("pay_it_forward")] + public EventSubNotificationPayItForward? PayItForward { get; init; } + + [JsonPropertyName("announcement")] + public EventSubNotificationAnnouncement? Announcement { get; init; } + + [JsonPropertyName("bits_badge_tier")] + public EventSubNotificationBitsBadgeTier? BitsBadgeTier { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationSub + { + [JsonPropertyName("is_gift")] + public bool IsGift { get; init; } + + [JsonPropertyName("sub_tier")] + public string SubTier { get; init; } + + [JsonPropertyName("duration_months")] + public int DurationMonths { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationResub + { + [JsonPropertyName("duration_months")] + public int DurationMonths { get; init; } + + [JsonPropertyName("cumulative_months")] + public int CumulativeMonths { get; init; } + + [JsonPropertyName("streak_months")] + public int? StreakMonths { get; init; } + + [JsonPropertyName("is_streak")] + public bool IsStreak { get; init; } + + [JsonPropertyName("sub_tier")] + public string SubTier { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationGiftSub + { + [JsonPropertyName("community_gift_id")] + public string? CommunityGiftId { get; init; } + + [JsonPropertyName("duration_months")] + public int DurationMonths { get; init; } + + [JsonPropertyName("cumulative_total")] + public int? CumulativeTotal { get; init; } + + [JsonPropertyName("recipient_user_id")] + public string RecipientUserId { get; init; } + + [JsonPropertyName("recipient_user_login")] + public string RecipientUserLogin { get; init; } + + [JsonPropertyName("recipient_user_name")] + public string RecipientUserName { get; init; } + + [JsonPropertyName("sub_tier")] + public string SubTier { get; init; } + + [JsonPropertyName("community_gift_ids")] + public List? CommunityGiftIds { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationRaid + { + [JsonPropertyName("user_id")] + public string UserId { get; init; } + + [JsonPropertyName("user_login")] + public string UserLogin { get; init; } + + [JsonPropertyName("user_name")] + public string UserName { get; init; } + + [JsonPropertyName("viewer_count")] + public int ViewerCount { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationUnraid + { + } + + [PublicAPI] + public struct EventSubNotificationPayItForward + { + [JsonPropertyName("is_anonymous")] + public bool IsAnonymous { get; init; } + + [JsonPropertyName("community_gift_id")] + public string? CommunityGiftId { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationAnnouncement + { + [JsonPropertyName("color")] + public string Color { get; init; } + } + + [PublicAPI] + public struct EventSubNotificationBitsBadgeTier + { + [JsonPropertyName("tier")] + public int Tier { get; init; } + } +} diff --git a/CatCore/Models/Twitch/EventSub/EventSubChatSettingsPayload.cs b/CatCore/Models/Twitch/EventSub/EventSubChatSettingsPayload.cs new file mode 100644 index 00000000..4ce30d5b --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubChatSettingsPayload.cs @@ -0,0 +1,46 @@ +using System.Text.Json.Serialization; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubChatSettingsPayload + { + [JsonPropertyName("subscription")] + public EventSubSubscription Subscription { get; init; } + + [JsonPropertyName("event")] + public EventSubChatSettingsEvent Event { get; init; } + } + + public struct EventSubChatSettingsEvent + { + [JsonPropertyName("broadcaster_user_id")] + public string BroadcasterUserId { get; init; } + + [JsonPropertyName("broadcaster_user_login")] + public string BroadcasterUserLogin { get; init; } + + [JsonPropertyName("broadcaster_user_name")] + public string BroadcasterUserName { get; init; } + + [JsonPropertyName("emote_mode")] + public bool EmoteMode { get; init; } + + [JsonPropertyName("follower_mode")] + public bool FollowerMode { get; init; } + + [JsonPropertyName("follower_mode_duration_minutes")] + public int FollowerModeDurationMinutes { get; init; } + + [JsonPropertyName("slow_mode")] + public bool SlowMode { get; init; } + + [JsonPropertyName("slow_mode_wait_seconds")] + public int SlowModeWaitSeconds { get; init; } + + [JsonPropertyName("subscriber_mode")] + public bool SubscriberMode { get; init; } + + [JsonPropertyName("unique_chat_mode")] + public bool UniqueChatMode { get; init; } + } +} diff --git a/CatCore/Models/Twitch/EventSub/EventSubSessionPayload.cs b/CatCore/Models/Twitch/EventSub/EventSubSessionPayload.cs new file mode 100644 index 00000000..2331351b --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubSessionPayload.cs @@ -0,0 +1,30 @@ +using System.Text.Json.Serialization; +using JetBrains.Annotations; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubSessionPayload + { + [JsonPropertyName("session")] + public EventSubSessionInfo Session { get; init; } + } + + [PublicAPI] + public struct EventSubSessionInfo + { + [JsonPropertyName("id")] + public string Id { get; init; } + + [JsonPropertyName("status")] + public string Status { get; init; } + + [JsonPropertyName("keepalive_timeout_seconds")] + public int KeepaliveTimeoutSeconds { get; init; } + + [JsonPropertyName("reconnect_url")] + public string? ReconnectUrl { get; init; } + + [JsonPropertyName("created_at")] + public string CreatedAt { get; init; } + } +} diff --git a/CatCore/Models/Twitch/EventSub/EventSubWebSocketMessage.cs b/CatCore/Models/Twitch/EventSub/EventSubWebSocketMessage.cs new file mode 100644 index 00000000..fa0788d7 --- /dev/null +++ b/CatCore/Models/Twitch/EventSub/EventSubWebSocketMessage.cs @@ -0,0 +1,32 @@ +using System; +using System.Text.Json.Serialization; + +namespace CatCore.Models.Twitch.EventSub +{ + internal struct EventSubMetadata + { + [JsonPropertyName("message_id")] + public string MessageId { get; init; } + + [JsonPropertyName("message_type")] + public string MessageType { get; init; } + + [JsonPropertyName("message_timestamp")] + public DateTime MessageTimestamp { get; init; } + + [JsonPropertyName("subscription_type")] + public string SubscriptionType { get; init; } + + [JsonPropertyName("subscription_version")] + public string SubscriptionVersion { get; init; } + } + + internal struct EventSubWebSocketMessage where TPayload : struct + { + [JsonPropertyName("metadata")] + public EventSubMetadata Metadata { get; init; } + + [JsonPropertyName("payload")] + public TPayload Payload { get; init; } + } +} diff --git a/CatCore/Models/Twitch/Helix/Requests/EventSubRequestDtos.cs b/CatCore/Models/Twitch/Helix/Requests/EventSubRequestDtos.cs new file mode 100644 index 00000000..a5fd7c78 --- /dev/null +++ b/CatCore/Models/Twitch/Helix/Requests/EventSubRequestDtos.cs @@ -0,0 +1,50 @@ +using System.Collections.Generic; +using System.Text.Json.Serialization; +using JetBrains.Annotations; + +namespace CatCore.Models.Twitch.Helix.Requests +{ + [PublicAPI] + internal struct SendChatMessageRequestDto + { + [JsonPropertyName("broadcaster_id")] + public string BroadcasterId { get; init; } + + [JsonPropertyName("sender_id")] + public string SenderId { get; init; } + + [JsonPropertyName("message")] + public string Message { get; init; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("reply_parent_message_id")] + public string? ReplyParentMessageId { get; init; } + } + + [PublicAPI] + internal struct EventSubSubscriptionRequestDto + { + [JsonPropertyName("type")] + public string Type { get; init; } + + [JsonPropertyName("version")] + public string Version { get; init; } + + [JsonPropertyName("condition")] + public Dictionary Condition { get; init; } + + [JsonPropertyName("transport")] + public EventSubTransportDto Transport { get; init; } + } + + [PublicAPI] + internal struct EventSubTransportDto + { + [JsonPropertyName("method")] + public string Method { get; init; } + + [JsonPropertyName("session_id")] + public string SessionId { get; init; } + } +} + diff --git a/CatCore/Models/Twitch/Helix/Requests/Polls/EndPollRequestDto.cs b/CatCore/Models/Twitch/Helix/Requests/Polls/EndPollRequestDto.cs index 9df13d72..26339334 100644 --- a/CatCore/Models/Twitch/Helix/Requests/Polls/EndPollRequestDto.cs +++ b/CatCore/Models/Twitch/Helix/Requests/Polls/EndPollRequestDto.cs @@ -1,4 +1,4 @@ -using System.Text.Json.Serialization; +using System.Text.Json.Serialization; using CatCore.Helpers.Converters; using CatCore.Models.Twitch.Shared; @@ -13,7 +13,7 @@ internal readonly struct EndPollRequestDto public string PollId { get; } [JsonPropertyName("status")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public PollStatus Status { get; } public EndPollRequestDto(string broadcasterId, string pollId, PollStatus status) diff --git a/CatCore/Models/Twitch/Helix/Requests/Predictions/EndPollRequestDto.cs b/CatCore/Models/Twitch/Helix/Requests/Predictions/EndPollRequestDto.cs index 1a6a701b..4772d175 100644 --- a/CatCore/Models/Twitch/Helix/Requests/Predictions/EndPollRequestDto.cs +++ b/CatCore/Models/Twitch/Helix/Requests/Predictions/EndPollRequestDto.cs @@ -1,4 +1,4 @@ -using System.Text.Json.Serialization; +using System.Text.Json.Serialization; using CatCore.Helpers.Converters; using CatCore.Models.Twitch.Shared; @@ -13,7 +13,7 @@ internal readonly struct EndPredictionRequestDto public string PollId { get; } [JsonPropertyName("status")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public PredictionStatus Status { get; } [JsonPropertyName("winning_outcome_id")] diff --git a/CatCore/Models/Twitch/Helix/Responses/Bits/Cheermotes/CheermoteGroupData.cs b/CatCore/Models/Twitch/Helix/Responses/Bits/Cheermotes/CheermoteGroupData.cs index 5d566c57..cf7150d4 100644 --- a/CatCore/Models/Twitch/Helix/Responses/Bits/Cheermotes/CheermoteGroupData.cs +++ b/CatCore/Models/Twitch/Helix/Responses/Bits/Cheermotes/CheermoteGroupData.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Text.Json.Serialization; using CatCore.Helpers.Converters; @@ -14,7 +14,7 @@ public readonly struct CheermoteGroupData public IReadOnlyList Tiers { get; } [JsonPropertyName("type")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public CheermoteType Type { get; } [JsonPropertyName("order")] diff --git a/CatCore/Models/Twitch/Helix/Responses/EventSubResponseDtos.cs b/CatCore/Models/Twitch/Helix/Responses/EventSubResponseDtos.cs new file mode 100644 index 00000000..e3d523d7 --- /dev/null +++ b/CatCore/Models/Twitch/Helix/Responses/EventSubResponseDtos.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Generic; +using System.Text.Json.Serialization; +using JetBrains.Annotations; + +namespace CatCore.Models.Twitch.Helix.Responses +{ + [PublicAPI] + internal readonly struct EventSubSubscriptionResponseDto + { + [JsonConstructor] + public EventSubSubscriptionResponseDto( + List data, + int total, + int totalCost, + int maxTotalCost, + Dictionary? pagination) + { + Data = data ?? new List(); + Total = total; + TotalCost = totalCost; + MaxTotalCost = maxTotalCost; + Pagination = pagination; + } + + [JsonPropertyName("data")] + public List Data { get; } + + [JsonPropertyName("total")] + public int Total { get; } + + [JsonPropertyName("total_cost")] + public int TotalCost { get; } + + [JsonPropertyName("max_total_cost")] + public int MaxTotalCost { get; } + + [JsonPropertyName("pagination")] + public Dictionary? Pagination { get; } + } + + [PublicAPI] + internal readonly struct EventSubSubscriptionInfoDto + { + [JsonConstructor] + public EventSubSubscriptionInfoDto( + string id, + string type, + string version, + string status, + Dictionary condition, + Dictionary transport, + DateTime createdAt, + int cost) + { + Id = id; + Type = type; + Version = version; + Status = status; + Condition = condition ?? new Dictionary(); + Transport = transport ?? new Dictionary(); + CreatedAt = createdAt; + Cost = cost; + } + + [JsonPropertyName("id")] + public string Id { get; } + + [JsonPropertyName("type")] + public string Type { get; } + + [JsonPropertyName("version")] + public string Version { get; } + + [JsonPropertyName("status")] + public string Status { get; } + + [JsonPropertyName("condition")] + public Dictionary Condition { get; } + + [JsonPropertyName("transport")] + public Dictionary Transport { get; } + + [JsonPropertyName("created_at")] + public DateTime CreatedAt { get; } + + [JsonPropertyName("cost")] + public int Cost { get; } + } +} diff --git a/CatCore/Models/Twitch/Helix/Responses/Polls/PollData.cs b/CatCore/Models/Twitch/Helix/Responses/Polls/PollData.cs index 3f83dd5c..2c191fe1 100644 --- a/CatCore/Models/Twitch/Helix/Responses/Polls/PollData.cs +++ b/CatCore/Models/Twitch/Helix/Responses/Polls/PollData.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Text.Json.Serialization; using CatCore.Helpers.Converters; @@ -39,7 +39,7 @@ public readonly struct PollData public uint ChannelPointsPerVote { get; } [JsonPropertyName("status")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public PollStatus Status { get; } [JsonPropertyName("duration")] diff --git a/CatCore/Models/Twitch/Helix/Responses/Predictions/PredictionData.cs b/CatCore/Models/Twitch/Helix/Responses/Predictions/PredictionData.cs index d859cfe4..93f45695 100644 --- a/CatCore/Models/Twitch/Helix/Responses/Predictions/PredictionData.cs +++ b/CatCore/Models/Twitch/Helix/Responses/Predictions/PredictionData.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Text.Json.Serialization; using CatCore.Helpers.Converters; @@ -33,7 +33,7 @@ public readonly struct PredictionData public uint Duration { get; } [JsonPropertyName("status")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public PredictionStatus Status { get; } [JsonPropertyName("created_at")] diff --git a/CatCore/Models/Twitch/IRC/IrcMessageTags.cs b/CatCore/Models/Twitch/IRC/IrcMessageTags.cs index 985bece3..dd693f2c 100644 --- a/CatCore/Models/Twitch/IRC/IrcMessageTags.cs +++ b/CatCore/Models/Twitch/IRC/IrcMessageTags.cs @@ -54,6 +54,7 @@ public static class IrcMessageTags public const string MSG_PARAM_SUB_PLAN = "msg-param-sub-plan"; public const string MSG_PARAM_SUB_PLAN_NAME = "msg-param-sub-plan-name"; public const string MSG_PARAM_VIEWER_COUNT = "msg-param-viewerCount"; + public const string MSG_PARAM_PROFILE_IMAGE_URL = "msg-param-profileImageURL"; public const string MSG_PARAM_RITUAL_NAME = "msg-param-ritual-name"; public const string MSG_PARAM_THRESHOLD = "msg-param-threshold"; public const string MSG_PARAM_GIFT_MONTHS = "msg-param-gift-months"; diff --git a/CatCore/Models/Twitch/PubSub/Responses/Polls/PollData.cs b/CatCore/Models/Twitch/PubSub/Responses/Polls/PollData.cs index 34b1b641..f755a35c 100644 --- a/CatCore/Models/Twitch/PubSub/Responses/Polls/PollData.cs +++ b/CatCore/Models/Twitch/PubSub/Responses/Polls/PollData.cs @@ -1,4 +1,4 @@ -using System.Collections.Generic; +using System.Collections.Generic; using System.Text.Json.Serialization; using CatCore.Helpers.Converters; using CatCore.Models.Twitch.Shared; @@ -35,7 +35,7 @@ public readonly struct PollData public PollSettings Settings { get; } [JsonPropertyName("status")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public PollStatus Status { get; } [JsonPropertyName("choices")] diff --git a/CatCore/Models/Twitch/PubSub/Responses/Predictions/PredictionData.cs b/CatCore/Models/Twitch/PubSub/Responses/Predictions/PredictionData.cs index ef41e4dc..a2f9592c 100644 --- a/CatCore/Models/Twitch/PubSub/Responses/Predictions/PredictionData.cs +++ b/CatCore/Models/Twitch/PubSub/Responses/Predictions/PredictionData.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Text.Json.Serialization; using CatCore.Helpers.Converters; @@ -42,7 +42,7 @@ public readonly struct PredictionData public uint PredictionWindowSeconds { get; } [JsonPropertyName("status")] - [JsonConverter(typeof(JsonStringEnumConverter))] + [JsonConverter(typeof(Helpers.Converters.JsonStringEnumConverter))] public PredictionStatus Status { get; } [JsonPropertyName("winning_outcome_id")] diff --git a/CatCore/Services/KittenPathProvider.cs b/CatCore/Services/KittenPathProvider.cs index 96e5db26..5cb2c783 100644 --- a/CatCore/Services/KittenPathProvider.cs +++ b/CatCore/Services/KittenPathProvider.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.IO; using CatCore.Services.Interfaces; diff --git a/CatCore/Services/KittenWebSocketProvider.cs b/CatCore/Services/KittenWebSocketProvider.cs index e05d1028..8f9678a5 100644 --- a/CatCore/Services/KittenWebSocketProvider.cs +++ b/CatCore/Services/KittenWebSocketProvider.cs @@ -86,7 +86,7 @@ or ConnectionStatus.ForcefullyDisconnected or ConnectionStatus.Aborted or ConnectionStatus.ConnectionFailed or ConnectionStatus.Close) - .Do(_ => tcs.SetResult(null!)) + .Do(_ => tcs.TrySetResult(null!)) .Subscribe(); _disconnectObservable = _websocketConnectionSubject .Where(tuple => tuple.state is ConnectionStatus.Disconnected diff --git a/CatCore/Services/Twitch/Interfaces/ITwitchAuthService.cs b/CatCore/Services/Twitch/Interfaces/ITwitchAuthService.cs index 56125f81..5c67ced6 100644 --- a/CatCore/Services/Twitch/Interfaces/ITwitchAuthService.cs +++ b/CatCore/Services/Twitch/Interfaces/ITwitchAuthService.cs @@ -15,6 +15,7 @@ internal interface ITwitchAuthService event Action? OnCredentialsChanged; event Action? OnAuthenticationStatusChanged; + void Initialize(); ValidationResponse? FetchLoggedInUserInfo(); Task FetchLoggedInUserInfoWithRefresh(); diff --git a/CatCore/Services/Twitch/Interfaces/ITwitchPubSubServiceManager.cs b/CatCore/Services/Twitch/Interfaces/ITwitchPubSubServiceManager.cs index 9f1d262b..ac9c6753 100644 --- a/CatCore/Services/Twitch/Interfaces/ITwitchPubSubServiceManager.cs +++ b/CatCore/Services/Twitch/Interfaces/ITwitchPubSubServiceManager.cs @@ -18,6 +18,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the view count update. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnViewCountUpdated; /// @@ -25,6 +26,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the StreamUp event. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnStreamUp; /// @@ -32,6 +34,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the StreamDown event. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnStreamDown; /// @@ -39,6 +42,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the OnCommercial event. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnCommercial; /// @@ -46,6 +50,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the new follower. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnFollow; /// @@ -53,6 +58,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the poll. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnPoll; /// @@ -60,6 +66,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the prediction. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnPrediction; /// @@ -67,6 +74,7 @@ public interface ITwitchPubSubServiceManager /// First argument of the callback is the channelId on which the event was triggered. /// Second argument of the callback is additional data regarding the redeemed reward and redeemer. /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] event Action OnRewardRedeemed; } } \ No newline at end of file diff --git a/CatCore/Services/Twitch/Interfaces/ITwitchService.cs b/CatCore/Services/Twitch/Interfaces/ITwitchService.cs index 716bad34..66884d01 100644 --- a/CatCore/Services/Twitch/Interfaces/ITwitchService.cs +++ b/CatCore/Services/Twitch/Interfaces/ITwitchService.cs @@ -1,3 +1,4 @@ +using System; using CatCore.Models.Twitch; using CatCore.Models.Twitch.IRC; using CatCore.Services.Interfaces; @@ -12,6 +13,7 @@ public interface ITwitchService : IPlatformService /// Returns the PubSub service manager [PublicAPI] + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] ITwitchPubSubServiceManager GetPubSubService(); /// diff --git a/CatCore/Services/Twitch/Media/TwitchEmoteDetectionHelper.cs b/CatCore/Services/Twitch/Media/TwitchEmoteDetectionHelper.cs index 97fceb42..a68fe55a 100644 --- a/CatCore/Services/Twitch/Media/TwitchEmoteDetectionHelper.cs +++ b/CatCore/Services/Twitch/Media/TwitchEmoteDetectionHelper.cs @@ -1,7 +1,9 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Text; using CatCore.Models.Shared; +using CatCore.Models.Twitch.EventSub; using CatCore.Models.Twitch.IRC; using CatCore.Models.Twitch.Media; using CatCore.Services.Interfaces; @@ -152,6 +154,120 @@ void ExtractOtherEmotesInternal(int messageStartIndex, int messageEndIndex) ExtractOtherEmotesInternal(loopStartIndex, message.Length); } + /// + /// Extracts emote information from EventSub chat message fragments. + /// + /// The message text + /// EventSub message fragments + /// The channel ID + /// Number of bits (for cheermotes) + /// List of extracted emotes + public List ExtractEmoteInfoFromFragments(string message, List fragments, string channelId, uint bits) + { + var emotes = new List(); + + if (fragments == null) + { + return emotes; + } + + var twitchConfig = _settingsService.Config.TwitchConfig; + + // Extract Twitch emotes from fragments + if (twitchConfig.ParseTwitchEmotes) + { + var emoteSearchOffset = 0; + foreach (var fragment in fragments) + { + if (fragment.Type == "emote" && fragment.Emote.HasValue) + { + var emoteFragment = fragment.Emote.Value; + var prefixedEmoteId = "TwitchEmote_" + emoteFragment.Id; + + // Find the position of this emote text in the message, starting from the current offset + // to correctly handle repeated emote text appearing multiple times. + var startIndex = message.IndexOf(fragment.Text, emoteSearchOffset, StringComparison.Ordinal); + if (startIndex >= 0) + { + var endIndex = startIndex + fragment.Text.Length - 1; + var emoteUrl = $"https://static-cdn.jtvnw.net/emoticons/v2/{emoteFragment.Id}/static/dark/3.0"; + emotes.Add(new TwitchEmote(prefixedEmoteId, fragment.Text, startIndex, endIndex, emoteUrl)); + emoteSearchOffset = endIndex + 1; + } + } + else + { + // Advance the offset past non-emote fragments to keep position tracking accurate. + var fragStart = message.IndexOf(fragment.Text, emoteSearchOffset, StringComparison.Ordinal); + if (fragStart >= 0) + { + emoteSearchOffset = fragStart + fragment.Text.Length; + } + } + } + } + + // Extract emojis + if (_settingsService.Config.GlobalConfig.HandleEmojis) + { + ExtractEmojis(emotes, message); + } + + // Extract cheermotes and custom emotes from non-emote fragments + if (twitchConfig.ParseCheermotes && bits > 0 || twitchConfig.ParseBttvEmotes || twitchConfig.ParseFfzEmotes) + { + var fragmentSearchOffset = 0; + foreach (var fragment in fragments) + { + var fragStart = message.IndexOf(fragment.Text, fragmentSearchOffset, StringComparison.Ordinal); + if (fragStart >= 0) + { + fragmentSearchOffset = fragStart + fragment.Text.Length; + } + + if (fragment.Type == "text") + { + ExtractOtherEmotesFromText(emotes, fragment.Text, message, channelId, twitchConfig.ParseCheermotes && bits > 0, twitchConfig.ParseBttvEmotes || twitchConfig.ParseFfzEmotes, fragStart >= 0 ? fragStart : 0); + } + } + } + + return emotes; + } + + private void ExtractOtherEmotesFromText(List emotes, string fragmentText, string fullMessage, string channelId, bool parseCheermotes, bool parseCustomEmotes, int startOffset = 0) + { + if (!parseCheermotes && !parseCustomEmotes) + { + return; + } + + var words = fragmentText.Split(new[] { ' ', '\t', '\n', '\r' }, System.StringSplitOptions.RemoveEmptyEntries); + var searchStartIndex = startOffset; + + foreach (var word in words) + { + var wordStartIndex = fullMessage.IndexOf(word, searchStartIndex, StringComparison.Ordinal); + if (wordStartIndex < 0) + { + continue; + } + + var wordEndIndex = wordStartIndex + word.Length - 1; + + if (parseCustomEmotes && _twitchMediaDataProvider.TryGetThirdPartyEmote(word, channelId, out var customEmote)) + { + emotes.Add(new TwitchEmote(customEmote!.Id, customEmote.Name, wordStartIndex, wordEndIndex, customEmote.Url, customEmote.IsAnimated)); + } + else if (parseCheermotes && _twitchMediaDataProvider.TryGetCheermote(word, channelId, out var emoteBits, out var cheermoteData)) + { + emotes.Add(new TwitchEmote(cheermoteData!.Id, cheermoteData.Name, wordStartIndex, wordEndIndex, cheermoteData.Url, cheermoteData.IsAnimated, emoteBits, cheermoteData.Color)); + } + + searchStartIndex = wordEndIndex + 1; + } + } + /// /// This helper is needed to calculate the correct offset for twitch emote indices when there are preceding surrogate pairs in the message, /// as the Twitch treats a surrogate pair as a single character. diff --git a/CatCore/Services/Twitch/TwitchAuthService.cs b/CatCore/Services/Twitch/TwitchAuthService.cs index fc7c14d4..c8f9336e 100644 --- a/CatCore/Services/Twitch/TwitchAuthService.cs +++ b/CatCore/Services/Twitch/TwitchAuthService.cs @@ -1,7 +1,8 @@ -using System; +using System; using System.Net.Http; using System.Net.Http.Headers; using System.Net.Http.Json; +using System.Linq; using System.Runtime.CompilerServices; using System.Text.Json; using System.Threading; @@ -22,6 +23,9 @@ internal sealed class TwitchAuthService : KittenCredentialsProvider _exceptionRetryPolicy; @@ -33,17 +37,23 @@ internal sealed class TwitchAuthService : KittenCredentialsProvider SERVICE_TYPE; @@ -111,7 +128,40 @@ public TwitchAuthService(ILogger logger, IKittenPathProvider kittenPathProvider, _exceptionRetryPolicy = Policy .Handle() - .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromMilliseconds(2 ^ (retryAttempt - 1) * 500)); + .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromMilliseconds((1 << (retryAttempt - 1)) * 500)); + } + + /// + /// Initializes the Twitch authentication service and starts the background validation loop. + /// + /// + /// This method is idempotent and can be safely called multiple times. On the first call it + /// creates the internal cancellation token source and starts the validation loop that keeps + /// authentication state up to date. Subsequent calls are ignored once initialization has + /// completed. Call this during application startup or when the ITwitchAuthService + /// instance is first created to ensure that token validation runs in the background. + /// + public void Initialize() + { + lock (_validationLoopStateLock) + { + if (_isInitialized) + { + return; + } + + _isInitialized = true; + _validationCancellationTokenSource = new CancellationTokenSource(); + var validationTask = Task.Run(() => RunValidationLoop(_validationCancellationTokenSource.Token)); + _validationTask = validationTask.ContinueWith(t => + { + // Ensure any unhandled exceptions from the validation loop are observed and logged + if (t.IsFaulted && t.Exception != null) + { + _logger.Error(t.Exception, "Scheduled Twitch token validation loop encountered an unhandled exception"); + } + }, TaskContinuationOptions.OnlyOnFaulted); + } } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -120,6 +170,117 @@ public TwitchAuthService(ILogger logger, IKittenPathProvider kittenPathProvider, return _loggedInUser; } + private async Task RunValidationLoop(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await ValidateCurrentSession().ConfigureAwait(false); + await Task.Delay(VALIDATION_INTERVAL, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + _logger.Warning(ex, "Scheduled Twitch token validation loop failed"); + await Task.Delay(5000, cancellationToken).ConfigureAwait(false); + } + } + } + + private async Task ValidateCurrentSession() + { + if (!HasTokens) + { + CancelForcedRefresh(); + return; + } + + try + { + _logger.Information("Running scheduled Twitch token validation"); + await FetchLoggedInUserInfoWithRefresh().ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.Warning(ex, "Scheduled Twitch token validation failed"); + ScheduleForcedRefresh(FAILED_REFRESH_RETRY_DELAY); + } + } + + private void ScheduleForcedRefresh(TimeSpan? delayOverride = null) + { + if (string.IsNullOrWhiteSpace(Credentials.RefreshToken) || Credentials.ValidUntil == null) + { + CancelForcedRefresh(); + return; + } + + var delay = delayOverride ?? (Credentials.ValidUntil.Value - DateTimeOffset.Now - FORCED_REFRESH_OFFSET); + if (delay < TimeSpan.Zero) + { + delay = TimeSpan.Zero; + } + + CancellationTokenSource? previousCancellationTokenSource; + CancellationTokenSource cancellationTokenSource; + lock (_forcedRefreshStateLock) + { + previousCancellationTokenSource = _forcedRefreshCancellationTokenSource; + cancellationTokenSource = new CancellationTokenSource(); + _forcedRefreshCancellationTokenSource = cancellationTokenSource; + _forcedRefreshTask = Task.Run(() => RunForcedRefreshSchedule(delay, cancellationTokenSource.Token)); + } + + previousCancellationTokenSource?.Cancel(); + previousCancellationTokenSource?.Dispose(); + + _logger.Information("Scheduled forced Twitch token refresh in {Delay}", delay.ToString("g")); + } + + private void CancelForcedRefresh() + { + CancellationTokenSource? cancellationTokenSource; + lock (_forcedRefreshStateLock) + { + cancellationTokenSource = _forcedRefreshCancellationTokenSource; + _forcedRefreshCancellationTokenSource = null; + _forcedRefreshTask = null; + } + + if (cancellationTokenSource == null) + { + return; + } + + cancellationTokenSource.Cancel(); + cancellationTokenSource.Dispose(); + } + + private async Task RunForcedRefreshSchedule(TimeSpan delay, CancellationToken cancellationToken) + { + try + { + if (delay > TimeSpan.Zero) + { + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + } + + _logger.Information("Running scheduled Twitch token refresh"); + if (!await RefreshTokens().ConfigureAwait(false) && !cancellationToken.IsCancellationRequested) + { + ScheduleForcedRefresh(FAILED_REFRESH_RETRY_DELAY); + } + } + catch (OperationCanceledException) + { + // Expected when rescheduling or clearing the current forced refresh task + } + } + // ReSharper disable once CognitiveComplexity public async Task FetchLoggedInUserInfoWithRefresh() { @@ -145,7 +306,10 @@ public TwitchAuthService(ILogger logger, IKittenPathProvider kittenPathProvider, try { var validateAccessToken = await ValidateAccessToken(Credentials, false).ConfigureAwait(false); - _logger.Information("Validated token: Is valid: {IsValid}, Is refreshable: {IsRefreshable}", validateAccessToken != null && TokenIsValid, Credentials.RefreshToken != null); + _logger.Information("Validated token: Is valid: {IsValid}, Is refreshable: {IsRefreshable}, Scopes: {Scopes}", + validateAccessToken != null && TokenIsValid, + Credentials.RefreshToken != null, + validateAccessToken?.Scopes == null ? "" : string.Join(",", validateAccessToken.Value.Scopes)); if (validateAccessToken == null || !TokenIsValid) { _logger.Information("Refreshing tokens"); @@ -189,10 +353,17 @@ public async Task GetTokensByAuthorizationCode(string authorizationCode, string if (!responseMessage.IsSuccessStatusCode) { + _logger.Warning($"Exchanging authorization code for credentials resulted in non-success status code: {responseMessage.StatusCode}"); return; } +#if DEBUG + var contentString = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + _logger.Debug("Twitch authorization response payload: {Payload}", contentString); + var authorizationResponse = JsonSerializer.Deserialize(contentString, TwitchAuthSerializerContext.Default.AuthorizationResponse); +#else var authorizationResponse = await responseMessage.Content.ReadFromJsonAsync(TwitchAuthSerializerContext.Default.AuthorizationResponse).ConfigureAwait(false); +#endif var newCredentials = new TwitchCredentials(authorizationResponse); await ValidateAccessToken(newCredentials).ConfigureAwait(false); @@ -229,6 +400,7 @@ public async Task GetTokensByAuthorizationCode(string authorizationCode, string { UpdateCredentials(TwitchCredentials.Empty()); _loggedInUser = null; + CancelForcedRefresh(); } Status = AuthenticationStatus.Unauthorized; @@ -237,11 +409,43 @@ public async Task GetTokensByAuthorizationCode(string authorizationCode, string } var validationResponse = await responseMessage.Content.ReadFromJsonAsync(TwitchAuthSerializerContext.Default.ValidationResponse).ConfigureAwait(false); + if (validationResponse.Scopes == null) + { + if (resetDataOnFailure) + { + UpdateCredentials(TwitchCredentials.Empty()); + _loggedInUser = null; + CancelForcedRefresh(); + } + + Status = AuthenticationStatus.Unauthorized; + _logger.Warning("Twitch token validation returned no scope information"); + return null; + } + + var missingScopes = _twitchAuthorizationScope.Where(requiredScope => !validationResponse.Scopes.Contains(requiredScope)).ToArray(); + if (missingScopes.Length > 0) + { + if (resetDataOnFailure) + { + UpdateCredentials(TwitchCredentials.Empty()); + _loggedInUser = null; + CancelForcedRefresh(); + } + + Status = AuthenticationStatus.Unauthorized; + _logger.Warning("Twitch token is missing required authorization scopes. Missing={MissingScopes}; Current={CurrentScopes}", + string.Join(",", missingScopes), + string.Join(",", validationResponse.Scopes)); + return null; + } + _loggedInUser = validationResponse; UpdateCredentials(credentials.ValidUntil!.Value > validationResponse.ExpiresIn ? new TwitchCredentials(credentials.AccessToken, credentials.RefreshToken, validationResponse.ExpiresIn) : credentials); + ScheduleForcedRefresh(); Status = AuthenticationStatus.Authenticated; @@ -264,8 +468,9 @@ public async Task RefreshTokens() _logger.Information("Refreshing tokens using secure CatCore auth back-end"); try { + var encodedRefreshToken = Uri.EscapeDataString(Credentials.RefreshToken); using var responseMessage = await _exceptionRetryPolicy.ExecuteAsync(() => _catCoreAuthClient - .PostAsync($"{_constants.CatCoreAuthServerUri}api/twitch/refresh?refresh_token={Credentials.RefreshToken}", null)) + .PostAsync($"{_constants.CatCoreAuthServerUri}api/twitch/refresh?refresh_token={encodedRefreshToken}", null)) .ConfigureAwait(false); if (!responseMessage.IsSuccessStatusCode) @@ -279,12 +484,23 @@ public async Task RefreshTokens() var refreshedCredentials = new TwitchCredentials(authorizationResponse); return await ValidateAccessToken(refreshedCredentials).ConfigureAwait(false) != null; } + catch (HttpRequestException ex) + { + _logger.Warning(ex, "An error occurred while trying to refresh tokens"); + return false; + } + catch (TaskCanceledException ex) + { + _logger.Warning(ex, "Refreshing tokens timed out or was canceled"); + return false; + } catch (JsonException ex) { _logger.Warning(ex, "Something went wrong while trying to deserialize the refresh tokens body"); UpdateCredentials(TwitchCredentials.Empty()); _loggedInUser = null; + CancelForcedRefresh(); return false; } @@ -305,6 +521,7 @@ public async Task RevokeTokens() UpdateCredentials(TwitchCredentials.Empty()); _loggedInUser = null; + CancelForcedRefresh(); return responseMessage.IsSuccessStatusCode; } diff --git a/CatCore/Services/Twitch/TwitchEventSubChatService.cs b/CatCore/Services/Twitch/TwitchEventSubChatService.cs new file mode 100644 index 00000000..b54f95b6 --- /dev/null +++ b/CatCore/Services/Twitch/TwitchEventSubChatService.cs @@ -0,0 +1,1532 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using CatCore.Helpers; +using CatCore.Helpers.JSON; +using CatCore.Models.EventArgs; +using CatCore.Models.Shared; +using CatCore.Models.Twitch; +using CatCore.Models.Twitch.EventSub; +using CatCore.Models.Twitch.Helix.Responses; +using CatCore.Models.Twitch.IRC; +using CatCore.Models.Twitch.OAuth; +using CatCore.Models.Twitch.PubSub.Responses; +using CatCore.Models.Twitch.PubSub.Responses.ChannelPointsChannelV1; +using CatCore.Models.Twitch.PubSub.Responses.Polls; +using CatCore.Models.Twitch.PubSub.Responses.Predictions; +using CatCore.Models.Twitch.PubSub.Responses.VideoPlayback; +using CatCore.Models.Twitch.Shared; +using CatCore.Services.Interfaces; +using CatCore.Services.Twitch.Interfaces; +using CatCore.Services.Twitch.Media; +using Serilog; +using PredictionBadge = CatCore.Models.Twitch.PubSub.Responses.Predictions.Badge; +using PredictionUser = CatCore.Models.Twitch.PubSub.Responses.Predictions.User; +using RewardUser = CatCore.Models.Twitch.PubSub.Responses.ChannelPointsChannelV1.User; + +namespace CatCore.Services.Twitch +{ + internal sealed class TwitchEventSubChatService : ITwitchIrcService, ITwitchPubSubServiceManager + { + private const string TWITCH_EVENTSUB_ENDPOINT = "wss://eventsub.wss.twitch.tv/ws"; + + // EventSub subscription types for chat + private const string SUB_TYPE_CHANNEL_CHAT_MESSAGE = "channel.chat.message"; + private const string SUB_TYPE_CHANNEL_CHAT_NOTIFICATION = "channel.chat.notification"; + private const string SUB_TYPE_CHANNEL_CHAT_MESSAGE_DELETE = "channel.chat.message_delete"; + private const string SUB_TYPE_CHANNEL_CHAT_CLEAR = "channel.chat.clear"; + private const string SUB_TYPE_CHANNEL_CHAT_SETTINGS_UPDATE = "channel.chat_settings.update"; + private const string SUB_TYPE_STREAM_ONLINE = "stream.online"; + private const string SUB_TYPE_STREAM_OFFLINE = "stream.offline"; + private const string SUB_TYPE_CHANNEL_AD_BREAK_BEGIN = "channel.ad_break.begin"; + private const string SUB_TYPE_CHANNEL_FOLLOW = "channel.follow"; + private const string SUB_TYPE_CHANNEL_POLL_BEGIN = "channel.poll.begin"; + private const string SUB_TYPE_CHANNEL_POLL_PROGRESS = "channel.poll.progress"; + private const string SUB_TYPE_CHANNEL_POLL_END = "channel.poll.end"; + private const string SUB_TYPE_CHANNEL_PREDICTION_BEGIN = "channel.prediction.begin"; + private const string SUB_TYPE_CHANNEL_PREDICTION_PROGRESS = "channel.prediction.progress"; + private const string SUB_TYPE_CHANNEL_PREDICTION_LOCK = "channel.prediction.lock"; + private const string SUB_TYPE_CHANNEL_PREDICTION_END = "channel.prediction.end"; + private const string SUB_TYPE_CHANNEL_REWARD_REDEEM = "channel.channel_points_custom_reward_redemption.add"; + private const string EVENTSUB_VERSION = "1"; + private const string EVENTSUB_VERSION_FOLLOW = "2"; + private static readonly TimeSpan VIEW_COUNT_POLL_INTERVAL = TimeSpan.FromSeconds(30); + private const int HELIX_USER_IDS_PER_REQUEST_LIMIT = 100; + + private readonly ILogger _logger; + private readonly IKittenWebSocketProvider _kittenWebSocketProvider; + private readonly IKittenPlatformActiveStateManager _activeStateManager; + private readonly ITwitchAuthService _twitchAuthService; + private readonly ITwitchChannelManagementService _twitchChannelManagementService; + private readonly ITwitchRoomStateTrackerService _roomStateTrackerService; + private readonly ITwitchUserStateTrackerService _userStateTrackerService; + private readonly TwitchEmoteDetectionHelper _twitchEmoteDetectionHelper; + private readonly TwitchMediaDataProvider _twitchMediaDataProvider; + private readonly TwitchHelixApiService _twitchHelixApiService; + + private string? _sessionId; + private string? _reconnectUrl; + private ValidationResponse? _loggedInUser; + + // channelId → subscriptionIds[] + private readonly ConcurrentDictionary> _channelSubscriptionIds; + private readonly ConcurrentDictionary, bool> _viewCountCallbackRegistrations = new(); + private readonly ConcurrentDictionary _profileImageUrlCache = new(StringComparer.OrdinalIgnoreCase); + + private readonly SemaphoreSlim _connectionLockerSemaphoreSlim = new(1, 1); + private readonly object _viewCountPollingStateLock = new(); + private CancellationTokenSource? _viewCountPollingCancellationTokenSource; + private Task? _viewCountPollingTask; + + public TwitchEventSubChatService(ILogger logger, IKittenWebSocketProvider kittenWebSocketProvider, IKittenPlatformActiveStateManager activeStateManager, + ITwitchAuthService twitchAuthService, ITwitchChannelManagementService twitchChannelManagementService, ITwitchRoomStateTrackerService roomStateTrackerService, + ITwitchUserStateTrackerService userStateTrackerService, TwitchEmoteDetectionHelper twitchEmoteDetectionHelper, TwitchMediaDataProvider twitchMediaDataProvider, + TwitchHelixApiService twitchHelixApiService) + { + _logger = logger; + _kittenWebSocketProvider = kittenWebSocketProvider; + _activeStateManager = activeStateManager; + _twitchAuthService = twitchAuthService; + _twitchChannelManagementService = twitchChannelManagementService; + _roomStateTrackerService = roomStateTrackerService; + _userStateTrackerService = userStateTrackerService; + _twitchEmoteDetectionHelper = twitchEmoteDetectionHelper; + _twitchMediaDataProvider = twitchMediaDataProvider; + _twitchHelixApiService = twitchHelixApiService; + + _twitchAuthService.OnCredentialsChanged += TwitchAuthServiceOnOnCredentialsChanged; + _twitchChannelManagementService.ChannelsUpdated += TwitchChannelManagementServiceOnChannelsUpdated; + + _channelSubscriptionIds = new ConcurrentDictionary>(); + } + + public event Action? OnChatConnected; + public event Action? OnJoinChannel; + public event Action? OnLeaveChannel; + public event Action? OnRoomStateChanged; + public event Action? OnMessageReceived; + public event Action? OnMessageDeleted; + public event Action? OnChatCleared; + + event Action ITwitchPubSubServiceManager.OnViewCountUpdated + { + add + { + if (_viewCountCallbackRegistrations.TryAdd(value, false)) + { + TryStartViewCountPollingIfNeeded(); + } + else + { + _logger.Warning("Callback was already registered for EventHandler {Name}", nameof(ITwitchPubSubServiceManager.OnViewCountUpdated)); + } + } + remove + { + if (_viewCountCallbackRegistrations.TryRemove(value, out _) && _viewCountCallbackRegistrations.IsEmpty) + { + _ = StopViewCountPollingIfRunning(); + } + } + } + + public event Action? OnStreamUp; + public event Action? OnStreamDown; + public event Action? OnCommercial; + public event Action? OnFollow; + public event Action? OnPoll; + public event Action? OnPrediction; + public event Action? OnRewardRedeemed; + + public void SendMessage(TwitchChannel channel, string message) + { + if (_loggedInUser == null) + { + _logger.Warning("Cannot send message: not logged in"); + return; + } + + _ = Task.Run(async () => + { + var senderUserId = _loggedInUser?.UserId ?? string.Empty; + if (senderUserId.Length == 0) + { + _logger.Warning("Cannot send message: user id is unavailable"); + return; + } + + var success = await _twitchHelixApiService.SendChatMessage(channel.Id, senderUserId, message).ConfigureAwait(false); + if (!success) + { + _logger.Warning("Failed to send message to channel {ChannelId}", channel.Id); + } + }); + } + + Task ITwitchIrcService.Start() + { + _logger.Verbose("Start requested by service manager"); + return StartInternal(); + } + + Task ITwitchIrcService.Stop() + { + _logger.Verbose("Stop requested by service manager"); + return StopInternal(); + } + + Task ITwitchPubSubServiceManager.Start() + { + return StartInternal(); + } + + Task ITwitchPubSubServiceManager.Stop() + { + return StopInternal(); + } + + private async Task StartInternal() + { + using var _ = await Synchronization.LockAsync(_connectionLockerSemaphoreSlim); + if (!_twitchAuthService.HasTokens) + { + return; + } + + _loggedInUser = await _twitchAuthService.FetchLoggedInUserInfoWithRefresh().ConfigureAwait(false); + if (_loggedInUser == null) + { + return; + } + + _kittenWebSocketProvider.ConnectHappened -= ConnectHappenedHandler; + _kittenWebSocketProvider.ConnectHappened += ConnectHappenedHandler; + + _kittenWebSocketProvider.DisconnectHappened -= DisconnectHappenedHandler; + _kittenWebSocketProvider.DisconnectHappened += DisconnectHappenedHandler; + + _kittenWebSocketProvider.MessageReceived -= MessageReceivedHandler; + _kittenWebSocketProvider.MessageReceived += MessageReceivedHandler; + + var connectUrl = _reconnectUrl ?? TWITCH_EVENTSUB_ENDPOINT; + await _kittenWebSocketProvider.Connect(connectUrl).ConfigureAwait(false); + } + + private async Task StopInternal() + { + await StopViewCountPollingIfRunning().ConfigureAwait(false); + + // Unsubscribe from all channels + var channelIds = _channelSubscriptionIds.Keys.ToList(); + foreach (var channelId in channelIds) + { + await UnsubscribeFromChannelInternal(channelId).ConfigureAwait(false); + } + + await _kittenWebSocketProvider.Disconnect().ConfigureAwait(false); + + _kittenWebSocketProvider.ConnectHappened -= ConnectHappenedHandler; + _kittenWebSocketProvider.DisconnectHappened -= DisconnectHappenedHandler; + _kittenWebSocketProvider.MessageReceived -= MessageReceivedHandler; + + _loggedInUser = null; + _sessionId = null; + _reconnectUrl = null; + } + + private async void TwitchAuthServiceOnOnCredentialsChanged() + { + if (_twitchAuthService.HasTokens) + { + if (_activeStateManager.GetState(PlatformType.Twitch)) + { + _logger.Verbose("(Re)start requested by credential changes"); + await StartInternal().ConfigureAwait(false); + } + } + else + { + await StopInternal().ConfigureAwait(false); + } + } + + private void TwitchChannelManagementServiceOnChannelsUpdated(object sender, TwitchChannelsUpdatedEventArgs e) + { + if (_activeStateManager.GetState(PlatformType.Twitch)) + { + foreach (var disabledChannel in e.DisabledChannels) + { + _ = Task.Run(() => UnsubscribeFromChannelInternal(disabledChannel.Key)); + } + + foreach (var enabledChannel in e.EnabledChannels) + { + _ = Task.Run(() => SubscribeToChannelInternal(enabledChannel.Key, enabledChannel.Value)); + } + } + } + + private Task ConnectHappenedHandler(WebSocketConnection webSocketConnection) + { + _logger.Verbose("EventSub WebSocket connect handler triggered"); + return Task.CompletedTask; + } + + private Task DisconnectHappenedHandler() + { + return Task.CompletedTask; + } + + private Task MessageReceivedHandler(WebSocketConnection webSocketConnection, string message) + { + MessageReceivedHandlerInternal(webSocketConnection, message); + return Task.CompletedTask; + } + + private void MessageReceivedHandlerInternal(WebSocketConnection webSocketConnection, string rawMessage) + { +#if DEBUG + _logger.Verbose("EventSub message received: {Message}", rawMessage); +#endif + + try + { + // Parse the top-level message to determine type + using var jsonDocument = JsonDocument.Parse(rawMessage); + var root = jsonDocument.RootElement; + + if (!root.TryGetProperty("metadata", out var metadataElement)) + { + _logger.Warning("Received EventSub message without metadata"); + return; + } + + var metadata = JsonSerializer.Deserialize(metadataElement, TwitchEventSubSerializerContext.Default.EventSubMetadata); + + HandleEventSubMessage(metadata, rawMessage); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to parse EventSub message"); + } + } + + private void HandleEventSubMessage(EventSubMetadata metadata, string rawMessage) + { + switch (metadata.MessageType) + { + case "session_welcome": + HandleSessionWelcome(rawMessage); + break; + case "session_keepalive": + // NOP + break; + case "session_reconnect": + HandleSessionReconnect(rawMessage); + break; + case "notification": + HandleNotification(metadata, rawMessage); + break; + case "revocation": + HandleRevocation(rawMessage); + break; + default: + _logger.Verbose("Received unknown EventSub message type: {MessageType}", metadata.MessageType); + break; + } + } + + private void HandleSessionWelcome(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubSessionPayload); + + _sessionId = payload.Session.Id; + _reconnectUrl = null; + + _logger.Information("EventSub session established. Session ID: {SessionId}", _sessionId); + + OnChatConnected?.Invoke(); + TryStartViewCountPollingIfNeeded(); + + // Subscribe to all active channels + _ = Task.Run(() => SubscribeToAllChannelsInternal()); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle session_welcome"); + } + } + + private void HandleSessionReconnect(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubSessionPayload); + + _reconnectUrl = payload.Session.ReconnectUrl; + + _logger.Information("EventSub session reconnect requested. New URL: {ReconnectUrl}", _reconnectUrl); + + // Connect to the new URL. KittenWebSocketProvider.Connect already calls Disconnect() first, + // so there is no need to explicitly disconnect the old session after StartInternal returns. + _ = Task.Run(async () => + { + try + { + await StartInternal().ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to start new EventSub session during session_reconnect"); + } + }); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle session_reconnect"); + } + } + + private void HandleNotification(EventSubMetadata metadata, string rawMessage) + { + switch (metadata.SubscriptionType) + { + case SUB_TYPE_CHANNEL_CHAT_MESSAGE: + HandleChatMessage(rawMessage); + break; + case SUB_TYPE_CHANNEL_CHAT_NOTIFICATION: + HandleChatNotification(rawMessage); + break; + case SUB_TYPE_CHANNEL_CHAT_MESSAGE_DELETE: + HandleMessageDelete(rawMessage); + break; + case SUB_TYPE_CHANNEL_CHAT_CLEAR: + HandleChatClear(rawMessage); + break; + case SUB_TYPE_CHANNEL_CHAT_SETTINGS_UPDATE: + HandleSettingsUpdate(rawMessage); + break; + case SUB_TYPE_STREAM_ONLINE: + HandleStreamOnline(rawMessage); + break; + case SUB_TYPE_STREAM_OFFLINE: + HandleStreamOffline(rawMessage); + break; + case SUB_TYPE_CHANNEL_AD_BREAK_BEGIN: + HandleCommercial(rawMessage); + break; + case SUB_TYPE_CHANNEL_FOLLOW: + HandleFollow(rawMessage); + break; + case SUB_TYPE_CHANNEL_POLL_BEGIN: + case SUB_TYPE_CHANNEL_POLL_PROGRESS: + case SUB_TYPE_CHANNEL_POLL_END: + HandlePoll(rawMessage); + break; + case SUB_TYPE_CHANNEL_PREDICTION_BEGIN: + case SUB_TYPE_CHANNEL_PREDICTION_PROGRESS: + case SUB_TYPE_CHANNEL_PREDICTION_LOCK: + case SUB_TYPE_CHANNEL_PREDICTION_END: + HandlePrediction(rawMessage); + break; + case SUB_TYPE_CHANNEL_REWARD_REDEEM: + HandleRewardRedeem(rawMessage); + break; + default: + _logger.Verbose("Received unknown subscription type: {SubscriptionType}", metadata.SubscriptionType); + break; + } + } + + private void HandleStreamOnline(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId)) + { + return; + } + + var startedAt = TryGetDateTime(ev, "started_at") ?? DateTimeOffset.UtcNow; + OnStreamUp?.Invoke(channelId, new StreamUp(ToLegacyServerTimeRaw(startedAt), 0)); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle stream.online"); + } + } + + private void HandleStreamOffline(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId)) + { + return; + } + + OnStreamDown?.Invoke(channelId, new StreamDown(ToLegacyServerTimeRaw(DateTimeOffset.UtcNow))); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle stream.offline"); + } + } + + private void HandleCommercial(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId)) + { + return; + } + + var startedAt = TryGetDateTime(ev, "started_at") ?? DateTimeOffset.UtcNow; + OnCommercial?.Invoke(channelId, new Commercial(ToLegacyServerTimeRaw(startedAt), TryGetUInt(ev, "duration_seconds"))); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.ad_break.begin"); + } + } + + private void HandleFollow(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId) || + !TryGetString(ev, "user_id", out var userId) || + !TryGetString(ev, "user_login", out var userLogin) || + !TryGetString(ev, "user_name", out var userName)) + { + return; + } + + OnFollow?.Invoke(channelId, new Follow(userId, userLogin, userName)); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.follow"); + } + } + + private void HandlePoll(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId)) + { + return; + } + + var choices = new List(); + uint totalVoters = 0; + if (ev.TryGetProperty("choices", out var choicesElement) && choicesElement.ValueKind == JsonValueKind.Array) + { + foreach (var choice in choicesElement.EnumerateArray()) + { + var bitsVotes = TryGetUInt(choice, "bits_votes"); + var channelPointsVotes = TryGetUInt(choice, "channel_points_votes"); + var votes = TryGetUInt(choice, "votes"); + totalVoters += votes; + choices.Add(new PollChoice( + TryGetString(choice, "id", out var choiceId) ? choiceId : string.Empty, + TryGetString(choice, "title", out var choiceTitle) ? choiceTitle : string.Empty, + new Votes(votes, bitsVotes, channelPointsVotes, votes), + new Tokens(bitsVotes, channelPointsVotes), + votes)); + } + } + + var settings = new PollSettings( + new PollSettingsEntry(false, null), + new PollSettingsEntry(false, null), + new PollSettingsEntry(false, null), + new PollSettingsEntry(TryGetBool(ev, "bits_voting_enabled"), TryGetNullableUInt(ev, "bits_per_vote")), + new PollSettingsEntry(TryGetBool(ev, "channel_points_voting_enabled"), TryGetNullableUInt(ev, "channel_points_per_vote"))); + + var pollData = new PollData( + TryGetString(ev, "id", out var pollId) ? pollId : string.Empty, + channelId, + channelId, + TryGetString(ev, "title", out var pollTitle) ? pollTitle : string.Empty, + TryGetString(ev, "started_at", out var startedAtRaw) ? startedAtRaw : string.Empty, + TryGetString(ev, "ended_at", out var endedAtRaw) ? endedAtRaw : string.Empty, + string.Empty, + TryGetUInt(ev, "duration_seconds"), + settings, + ParsePollStatus(TryGetString(ev, "status", out var statusRaw) ? statusRaw : null), + choices, + new Votes(totalVoters, 0, 0, totalVoters), + new Tokens(0, 0), + totalVoters, + 0, + null, + null, + null); + + OnPoll?.Invoke(channelId, pollData); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.poll.*"); + } + } + + private void HandlePrediction(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId)) + { + return; + } + + var outcomes = new List(); + if (ev.TryGetProperty("outcomes", out var outcomesElement) && outcomesElement.ValueKind == JsonValueKind.Array) + { + foreach (var outcome in outcomesElement.EnumerateArray()) + { + var topPredictors = new List(); + if (outcome.TryGetProperty("top_predictors", out var topPredictorsElement) && topPredictorsElement.ValueKind == JsonValueKind.Array) + { + foreach (var predictor in topPredictorsElement.EnumerateArray()) + { + var used = TryGetUInt(predictor, "channel_points_used"); + var won = TryGetUInt(predictor, "channel_points_won"); + topPredictors.Add(new TopPredictor( + Guid.NewGuid().ToString("N"), + TryGetString(ev, "id", out var eventIdRaw) ? eventIdRaw : string.Empty, + TryGetString(outcome, "id", out var outcomeIdRaw) ? outcomeIdRaw : string.Empty, + channelId, + used, + DateTime.UtcNow, + DateTime.UtcNow, + TryGetString(predictor, "user_id", out var predictorId) ? predictorId : string.Empty, + new PredictorResult(won > 0 ? "WIN" : "UNKNOWN", won, true), + TryGetString(predictor, "user_name", out var predictorName) ? predictorName : string.Empty)); + } + } + + outcomes.Add(new Outcome( + TryGetString(outcome, "id", out var outcomeId) ? outcomeId : string.Empty, + TryGetString(outcome, "color", out var color) ? color : string.Empty, + TryGetString(outcome, "title", out var outcomeTitle) ? outcomeTitle : string.Empty, + TryGetUInt(outcome, "channel_points"), + TryGetUInt(outcome, "users"), + topPredictors, + new PredictionBadge(string.Empty, string.Empty))); + } + } + + var createdBy = new PredictionUser("user", channelId, TryGetString(ev, "broadcaster_user_name", out var broadcasterName) ? broadcasterName : string.Empty, null); + var endedAt = TryGetDateTime(ev, "ended_at")?.UtcDateTime; + var lockedAt = TryGetDateTime(ev, "locks_at")?.UtcDateTime; + var predictionData = new PredictionData( + TryGetString(ev, "id", out var predictionId) ? predictionId : string.Empty, + channelId, + TryGetString(ev, "title", out var predictionTitle) ? predictionTitle : string.Empty, + (TryGetDateTime(ev, "created_at") ?? DateTimeOffset.UtcNow).UtcDateTime, + createdBy, + endedAt, + endedAt.HasValue ? createdBy : (PredictionUser?)null, + lockedAt, + lockedAt.HasValue ? createdBy : (PredictionUser?)null, + outcomes, + TryGetUInt(ev, "prediction_window_seconds"), + ParsePredictionStatus(TryGetString(ev, "status", out var statusRaw) ? statusRaw : null), + TryGetString(ev, "winning_outcome_id", out var winningOutcomeId) ? winningOutcomeId : null); + + OnPrediction?.Invoke(channelId, predictionData); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.prediction.*"); + } + } + + private void HandleRewardRedeem(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var ev = jsonDocument.RootElement.GetProperty("payload").GetProperty("event"); + if (!TryGetString(ev, "broadcaster_user_id", out var channelId) || !ev.TryGetProperty("reward", out var rewardElement)) + { + return; + } + + var user = new RewardUser( + TryGetString(ev, "user_id", out var userId) ? userId : string.Empty, + TryGetString(ev, "user_login", out var userLogin) ? userLogin : string.Empty, + TryGetString(ev, "user_name", out var userName) ? userName : string.Empty); + + var reward = new Reward( + TryGetString(rewardElement, "id", out var rewardId) ? rewardId : string.Empty, + channelId, + TryGetString(rewardElement, "title", out var rewardTitle) ? rewardTitle : string.Empty, + TryGetString(rewardElement, "prompt", out var prompt) ? prompt : string.Empty, + (int)TryGetUInt(rewardElement, "cost"), + false, + false, + new object(), + new DefaultImage(string.Empty, string.Empty, string.Empty), + "#000000", + true, + false, + true, + new MaxPerStream(false, 0), + false, + string.Empty, + DateTimeOffset.UtcNow, + new MaxPerUserPerStream(false, 0), + new GlobalCooldown(false, 0), + null); + + var data = new RewardRedeemedData( + TryGetString(ev, "id", out var redemptionId) ? redemptionId : string.Empty, + user, + channelId, + TryGetString(ev, "redeemed_at", out var redeemedAtRaw) ? redeemedAtRaw : DateTimeOffset.UtcNow.ToString("O"), + reward, + TryGetString(ev, "status", out var status) ? status : "fulfilled"); + + OnRewardRedeemed?.Invoke(channelId, data); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.channel_points_custom_reward_redemption.add"); + } + } + + private void HandleChatMessage(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubChatMessagePayload); + + var ev = payload.Event; + var channel = new TwitchChannel(this, ev.BroadcasterUserId, ev.BroadcasterUserLogin); + var chatterColor = NormalizeHexColorOrDefault(ev.Color, "#FFFFFF"); + + // Build badges + var badgeEntries = new List(); + foreach (var badge in ev.Badges ?? new List()) + { + var badgeIdentifier = $"{badge.SetId}/{badge.Id}"; + if (_twitchMediaDataProvider.TryGetBadge(badgeIdentifier, ev.BroadcasterUserId, out var badgeObj)) + { + badgeEntries.Add(badgeObj!); + } + } + + // Build user + var user = new TwitchUser( + ev.ChatterUserId, + ev.ChatterUserLogin, + ev.ChatterUserName, + chatterColor, + ev.Badges?.Any(b => b.SetId == "moderator") ?? false, + ev.Badges?.Any(b => b.SetId == "broadcaster") ?? false, + ev.Badges?.Any(b => b.SetId == "subscriber" || b.SetId == "founder") ?? false, + ev.Badges?.Any(b => b.SetId == "turbo") ?? false, + ev.Badges?.Any(b => b.SetId == "vip") ?? false, + badgeEntries.AsReadOnly() + ); + + // Extract emotes + var emotes = _twitchEmoteDetectionHelper.ExtractEmoteInfoFromFragments( + ev.Message.Text, + ev.Message.Fragments, + ev.BroadcasterUserId, + (uint)(ev.Cheer?.Bits ?? 0) + ); + + // Determine if the logged-in user is mentioned + var isMentioned = false; + if (_loggedInUser != null && !string.IsNullOrEmpty(ev.Message.Text)) + { + var mention = "@" + _loggedInUser.Value.LoginName; + isMentioned = ev.Message.Text.IndexOf(mention, StringComparison.OrdinalIgnoreCase) >= 0; + } + + // Build TwitchMessage + var twitchMessage = new TwitchMessage( + ev.MessageId, + false, + ev.MessageType == "ACTION", + isMentioned, + ev.Message.Text, + user, + channel, + emotes.AsReadOnly(), + null, // Metadata (not needed for EventSub) + "PRIVMSG", // IRC type equivalent + (uint)(ev.Cheer?.Bits ?? 0) + ); + + OnMessageReceived?.Invoke(twitchMessage); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.chat.message"); + } + } + + private void HandleChatNotification(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubChatNotificationPayload); + + var ev = payload.Event; + var channel = new TwitchChannel(this, ev.BroadcasterUserId, ev.BroadcasterUserLogin); + var userMessageText = ev.Message.Text ?? string.Empty; + var notificationText = ResolveNotificationText(ev); + var notificationMetadata = BuildLegacyUserNoticeMetadata(ev, notificationText); + if (string.IsNullOrWhiteSpace(notificationText)) + { + _logger.Warning("EventSub notification text resolved empty for notice type {NoticeType}", ev.NoticeType); + } + + // Build user + var user = new TwitchUser( + ev.ChatterUserId, + ev.ChatterUserLogin, + ev.ChatterUserName, + "#ffffff", + false, + false, + false, + false, + false, + new ReadOnlyCollection(new List()) + ); + + // Extract emotes + var emotes = _twitchEmoteDetectionHelper.ExtractEmoteInfoFromFragments( + userMessageText, + ev.Message.Fragments, + ev.BroadcasterUserId, + 0 + ); + + var twitchMessage = new TwitchMessage( + Guid.NewGuid().ToString(), + true, // IsSystemMessage + false, + false, + userMessageText, + user, + channel, + emotes.AsReadOnly(), + notificationMetadata, + "USERNOTICE", + 0 + ); + + OnMessageReceived?.Invoke(twitchMessage); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.chat.notification"); + } + } + + private ReadOnlyDictionary BuildLegacyUserNoticeMetadata(EventSubChatNotificationEvent ev, string notificationText) + { + var metadataLogin = ev.ChatterUserLogin; + var metadataDisplayName = ev.ChatterUserName; + var profileLookupUserId = ev.ChatterUserId; + + if (ev.Raid.HasValue) + { + if (!string.IsNullOrWhiteSpace(ev.Raid.Value.UserLogin)) + { + metadataLogin = ev.Raid.Value.UserLogin; + } + + if (!string.IsNullOrWhiteSpace(ev.Raid.Value.UserName)) + { + metadataDisplayName = ev.Raid.Value.UserName; + } + + if (!string.IsNullOrWhiteSpace(ev.Raid.Value.UserId)) + { + profileLookupUserId = ev.Raid.Value.UserId; + } + } + + var metadata = new Dictionary(StringComparer.Ordinal) + { + [IrcMessageTags.MSG_ID] = string.IsNullOrWhiteSpace(ev.NoticeType) ? "usernotice" : ev.NoticeType, + [IrcMessageTags.SYSTEM_MSG] = EscapeLegacyIrcSystemMessage(notificationText) + }; + + if (!string.IsNullOrWhiteSpace(metadataLogin)) + { + metadata[IrcMessageTags.MSG_PARAM_LOGIN] = metadataLogin; + } + + if (!string.IsNullOrWhiteSpace(metadataDisplayName)) + { + metadata[IrcMessageTags.MSG_PARAM_DISPLAY_NAME] = metadataDisplayName; + } + + var profileImageUrl = TryResolveProfileImageUrl(metadataLogin, profileLookupUserId); + if (profileImageUrl is { Length: > 0 }) + { + metadata[IrcMessageTags.MSG_PARAM_PROFILE_IMAGE_URL] = profileImageUrl; + } + + if (ev.Raid.HasValue) + { + metadata[IrcMessageTags.MSG_PARAM_VIEWER_COUNT] = ev.Raid.Value.ViewerCount.ToString(); + } + + if (ev.Sub.HasValue) + { + metadata[IrcMessageTags.MSG_PARAM_SUB_PLAN] = ev.Sub.Value.SubTier; + } + + if (ev.Resub.HasValue) + { + metadata[IrcMessageTags.MSG_PARAM_SUB_PLAN] = ev.Resub.Value.SubTier; + metadata[IrcMessageTags.MSG_PARAM_CUMULATIVE_MONTHS] = ev.Resub.Value.CumulativeMonths.ToString(); + metadata[IrcMessageTags.MSG_PARAM_MONTHS] = ev.Resub.Value.DurationMonths.ToString(); + } + + if (ev.GiftSub.HasValue) + { + if (!string.IsNullOrWhiteSpace(ev.GiftSub.Value.RecipientUserName)) + { + metadata[IrcMessageTags.MSG_PARAM_RECIPIENT_USER_NAME] = ev.GiftSub.Value.RecipientUserName; + metadata[IrcMessageTags.MSG_PARAM_RECIPIENT_DISPLAY_NAME] = ev.GiftSub.Value.RecipientUserName; + } + + metadata[IrcMessageTags.MSG_PARAM_SUB_PLAN] = ev.GiftSub.Value.SubTier; + } + + return new ReadOnlyDictionary(metadata); + } + + private string? TryResolveProfileImageUrl(string? loginName, string? userId) + { + if (!string.IsNullOrWhiteSpace(userId) && _profileImageUrlCache.TryGetValue($"id:{userId}", out var cachedById)) + { + return cachedById; + } + + if (!string.IsNullOrWhiteSpace(loginName) && _profileImageUrlCache.TryGetValue($"login:{loginName}", out var cachedByLogin)) + { + return cachedByLogin; + } + + // Perform Helix lookup in a background task to avoid blocking the calling thread. + if (!string.IsNullOrWhiteSpace(userId) || !string.IsNullOrWhiteSpace(loginName)) + { + _ = Task.Run(async () => + { + try + { + ResponseBase? userInfo = null; + + if (!string.IsNullOrWhiteSpace(userId)) + { + userInfo = await _twitchHelixApiService.FetchUserInfo(userIds: new[] { userId! }).ConfigureAwait(false); + } + + if ((userInfo?.Data?.Count ?? 0) == 0 && !string.IsNullOrWhiteSpace(loginName)) + { + userInfo = await _twitchHelixApiService.FetchUserInfo(loginNames: new[] { loginName! }).ConfigureAwait(false); + } + + var user = userInfo?.Data?.FirstOrDefault(); + if (!user.HasValue || string.IsNullOrWhiteSpace(user.Value.ProfileImageUrl)) + { + return; + } + + var userValue = user.Value; + var profileImageUrl = userValue.ProfileImageUrl; + if (!string.IsNullOrWhiteSpace(userValue.UserId)) + { + _profileImageUrlCache[$"id:{userValue.UserId}"] = profileImageUrl; + } + + if (!string.IsNullOrWhiteSpace(userValue.LoginName)) + { + _profileImageUrlCache[$"login:{userValue.LoginName}"] = profileImageUrl; + } + } + catch (Exception ex) + { + _logger.Verbose(ex, "Failed to resolve profile image URL for login={LoginName}, userId={UserId}", loginName, userId); + } + }); + } + + // Cache miss: return null immediately; cache will be populated by the background task above. + return null; + } + + private static string EscapeLegacyIrcSystemMessage(string value) + { + if (string.IsNullOrEmpty(value)) + { + return string.Empty; + } + + return value.Replace(" ", "\\s"); + } + + private static string ResolveNotificationText(EventSubChatNotificationEvent ev) + { + if (!string.IsNullOrWhiteSpace(ev.Message.Text)) + { + return ev.Message.Text; + } + + if (!string.IsNullOrWhiteSpace(ev.SystemMessage)) + { + return ev.SystemMessage ?? string.Empty; + } + + return ev.NoticeType switch + { + "raid" => BuildRaidNotificationText(ev), + "sub" => $"{GetPreferredDisplayName(ev)} subscribed.", + "resub" => BuildResubNotificationText(ev), + "gift_sub" => BuildGiftSubNotificationText(ev), + "pay_it_forward" => $"{GetPreferredDisplayName(ev)} paid a gift forward.", + "bits_badge_tier" => BuildBitsBadgeTierNotificationText(ev), + "announcement" => GetPreferredDisplayName(ev), + "unraid" => $"{GetPreferredDisplayName(ev)} canceled the raid.", + _ => string.Empty + }; + } + + private static string BuildRaidNotificationText(EventSubChatNotificationEvent ev) + { + var raiderName = ev.Raid?.UserName; + if (string.IsNullOrWhiteSpace(raiderName)) + { + raiderName = GetPreferredDisplayName(ev); + } + + var viewerCount = ev.Raid?.ViewerCount ?? 0; + var viewerLabel = viewerCount == 1 ? "viewer" : "viewers"; + return viewerCount > 0 + ? $"{raiderName} is raiding with a party of {viewerCount} {viewerLabel}." + : $"{raiderName} is raiding."; + } + + private static string BuildResubNotificationText(EventSubChatNotificationEvent ev) + { + var userName = GetPreferredDisplayName(ev); + var cumulativeMonths = ev.Resub?.CumulativeMonths ?? 0; + return cumulativeMonths > 0 + ? $"{userName} subscribed for {cumulativeMonths} months." + : $"{userName} resubscribed."; + } + + private static string BuildGiftSubNotificationText(EventSubChatNotificationEvent ev) + { + var gifterName = GetPreferredDisplayName(ev); + var recipientName = ev.GiftSub?.RecipientUserName; + return !string.IsNullOrWhiteSpace(recipientName) + ? $"{gifterName} gifted a sub to {recipientName}." + : $"{gifterName} gifted a subscription."; + } + + private static string BuildBitsBadgeTierNotificationText(EventSubChatNotificationEvent ev) + { + var userName = GetPreferredDisplayName(ev); + var tier = ev.BitsBadgeTier?.Tier ?? 0; + return tier > 0 + ? $"{userName} reached Bits badge tier {tier}." + : $"{userName} reached a new Bits badge tier."; + } + + private static string GetPreferredDisplayName(EventSubChatNotificationEvent ev) + { + if (!string.IsNullOrWhiteSpace(ev.ChatterUserName)) + { + return ev.ChatterUserName; + } + + if (!string.IsNullOrWhiteSpace(ev.ChatterUserLogin)) + { + return ev.ChatterUserLogin; + } + + if (!string.IsNullOrWhiteSpace(ev.Raid?.UserName)) + { + return ev.Raid?.UserName ?? string.Empty; + } + + return string.Empty; + } + + private void HandleMessageDelete(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubChatMessageDeletePayload); + + var ev = payload.Event; + var channel = new TwitchChannel(this, ev.BroadcasterUserId, ev.BroadcasterUserLogin); + + OnMessageDeleted?.Invoke(channel, ev.MessageId); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.chat.message_delete"); + } + } + + private void HandleChatClear(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubChatClearPayload); + + var ev = payload.Event; + var channel = new TwitchChannel(this, ev.BroadcasterUserId, ev.BroadcasterUserLogin); + + OnChatCleared?.Invoke(channel, null); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.chat.clear"); + } + } + + private void HandleSettingsUpdate(string rawMessage) + { + try + { + using var jsonDocument = JsonDocument.Parse(rawMessage); + var payload = JsonSerializer.Deserialize( + jsonDocument.RootElement.GetProperty("payload"), + TwitchEventSubSerializerContext.Default.EventSubChatSettingsPayload); + + var ev = payload.Event; + + // Build IRC-equivalent tags dictionary + var tags = new Dictionary(); + // Populate tags for both enabled and disabled states so room state transitions correctly. + tags[IrcMessageTags.EMOTE_ONLY] = ev.EmoteMode ? "1" : "0"; + tags[IrcMessageTags.FOLLOWERS_ONLY] = ev.FollowerMode + ? ev.FollowerModeDurationMinutes.ToString() + : "-1"; + tags[IrcMessageTags.SUBS_ONLY] = ev.SubscriberMode ? "1" : "0"; + tags[IrcMessageTags.R9_K] = ev.UniqueChatMode ? "1" : "0"; + tags[IrcMessageTags.SLOW] = ev.SlowMode + ? ev.SlowModeWaitSeconds.ToString() + : "0"; + + var roomStateDict = new ReadOnlyDictionary(tags); + _roomStateTrackerService.UpdateRoomState(ev.BroadcasterUserLogin, roomStateDict); + + var channel = new TwitchChannel(this, ev.BroadcasterUserId, ev.BroadcasterUserLogin); + OnRoomStateChanged?.Invoke(channel); + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to handle channel.chat.settings.update"); + } + } + + private void HandleRevocation(string rawMessage) + { + _logger.Warning("Received revocation message: {Message}", rawMessage); + } + + private async Task SubscribeToAllChannelsInternal() + { + if (_sessionId == null) + { + return; + } + + var activeChannels = _twitchChannelManagementService.GetAllActiveChannelsAsDictionary(); + var channelIds = _twitchChannelManagementService.GetAllActiveChannelIds(); + foreach (var channelId in channelIds) + { + if (!activeChannels.TryGetValue(channelId, out var channelName)) + { + _logger.Warning("Active channel dictionary missing entry for channel ID {ChannelId} while subscribing to all channels.", channelId); + continue; + } + + await SubscribeToChannelInternal(channelId, channelName).ConfigureAwait(false); + } + } + + private async Task SubscribeToChannelInternal(string channelId, string channelName) + { + if (_sessionId == null) + { + _logger.Verbose("Cannot subscribe: session not established"); + return; + } + + if (_loggedInUser == null) + { + _logger.Warning("Cannot subscribe to channel {ChannelId}: logged in user is unavailable", channelId); + return; + } + + var loggedInUserId = _loggedInUser.Value.UserId; + var subscriptionRequests = new List<(string type, string version, Dictionary condition)> + { + (SUB_TYPE_CHANNEL_CHAT_MESSAGE, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId }, { "user_id", loggedInUserId } }), + (SUB_TYPE_CHANNEL_CHAT_NOTIFICATION, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId }, { "user_id", loggedInUserId } }), + (SUB_TYPE_CHANNEL_CHAT_MESSAGE_DELETE, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId }, { "user_id", loggedInUserId } }), + (SUB_TYPE_CHANNEL_CHAT_CLEAR, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId }, { "user_id", loggedInUserId } }), + (SUB_TYPE_CHANNEL_CHAT_SETTINGS_UPDATE, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId }, { "user_id", loggedInUserId } }), + (SUB_TYPE_STREAM_ONLINE, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_STREAM_OFFLINE, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_AD_BREAK_BEGIN, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_FOLLOW, EVENTSUB_VERSION_FOLLOW, new Dictionary { { "broadcaster_user_id", channelId }, { "moderator_user_id", loggedInUserId } }), + (SUB_TYPE_CHANNEL_POLL_BEGIN, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_POLL_PROGRESS, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_POLL_END, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_PREDICTION_BEGIN, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_PREDICTION_PROGRESS, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_PREDICTION_LOCK, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_PREDICTION_END, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }), + (SUB_TYPE_CHANNEL_REWARD_REDEEM, EVENTSUB_VERSION, new Dictionary { { "broadcaster_user_id", channelId } }) + }; + + var subscriptionIds = new List(); + var failedTypes = new List(); + foreach (var request in subscriptionRequests) + { + var subscriptionId = await _twitchHelixApiService.CreateEventSubSubscription(request.type, request.version, request.condition, _sessionId).ConfigureAwait(false); + if (subscriptionId != null) + { + subscriptionIds.Add(subscriptionId); + } + else + { + _logger.Warning("Failed to create EventSub subscription type {SubscriptionType} for channel {ChannelId}", request.type, channelId); + failedTypes.Add(request.type); + } + } + + if (subscriptionIds.Count > 0) + { + // Replace existing subscriptions with the successfully created set. + // Some EventSub types may fail due to missing optional scopes, but chat should remain functional. + if (_channelSubscriptionIds.TryRemove(channelId, out var existingSubscriptionIds)) + { + foreach (var existingId in existingSubscriptionIds) + { + await _twitchHelixApiService.DeleteEventSubSubscription(existingId).ConfigureAwait(false); + } + } + + _channelSubscriptionIds[channelId] = subscriptionIds; + if (failedTypes.Count > 0) + { + _logger.Warning("EventSub subscriptions partially created for channel {ChannelId}. Created={CreatedCount}, FailedTypes={FailedTypes}", + channelId, subscriptionIds.Count, string.Join(",", failedTypes)); + } + OnJoinChannel?.Invoke(new TwitchChannel(this, channelId, channelName)); + } + else if (subscriptionIds.Count == 0) + { + _logger.Warning("Failed to create EventSub subscriptions for channel {ChannelId}. AttemptedTypes={AttemptedTypes}", channelId, string.Join(",", subscriptionRequests.Select(x => x.type))); + } + } + + private static string ToLegacyServerTimeRaw(DateTimeOffset value) + { + var unixMicros = value.ToUnixTimeMilliseconds() * 1000L; + return $"{unixMicros / 1000000}.{unixMicros % 1000000:D6}"; + } + + private static string NormalizeHexColorOrDefault(string? rawColor, string fallback) + { + if (string.IsNullOrWhiteSpace(rawColor)) + { + return fallback; + } + + var color = rawColor!.Trim(); + if (color.Length == 7 && color[0] == '#') + { + return color; + } + + if (color.Length == 6) + { + return "#" + color; + } + + return fallback; + } + + private static bool TryGetString(JsonElement element, string propertyName, out string value) + { + if (element.TryGetProperty(propertyName, out var property) && property.ValueKind == JsonValueKind.String) + { + value = property.GetString()!; + return true; + } + + value = string.Empty; + return false; + } + + private static bool TryGetBool(JsonElement element, string propertyName) + { + return element.TryGetProperty(propertyName, out var property) && property.ValueKind == JsonValueKind.True; + } + + private static uint TryGetUInt(JsonElement element, string propertyName) + { + if (!element.TryGetProperty(propertyName, out var property)) + { + return 0; + } + + if (property.ValueKind == JsonValueKind.Number && property.TryGetUInt32(out var number)) + { + return number; + } + + return 0; + } + + private static uint? TryGetNullableUInt(JsonElement element, string propertyName) + { + if (!element.TryGetProperty(propertyName, out var property) || property.ValueKind == JsonValueKind.Null) + { + return null; + } + + if (property.ValueKind == JsonValueKind.Number && property.TryGetUInt32(out var number)) + { + return number; + } + + return null; + } + + private static DateTimeOffset? TryGetDateTime(JsonElement element, string propertyName) + { + if (!element.TryGetProperty(propertyName, out var property) || property.ValueKind != JsonValueKind.String) + { + return null; + } + + var raw = property.GetString(); + return DateTimeOffset.TryParse(raw, out var parsed) ? parsed : null; + } + + private static PollStatus ParsePollStatus(string? status) + { + return status?.ToLowerInvariant() switch + { + "active" => PollStatus.Active, + "completed" => PollStatus.Completed, + "terminated" => PollStatus.Terminated, + "archived" => PollStatus.Archived, + "moderated" => PollStatus.Moderated, + _ => PollStatus.Invalid + }; + } + + private static PredictionStatus ParsePredictionStatus(string? status) + { + return status?.ToLowerInvariant() switch + { + "active" => PredictionStatus.Active, + "resolved" => PredictionStatus.Resolved, + "canceled" => PredictionStatus.Cancelled, + "cancelled" => PredictionStatus.Cancelled, + "locked" => PredictionStatus.Locked, + _ => PredictionStatus.Active + }; + } + + private void TryStartViewCountPollingIfNeeded() + { + lock (_viewCountPollingStateLock) + { + if (_viewCountPollingCancellationTokenSource != null || _viewCountCallbackRegistrations.IsEmpty) + { + return; + } + + if (_loggedInUser == null || !_activeStateManager.GetState(PlatformType.Twitch)) + { + return; + } + + _viewCountPollingCancellationTokenSource = new CancellationTokenSource(); + _viewCountPollingTask = Task.Run(() => RunViewCountPollingLoop(_viewCountPollingCancellationTokenSource.Token)); + } + } + + private async Task StopViewCountPollingIfRunning() + { + CancellationTokenSource? cancellationTokenSource; + Task? pollingTask; + + lock (_viewCountPollingStateLock) + { + cancellationTokenSource = _viewCountPollingCancellationTokenSource; + pollingTask = _viewCountPollingTask; + _viewCountPollingCancellationTokenSource = null; + _viewCountPollingTask = null; + } + + if (cancellationTokenSource == null) + { + return; + } + + cancellationTokenSource.Cancel(); + try + { + if (pollingTask != null) + { + await pollingTask.ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + // Expected while stopping + } + finally + { + cancellationTokenSource.Dispose(); + } + } + + private async Task RunViewCountPollingLoop(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await PollViewCountsOnce(cancellationToken).ConfigureAwait(false); + await Task.Delay(VIEW_COUNT_POLL_INTERVAL, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to poll Twitch viewer counts"); + await Task.Delay(VIEW_COUNT_POLL_INTERVAL, cancellationToken).ConfigureAwait(false); + } + } + } + + private async Task PollViewCountsOnce(CancellationToken cancellationToken) + { + if (_viewCountCallbackRegistrations.IsEmpty) + { + return; + } + + var channelIds = _channelSubscriptionIds.Keys.ToArray(); + if (channelIds.Length == 0) + { + return; + } + + var serverTimeRaw = ToLegacyServerTimeRaw(DateTimeOffset.UtcNow); + for (var offset = 0; offset < channelIds.Length; offset += HELIX_USER_IDS_PER_REQUEST_LIMIT) + { + cancellationToken.ThrowIfCancellationRequested(); + + var chunkSize = Math.Min(HELIX_USER_IDS_PER_REQUEST_LIMIT, channelIds.Length - offset); + var userIds = new string[chunkSize]; + Array.Copy(channelIds, offset, userIds, 0, chunkSize); + + var streamResponse = await _twitchHelixApiService.GetStreams(userIds: userIds, cancellationToken: cancellationToken).ConfigureAwait(false); + if (streamResponse == null) + { + continue; + } + + foreach (var stream in streamResponse.Value.Data) + { + var viewCountUpdate = new ViewCountUpdate(serverTimeRaw, stream.ViewerCount); + foreach (var callback in _viewCountCallbackRegistrations.Keys) + { + callback(stream.UserId, viewCountUpdate); + } + } + } + } + + private async Task UnsubscribeFromChannelInternal(string channelId) + { + if (!_channelSubscriptionIds.TryRemove(channelId, out var subscriptionIds)) + { + return; + } + + foreach (var subscriptionId in subscriptionIds) + { + await _twitchHelixApiService.DeleteEventSubSubscription(subscriptionId).ConfigureAwait(false); + } + + // Get the channel name for the event + var channelDictionary = _twitchChannelManagementService.GetAllActiveChannelsAsDictionary(includeSelfRegardlessOfState: true); + if (channelDictionary.TryGetValue(channelId, out var channelName)) + { + _roomStateTrackerService.UpdateRoomState(channelName, null); + _userStateTrackerService.UpdateUserState(channelId, null); + + OnLeaveChannel?.Invoke(new TwitchChannel(this, channelId, channelName)); + } + } + } +} diff --git a/CatCore/Services/Twitch/TwitchHelixApiService.EventSub.cs b/CatCore/Services/Twitch/TwitchHelixApiService.EventSub.cs new file mode 100644 index 00000000..0ded9307 --- /dev/null +++ b/CatCore/Services/Twitch/TwitchHelixApiService.EventSub.cs @@ -0,0 +1,211 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Http; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using CatCore.Exceptions; +using CatCore.Helpers.JSON; +using CatCore.Models.Twitch.Helix.Requests; +using CatCore.Models.Twitch.Helix.Responses; +using Serilog; + +namespace CatCore.Services.Twitch +{ + public sealed partial class TwitchHelixApiService + { + /// + /// Sends a chat message to a broadcaster's chat. + /// + /// The ID of the broadcaster whose chat you want to send a message to + /// The ID of the user sending the message + /// The message text. Max length is 500 characters + /// CancellationToken that can be used to cancel the call + /// True if the message was sent successfully, false otherwise + /// Gets thrown when the user isn't authenticated + internal async Task SendChatMessage(string broadcasterId, string senderUserId, string message, CancellationToken cancellationToken = default) + { + await CheckUserLoggedIn().ConfigureAwait(false); + + if (message.Length > 500) + { + _logger.Warning("Message length exceeds 500 characters. Trimming to 500"); + message = message.Substring(0, 500); + } + + var requestDto = new SendChatMessageRequestDto + { + BroadcasterId = broadcasterId, + SenderId = senderUserId, + Message = message + }; + + var url = $"chat/messages"; + try + { + var jsonContent = JsonSerializer.Serialize(requestDto, TwitchHelixSerializerContext.Default.SendChatMessageRequestDto); + using var response = await _combinedHelixPolicy.ExecuteAsync(async ct => + { + using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, url) + { + Content = new StringContent(jsonContent, System.Text.Encoding.UTF8, "application/json") + }; + return await _helixClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); + + return response?.IsSuccessStatusCode ?? false; + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to send chat message"); + return false; + } + } + + /// + /// Creates an EventSub subscription for receiving real-time events. + /// + /// The subscription type (e.g., "channel.chat.message") + /// The subscription version (typically "1") + /// Dictionary of condition parameters (e.g., {"broadcaster_user_id": "123"}) + /// The WebSocket session ID to receive events on + /// CancellationToken that can be used to cancel the call + /// The subscription ID if successful, null otherwise + internal async Task CreateEventSubSubscription(string type, string version, Dictionary condition, string sessionId, CancellationToken cancellationToken = default) + { + if (!_twitchAuthService.HasTokens) + { + _logger.Warning("Token not valid. Either the user is not logged in or the token has been revoked"); + return null; + } + + if (!_twitchAuthService.TokenIsValid && !await _twitchAuthService.RefreshTokens().ConfigureAwait(false)) + { + return null; + } + + var requestDto = new EventSubSubscriptionRequestDto + { + Type = type, + Version = version, + Condition = condition, + Transport = new EventSubTransportDto + { + Method = "websocket", + SessionId = sessionId + } + }; + + var url = "eventsub/subscriptions"; + +#if DEBUG + _logger.Verbose("Creating EventSub subscription: Type={Type}, Version={Version}, Condition={Condition}", type, version, string.Join(", ", condition.Select(kvp => $"{kvp.Key}={kvp.Value}"))); +#endif + + try + { + using var httpResponseMessage = await _combinedHelixPolicy.ExecuteAsync(async ct => + { + var jsonContent = JsonSerializer.Serialize(requestDto, TwitchHelixSerializerContext.Default.EventSubSubscriptionRequestDto); + var content = new StringContent(jsonContent, System.Text.Encoding.UTF8, "application/json"); + using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, url) { Content = content }; + return await _helixClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); + + if (httpResponseMessage == null) + { + return null; + } + + if (!httpResponseMessage.IsSuccessStatusCode) + { + var errorContent = await httpResponseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + _logger.Warning("Failed to create EventSub subscription. Type={Type}, StatusCode={StatusCode}, Response={Response}", type, httpResponseMessage.StatusCode, errorContent); + return null; + } + + var responseJson = await httpResponseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + var response = JsonSerializer.Deserialize(responseJson, TwitchHelixSerializerContext.Default.EventSubSubscriptionResponseDto); + + if (EqualityComparer.Default.Equals(response, default)) + { + _logger.Warning("Failed to deserialize EventSub subscription response. Type={Type}, Payload={Payload}", type, responseJson); + return null; + } + + if (response.Data != null && response.Data.Count > 0) + { + var subscriptionId = response.Data[0].Id; + _logger.Information("EventSub subscription created successfully. ID: {SubscriptionId}", subscriptionId); + return subscriptionId; + } + + _logger.Warning("EventSub subscription response did not include data. Type={Type}, Payload={Payload}", type, responseJson); + + return null; + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to create EventSub subscription"); + return null; + } + } + + /// + /// Deletes an EventSub subscription. + /// + /// The subscription ID to delete + /// CancellationToken that can be used to cancel the call + /// True if deletion was successful, false otherwise + internal async Task DeleteEventSubSubscription(string subscriptionId, CancellationToken cancellationToken = default) + { + if (!_twitchAuthService.HasTokens) + { + _logger.Warning("Token not valid. Either the user is not logged in or the token has been revoked"); + return false; + } + + if (!_twitchAuthService.TokenIsValid && !await _twitchAuthService.RefreshTokens().ConfigureAwait(false)) + { + return false; + } + + var url = $"eventsub/subscriptions?id={Uri.EscapeDataString(subscriptionId)}"; + +#if DEBUG + _logger.Verbose("Deleting EventSub subscription: {SubscriptionId}", subscriptionId); +#endif + + try + { + using var httpResponseMessage = await _combinedHelixPolicy.ExecuteAsync(async ct => + { + using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Delete, url); + return await _helixClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); + + if (httpResponseMessage == null) + { + _logger.Warning("Failed to delete EventSub subscription {SubscriptionId}: received null HTTP response", subscriptionId); + return false; + } + + if (!httpResponseMessage.IsSuccessStatusCode) + { + var errorContent = await httpResponseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + _logger.Warning("Failed to delete EventSub subscription {SubscriptionId}. StatusCode={StatusCode}, Response={Response}", subscriptionId, httpResponseMessage.StatusCode, errorContent); + return false; + } + + _logger.Information("EventSub subscription deleted successfully. ID: {SubscriptionId}", subscriptionId); + return true; + } + catch (Exception ex) + { + _logger.Warning(ex, "Failed to delete EventSub subscription {SubscriptionId}", subscriptionId); + return false; + } + } + } +} diff --git a/CatCore/Services/Twitch/TwitchHelixApiService.cs b/CatCore/Services/Twitch/TwitchHelixApiService.cs index 75e09ddf..9203ccde 100644 --- a/CatCore/Services/Twitch/TwitchHelixApiService.cs +++ b/CatCore/Services/Twitch/TwitchHelixApiService.cs @@ -70,7 +70,7 @@ internal TwitchHelixApiService(ILogger logger, ITwitchAuthService twitchAuthServ var exceptionRetryPolicy = Policy .Handle() .OrResult(resp => resp.StatusCode == HttpStatusCode.Conflict) - .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromMilliseconds(2 ^ (retryAttempt - 1) * 500)); + .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromMilliseconds((1 << (retryAttempt - 1)) * 500)); var bulkheadPolicy = Policy.BulkheadAsync(4, 1000); diff --git a/CatCore/Services/Twitch/TwitchIrcService.cs b/CatCore/Services/Twitch/TwitchIrcService.cs index bc2d34e7..a459bf72 100644 --- a/CatCore/Services/Twitch/TwitchIrcService.cs +++ b/CatCore/Services/Twitch/TwitchIrcService.cs @@ -19,6 +19,7 @@ namespace CatCore.Services.Twitch { + [Obsolete("Legacy IRC transport is deprecated. Use the EventSub chat service (TwitchEventSubChatService) via ITwitchIrcService registration.")] internal sealed class TwitchIrcService : ITwitchIrcService { private const string TWITCH_IRC_ENDPOINT = "wss://irc-ws.chat.twitch.tv:443"; diff --git a/CatCore/Services/Twitch/TwitchService.cs b/CatCore/Services/Twitch/TwitchService.cs index 36137422..b3be40a6 100644 --- a/CatCore/Services/Twitch/TwitchService.cs +++ b/CatCore/Services/Twitch/TwitchService.cs @@ -35,6 +35,7 @@ internal TwitchService(ILogger logger, ITwitchAuthService twitchAuthService, ITw } /// + [Obsolete("Twitch Legacy PubSub was decommissioned. Migrate to EventSub-based APIs.")] public ITwitchPubSubServiceManager GetPubSubService() => _twitchPubSubServiceManager; ///