Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add premium user check in celery task #2668

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 106 additions & 28 deletions backend/celery_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from datetime import datetime, timezone
from datetime import datetime
from tempfile import NamedTemporaryFile
from uuid import UUID

Expand All @@ -8,9 +8,8 @@
from logger import get_logger
from middlewares.auth.auth_bearer import AuthBearer
from models.files import File
from models.settings import get_supabase_client
from models.settings import get_supabase_client, get_supabase_db
from modules.brain.integrations.Notion.Notion_connector import NotionConnector
from modules.brain.repository.integration_brains import IntegrationBrain
from modules.brain.service.brain_service import BrainService
from modules.brain.service.brain_vector_service import BrainVectorService
from modules.notification.dto.inputs import NotificationUpdatableProperties
Expand Down Expand Up @@ -180,38 +179,113 @@ def ping_telemetry():
maybe_send_telemetry("ping", {"ping": "pong"})


@celery.task
def process_integration_brain_sync():
integration = IntegrationBrain()
integrations = integration.get_integration_brain_by_type_integration("notion")

time = datetime.now(timezone.utc) # Make `time` timezone-aware
# last_synced is a string that represents a timestampz in the database
# only call process_integration_brain_sync_user_brain if more than 1 day has passed since the last sync
if not integrations:
return
# TODO fix this
# for integration in integrations:
# print(f"last_synced: {integration.last_synced}")
# print(f"Integration Name: {integration.name}")
# last_synced = datetime.strptime(
# integration.last_synced, "%Y-%m-%dT%H:%M:%S.%f%z"
# )
# if last_synced < time - timedelta(hours=12) and integration.name == "notion":
# process_integration_brain_sync_user_brain.delay(
# brain_id=integration.brain_id, user_id=integration.user_id
# )
@celery.task(name="check_if_is_premium_user")
def check_if_is_premium_user():
supabase = get_supabase_db()
supabase_db = supabase.db
# Get the list of subscription active
subscriptions = (
supabase_db.table("subscriptions")
.select("*")
.filter(
"current_period_end",
"gt",
datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
)
.execute()
).data
logger.debug(f"Subscriptions: {subscriptions}")

# Get List of all customers
customers = (
supabase_db.table("customers")
.select("*")
.order("created", desc=True)
.execute()
.data
)
unique_customers = {}
for customer in customers:
if customer["email"] not in unique_customers:
unique_customers[customer["email"]] = customer
customers = list(unique_customers.values())
logger.debug(f"Unique Customers with latest created date: {customers}")

# Matching Products
matching_product_settings = (
supabase_db.table("product_to_features").select("*").execute()
).data
logger.debug(f"Matching product settings: {matching_product_settings}")

# if customer.id in subscriptions.customer then find the user id in the table users where email = customer.email and then update the user_settings with is_premium = True else delete the user_settings

for customer in customers:
logger.debug(f"Customer: {customer}")
# Find the subscription of the customer
user_id = None
matching_subscription = [
subscription
for subscription in subscriptions
if subscription["customer"] == customer["id"]
]
logger.debug(f"Matching subscription: {matching_subscription}")
user_id = (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be a join on the customer<->user on the email so we can get all at the same time. Not critical for now! maybe just a TODO 😄

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not possible with supabase without a foreign key. Being foreign table I can't do that

supabase_db.table("users")
.select("id")
.filter("email", "eq", customer["email"])
.execute()
).data
if len(user_id) > 0:
user_id = user_id[0]["id"]
else:
logger.debug(f"User not found for customer: {customer}")
continue
if len(matching_subscription) > 0:

# Get the matching product from the subscription
matching_product_settings = [
product
for product in matching_product_settings
if product["stripe_product_id"]
== matching_subscription[0]["attrs"]["items"]["data"][0]["plan"][
"product"
]
]
# Update the user with the product settings
supabase_db.table("user_settings").update(
{
"max_brains": matching_product_settings[0]["max_brains"],
"max_brain_size": matching_product_settings[0]["max_brain_size"],
"monthly_chat_credit": matching_product_settings[0][
"monthly_chat_credit"
],
"api_access": matching_product_settings[0]["api_access"],
"models": matching_product_settings[0]["models"],
"is_premium": True,
}
).match({"user_id": str(user_id)}).execute()
else:
# check if user_settings is_premium is true then delete the user_settings
user_settings = (
supabase_db.table("user_settings")
.select("*")
.filter("user_id", "eq", user_id)
.filter("is_premium", "eq", True)
.execute()
).data
if len(user_settings) > 0:
supabase_db.table("user_settings").delete().match(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small question here if len(match_subscription) == 0 do we delete the row or update the is_premium to False ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm deleting the row because then when the user ask a new question it will insert with the models, questions and all by default. Otherwise I have to look for the default value of the db manually

{"user_id": user_id}
).execute()

