Skip to content

Commit

Permalink
feat: Add premium user check in celery task (#2668)
Browse files Browse the repository at this point in the history
"This pull request adds a new celery task called
`check_if_is_premium_user` that checks if a user is a premium user based
on their subscription status. The task retrieves the list of active
subscriptions and the list of customers from the Supabase database. It
then matches the subscriptions with the customers and updates the user
settings with the corresponding premium features if a match is found. If
a user is not found or their subscription is expired, the user settings
are deleted. This task will run periodically to keep the user settings
up to date with the subscription status.

---------

Co-authored-by: Stan Girard <stan@quivr.app>
  • Loading branch information
StanGirard and StanGirard committed Jun 13, 2024
1 parent 6bd1a5b commit ec58935
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 177 deletions.
134 changes: 106 additions & 28 deletions backend/celery_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from datetime import datetime, timezone
from datetime import datetime
from tempfile import NamedTemporaryFile
from uuid import UUID

Expand All @@ -8,9 +8,8 @@
from logger import get_logger
from middlewares.auth.auth_bearer import AuthBearer
from models.files import File
from models.settings import get_supabase_client
from models.settings import get_supabase_client, get_supabase_db
from modules.brain.integrations.Notion.Notion_connector import NotionConnector
from modules.brain.repository.integration_brains import IntegrationBrain
from modules.brain.service.brain_service import BrainService
from modules.brain.service.brain_vector_service import BrainVectorService
from modules.notification.dto.inputs import NotificationUpdatableProperties
Expand Down Expand Up @@ -180,38 +179,113 @@ def ping_telemetry():
maybe_send_telemetry("ping", {"ping": "pong"})


@celery.task
def process_integration_brain_sync():
integration = IntegrationBrain()
integrations = integration.get_integration_brain_by_type_integration("notion")

time = datetime.now(timezone.utc) # Make `time` timezone-aware
# last_synced is a string that represents a timestampz in the database
# only call process_integration_brain_sync_user_brain if more than 1 day has passed since the last sync
if not integrations:
return
# TODO fix this
# for integration in integrations:
# print(f"last_synced: {integration.last_synced}")
# print(f"Integration Name: {integration.name}")
# last_synced = datetime.strptime(
# integration.last_synced, "%Y-%m-%dT%H:%M:%S.%f%z"
# )
# if last_synced < time - timedelta(hours=12) and integration.name == "notion":
# process_integration_brain_sync_user_brain.delay(
# brain_id=integration.brain_id, user_id=integration.user_id
# )
@celery.task(name="check_if_is_premium_user")
def check_if_is_premium_user():
supabase = get_supabase_db()
supabase_db = supabase.db
# Get the list of subscription active
subscriptions = (
supabase_db.table("subscriptions")
.select("*")
.filter(
"current_period_end",
"gt",
datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
)
.execute()
).data
logger.debug(f"Subscriptions: {subscriptions}")

# Get List of all customers
customers = (
supabase_db.table("customers")
.select("*")
.order("created", desc=True)
.execute()
.data
)
unique_customers = {}
for customer in customers:
if customer["email"] not in unique_customers:
unique_customers[customer["email"]] = customer
customers = list(unique_customers.values())
logger.debug(f"Unique Customers with latest created date: {customers}")

# Matching Products
matching_product_settings = (
supabase_db.table("product_to_features").select("*").execute()
).data
logger.debug(f"Matching product settings: {matching_product_settings}")

# if customer.id in subscriptions.customer then find the user id in the table users where email = customer.email and then update the user_settings with is_premium = True else delete the user_settings

for customer in customers:
logger.debug(f"Customer: {customer}")
# Find the subscription of the customer
user_id = None
matching_subscription = [
subscription
for subscription in subscriptions
if subscription["customer"] == customer["id"]
]
logger.debug(f"Matching subscription: {matching_subscription}")
user_id = (
supabase_db.table("users")
.select("id")
.filter("email", "eq", customer["email"])
.execute()
).data
if len(user_id) > 0:
user_id = user_id[0]["id"]
else:
logger.debug(f"User not found for customer: {customer}")
continue
if len(matching_subscription) > 0:

# Get the matching product from the subscription
matching_product_settings = [
product
for product in matching_product_settings
if product["stripe_product_id"]
== matching_subscription[0]["attrs"]["items"]["data"][0]["plan"][
"product"
]
]
# Update the user with the product settings
supabase_db.table("user_settings").update(
{
"max_brains": matching_product_settings[0]["max_brains"],
"max_brain_size": matching_product_settings[0]["max_brain_size"],
"monthly_chat_credit": matching_product_settings[0][
"monthly_chat_credit"
],
"api_access": matching_product_settings[0]["api_access"],
"models": matching_product_settings[0]["models"],
"is_premium": True,
}
).match({"user_id": str(user_id)}).execute()
else:
# check if user_settings is_premium is true then delete the user_settings
user_settings = (
supabase_db.table("user_settings")
.select("*")
.filter("user_id", "eq", user_id)
.filter("is_premium", "eq", True)
.execute()
).data
if len(user_settings) > 0:
supabase_db.table("user_settings").delete().match(
{"user_id": user_id}
).execute()

return True


celery.conf.beat_schedule = {
"remove_onboarding_more_than_x_days_task": {
"task": f"{__name__}.remove_onboarding_more_than_x_days_task",
"schedule": crontab(minute="0", hour="0"),
},
"process_integration_brain_sync": {
"task": f"{__name__}.process_integration_brain_sync",
"schedule": crontab(minute="*/5", hour="*"),
},
"ping_telemetry": {
"task": f"{__name__}.ping_telemetry",
"schedule": crontab(minute="*/30", hour="*"),
Expand All @@ -220,4 +294,8 @@ def process_integration_brain_sync():
"task": "process_sync_active",
"schedule": crontab(minute="*/1", hour="*"),
},
"process_premium_users": {
"task": "check_if_is_premium_user",
"schedule": crontab(minute="*/1", hour="*"),
},
}
150 changes: 1 addition & 149 deletions backend/models/databases/supabase/user_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
logger = get_logger(__name__)


#TODO: change the name of this class because another one already exists
# TODO: change the name of this class because another one already exists
class UserUsage(Repository):
def __init__(self, supabase_client):
self.db = supabase_client
Expand All @@ -28,142 +28,6 @@ def create_user_daily_usage(
.execute()
)

def check_subscription_validity(self, customer_id: str) -> bool:
"""
Check if the subscription of the user is still valid
"""
now = datetime.now()

# Format the datetime object as a string in the appropriate format for your Supabase database
now_str = now.strftime("%Y-%m-%d %H:%M:%S.%f")
subscription_still_valid = (
self.db.from_("subscriptions")
.select("*")
.filter(
"customer", "eq", customer_id
) # then check if current_period_end is greater than now with timestamp format
.filter("current_period_end", "gt", now_str)
.execute()
).data

if len(subscription_still_valid) > 0:
return True

def check_user_is_customer(self, user_id: UUID) -> (bool, str):
"""
Check if the user is a customer and return the customer id
"""
user_email_customer = (
self.db.from_("users")
.select("*")
.filter("id", "eq", str(user_id))
.execute()
).data

if len(user_email_customer) == 0:
return False, None

matching_customers = (
self.db.table("customers")
.select("email,id")
.filter("email", "eq", user_email_customer[0]["email"])
.execute()
).data

if len(matching_customers) == 0:
return False, None

return True, matching_customers[0]["id"]

def update_customer_settings_with_product_settings(
self, user_id: UUID, customer_id: str
):
"""
Check if the user is a customer and return the customer id
"""

matching_products = (
self.db.table("subscriptions")
.select("attrs")
.filter("customer", "eq", customer_id)
.execute()
).data

# Output object
# {"id":"sub_1OUZOgJglvQxkJ1H98TSY9bv","plan":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","active":true,"amount":1900,"object":"plan","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","interval":"month","livemode":false,"metadata":{},"nickname":null,"tiers_mode":null,"usage_type":"licensed","amount_decimal":"1900","billing_scheme":"per_unit","interval_count":1,"aggregate_usage":null,"transform_usage":null,"trial_period_days":null},"items":{"url":"/v1/subscription_items?subscription=sub_1OUZOgJglvQxkJ1H98TSY9bv","data":[{"id":"si_PJBm1ciQlpaOA4","plan":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","active":true,"amount":1900,"object":"plan","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","interval":"month","livemode":false,"metadata":{},"nickname":null,"tiers_mode":null,"usage_type":"licensed","amount_decimal":"1900","billing_scheme":"per_unit","interval_count":1,"aggregate_usage":null,"transform_usage":null,"trial_period_days":null},"price":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","type":"recurring","active":true,"object":"price","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","livemode":false,"metadata":{},"nickname":null,"recurring":{"interval":"month","usage_type":"licensed","interval_count":1,"aggregate_usage":null,"trial_period_days":null},"lookup_key":null,"tiers_mode":null,"unit_amount":1900,"tax_behavior":"unspecified","billing_scheme":"per_unit","custom_unit_amount":null,"transform_quantity":null,"unit_amount_decimal":"1900"},"object":"subscription_item","created":1704307355,"metadata":{},"quantity":1,"tax_rates":[],"subscription":"sub_1OUZOgJglvQxkJ1H98TSY9bv","billing_thresholds":null}],"object":"list","has_more":false,"total_count":1},"object":"subscription","status":"active","created":1704307354,"currency":"usd","customer":"cus_PJBmxGOKfQgYDN","discount":null,"ended_at":null,"livemode":false,"metadata":{},"quantity":1,"schedule":null,"cancel_at":null,"trial_end":null,"start_date":1704307354,"test_clock":null,"application":null,"canceled_at":null,"description":null,"trial_start":null,"on_behalf_of":null,"automatic_tax":{"enabled":true},"transfer_data":null,"days_until_due":null,"default_source":null,"latest_invoice":"in_1OUZOgJglvQxkJ1HysujPh0b","pending_update":null,"trial_settings":{"end_behavior":{"missing_payment_method":"create_invoice"}},"pause_collection":null,"payment_settings":{"payment_method_types":null,"payment_method_options":null,"save_default_payment_method":"off"},"collection_method":"charge_automatically","default_tax_rates":[],"billing_thresholds":null,"current_period_end":1706985754,"billing_cycle_anchor":1704307354,"cancel_at_period_end":false,"cancellation_details":{"reason":null,"comment":null,"feedback":null},"current_period_start":1704307354,"pending_setup_intent":null,"default_payment_method":"pm_1OUZOfJglvQxkJ1HSHU0TTWW","application_fee_percent":null,"pending_invoice_item_interval":null,"next_pending_invoice_item_invoice":null}

# Now extract the product id from the object

if len(matching_products) == 0:
logger.info("No matching products found")
return

product_id = matching_products[0]["attrs"]["items"]["data"][0]["plan"][
"product"
]

# Now fetch the product settings

matching_product_settings = (
self.db.table("product_to_features")
.select("*")
.filter("stripe_product_id", "eq", product_id)
.execute()
).data

if len(matching_product_settings) == 0:
logger.info("No matching product settings found")
return

product_settings = matching_product_settings[0]

# Now update the user settings with the product settings
try:
self.db.table("user_settings").update(
{
"max_brains": product_settings["max_brains"],
"max_brain_size": product_settings["max_brain_size"],
"monthly_chat_credit": product_settings["monthly_chat_credit"],
"api_access": product_settings["api_access"],
"models": product_settings["models"],
}
).match({"user_id": str(user_id)}).execute()

except Exception as e:
logger.error(e)
logger.error("Error while updating user settings with product settings")

def check_if_is_premium_user(self, user_id: UUID):
"""
Check if the user is a premium user
"""
matching_customers = None
try:
user_is_customer, user_customer_id = self.check_user_is_customer(user_id)

if user_is_customer:
self.db.table("user_settings").update({"is_premium": True}).match(
{"user_id": str(user_id)}
).execute()

if user_is_customer and self.check_subscription_validity(user_customer_id):
logger.info("User is a premium user")
self.update_customer_settings_with_product_settings(
user_id, user_customer_id
)
return True, False
else:
self.db.table("user_settings").update({"is_premium": False}).match(
{"user_id": str(user_id)}
).execute()
return False, False

except Exception as e:
logger.info(
"Stripe needs to be configured if you want to have the premium features"
)
return False, True

def get_user_settings(self, user_id):
"""
Fetch the user settings from the database
Expand All @@ -189,18 +53,6 @@ def get_user_settings(self, user_id):

user_settings = user_settings_response[0]

check_is_premium, error = self.check_if_is_premium_user(user_id)

if check_is_premium and not error:
# get the possibly updated user settings
user_settings_response = (
self.db.from_("user_settings")
.select("*")
.filter("user_id", "eq", str(user_id))
.execute()
).data
return user_settings_response[0]

return user_settings

def get_model_settings(self):
Expand Down

0 comments on commit ec58935

Please sign in to comment.