return True


celery.conf.beat_schedule = {
"remove_onboarding_more_than_x_days_task": {
"task": f"{__name__}.remove_onboarding_more_than_x_days_task",
"schedule": crontab(minute="0", hour="0"),
},
"process_integration_brain_sync": {
"task": f"{__name__}.process_integration_brain_sync",
"schedule": crontab(minute="*/5", hour="*"),
},
"ping_telemetry": {
"task": f"{__name__}.ping_telemetry",
"schedule": crontab(minute="*/30", hour="*"),
Expand All @@ -220,4 +294,8 @@ def process_integration_brain_sync():
"task": "process_sync_active",
"schedule": crontab(minute="*/1", hour="*"),
},
"process_premium_users": {
"task": "check_if_is_premium_user",
"schedule": crontab(minute="*/1", hour="*"),
},
}
150 changes: 1 addition & 149 deletions backend/models/databases/supabase/user_usage.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
logger = get_logger(__name__)


#TODO: change the name of this class because another one already exists
# TODO: change the name of this class because another one already exists
class UserUsage(Repository):
def __init__(self, supabase_client):
self.db = supabase_client
Expand All @@ -28,142 +28,6 @@ def create_user_daily_usage(
.execute()
)

def check_subscription_validity(self, customer_id: str) -> bool:
"""
Check if the subscription of the user is still valid
"""
now = datetime.now()

# Format the datetime object as a string in the appropriate format for your Supabase database
now_str = now.strftime("%Y-%m-%d %H:%M:%S.%f")
subscription_still_valid = (
self.db.from_("subscriptions")
.select("*")
.filter(
"customer", "eq", customer_id
) # then check if current_period_end is greater than now with timestamp format
.filter("current_period_end", "gt", now_str)
.execute()
).data

if len(subscription_still_valid) > 0:
return True

def check_user_is_customer(self, user_id: UUID) -> (bool, str):
"""
Check if the user is a customer and return the customer id
"""
user_email_customer = (
self.db.from_("users")
.select("*")
.filter("id", "eq", str(user_id))
.execute()
).data

if len(user_email_customer) == 0:
return False, None

matching_customers = (
self.db.table("customers")
.select("email,id")
.filter("email", "eq", user_email_customer[0]["email"])
.execute()
).data

if len(matching_customers) == 0:
return False, None

return True, matching_customers[0]["id"]

def update_customer_settings_with_product_settings(
self, user_id: UUID, customer_id: str
):
"""
Check if the user is a customer and return the customer id
"""

matching_products = (
self.db.table("subscriptions")
.select("attrs")
.filter("customer", "eq", customer_id)
.execute()
).data

# Output object
# {"id":"sub_1OUZOgJglvQxkJ1H98TSY9bv","plan":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","active":true,"amount":1900,"object":"plan","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","interval":"month","livemode":false,"metadata":{},"nickname":null,"tiers_mode":null,"usage_type":"licensed","amount_decimal":"1900","billing_scheme":"per_unit","interval_count":1,"aggregate_usage":null,"transform_usage":null,"trial_period_days":null},"items":{"url":"/v1/subscription_items?subscription=sub_1OUZOgJglvQxkJ1H98TSY9bv","data":[{"id":"si_PJBm1ciQlpaOA4","plan":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","active":true,"amount":1900,"object":"plan","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","interval":"month","livemode":false,"metadata":{},"nickname":null,"tiers_mode":null,"usage_type":"licensed","amount_decimal":"1900","billing_scheme":"per_unit","interval_count":1,"aggregate_usage":null,"transform_usage":null,"trial_period_days":null},"price":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","type":"recurring","active":true,"object":"price","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","livemode":false,"metadata":{},"nickname":null,"recurring":{"interval":"month","usage_type":"licensed","interval_count":1,"aggregate_usage":null,"trial_period_days":null},"lookup_key":null,"tiers_mode":null,"unit_amount":1900,"tax_behavior":"unspecified","billing_scheme":"per_unit","custom_unit_amount":null,"transform_quantity":null,"unit_amount_decimal":"1900"},"object":"subscription_item","created":1704307355,"metadata":{},"quantity":1,"tax_rates":[],"subscription":"sub_1OUZOgJglvQxkJ1H98TSY9bv","billing_thresholds":null}],"object":"list","has_more":false,"total_count":1},"object":"subscription","status":"active","created":1704307354,"currency":"usd","customer":"cus_PJBmxGOKfQgYDN","discount":null,"ended_at":null,"livemode":false,"metadata":{},"quantity":1,"schedule":null,"cancel_at":null,"trial_end":null,"start_date":1704307354,"test_clock":null,"application":null,"canceled_at":null,"description":null,"trial_start":null,"on_behalf_of":null,"automatic_tax":{"enabled":true},"transfer_data":null,"days_until_due":null,"default_source":null,"latest_invoice":"in_1OUZOgJglvQxkJ1HysujPh0b","pending_update":null,"trial_settings":{"end_behavior":{"missing_payment_method":"create_invoice"}},"pause_collection":null,"payment_settings":{"payment_method_types":null,"payment_method_options":null,"save_default_payment_method":"off"},"collection_method":"charge_automatically","default_tax_rates":[],"billing_thresholds":null,"current_period_end":1706985754,"billing_cycle_anchor":1704307354,"cancel_at_period_end":false,"cancellation_details":{"reason":null,"comment":null,"feedback":null},"current_period_start":1704307354,"pending_setup_intent":null,"default_payment_method":"pm_1OUZOfJglvQxkJ1HSHU0TTWW","application_fee_percent":null,"pending_invoice_item_interval":null,"next_pending_invoice_item_invoice":null}

# Now extract the product id from the object

if len(matching_products) == 0:
logger.info("No matching products found")
return

product_id = matching_products[0]["attrs"]["items"]["data"][0]["plan"][
"product"
]

# Now fetch the product settings

matching_product_settings = (
self.db.table("product_to_features")
.select("*")
.filter("stripe_product_id", "eq", product_id)
.execute()
).data

if len(matching_product_settings) == 0:
logger.info("No matching product settings found")
return

product_settings = matching_product_settings[0]

# Now update the user settings with the product settings
try:
self.db.table("user_settings").update(
{
"max_brains": product_settings["max_brains"],
"max_brain_size": product_settings["max_brain_size"],
"monthly_chat_credit": product_settings["monthly_chat_credit"],
"api_access": product_settings["api_access"],
"models": product_settings["models"],
}
).match({"user_id": str(user_id)}).execute()

except Exception as e:
logger.error(e)
logger.error("Error while updating user settings with product settings")

def check_if_is_premium_user(self, user_id: UUID):
"""
Check if the user is a premium user
"""
matching_customers = None
try:
user_is_customer, user_customer_id = self.check_user_is_customer(user_id)

if user_is_customer:
self.db.table("user_settings").update({"is_premium": True}).match(
{"user_id": str(user_id)}
).execute()

if user_is_customer and self.check_subscription_validity(user_customer_id):
logger.info("User is a premium user")
self.update_customer_settings_with_product_settings(
user_id, user_customer_id
)
return True, False
else:
self.db.table("user_settings").update({"is_premium": False}).match(
{"user_id": str(user_id)}
).execute()
return False, False

except Exception as e:
logger.info(
"Stripe needs to be configured if you want to have the premium features"
)
return False, True

def get_user_settings(self, user_id):
"""
Fetch the user settings from the database
Expand All @@ -189,18 +53,6 @@ def get_user_settings(self, user_id):

user_settings = user_settings_response[0]

check_is_premium, error = self.check_if_is_premium_user(user_id)

if check_is_premium and not error:
# get the possibly updated user settings
user_settings_response = (
self.db.from_("user_settings")
.select("*")
.filter("user_id", "eq", str(user_id))
.execute()
).data
return user_settings_response[0]

return user_settings

def get_model_settings(self):
Expand Down