In [1]:
import faker
from pydantic import BaseModel, EmailStr
from datetime import date
from typing import List
import pandas as pd
import numpy as np
import random
from datetime import datetime

# Fixed seed so that Restart & Run All regenerates the same synthetic dataset
# (apart from values that depend on the current date, e.g. the sign-up window).
SEED = 42

fake = faker.Faker(locale='en_US')
faker.Faker.seed(SEED)  # Faker's shared generator: uuid4, names, e-mails, random_element
np.random.seed(SEED)    # legacy NumPy global RNG behind np.random.choice/randint/beta/random
random.seed(SEED)       # stdlib RNG

    •	Users API → user_id, signup_date, plan_id, region, referral_source
    •	Plans API → plan_id, plan_name, monthly_fee, yearly_fee, features
    •	Subscription API → subscription_id, user_id, plan_id, start_date, end_date, status, payment_method
    •	Usage API → usage_id, user_id, date, actions_performed, storage_used_mb, active_minutes


In [2]:
# User details
class User(BaseModel):
    """Validated record for one synthetic user account."""
    user_id: str  # first 8 characters of a Faker UUID4
    first_name: str
    last_name: str
    email: EmailStr  # validated as an e-mail address by pydantic
    signup_date: str  # 'YYYY-MM-DD' string, not a date object
    plan_id: int  # plan chosen at sign-up (1-5)
    region: str  # one of six continent-level region names
    referral_source: str  # 'web search', 'paid ads', 'social media' or 'referral'

# Number of synthetic users to generate (every per-column list below has this length).
N_USERS = 1000

# Sign-up dates are drawn uniformly between these two instants, expressed as
# nanoseconds since the Unix epoch (pd.Timestamp.value).
signup_window_start = pd.Timestamp('2024-08-01').value
signup_window_end = pd.Timestamp(datetime.now().date()).value

users: List[User] = []
user_id = [fake.uuid4()[:8] for _ in range(N_USERS)]
# The shortened UUID is the user's key, so two users must never share one.
assert len(set(user_id)) == N_USERS, "duplicate user_id generated; re-run the cell"

first_name = [fake.first_name() for _ in range(N_USERS)]
last_name = [fake.last_name() for _ in range(N_USERS)]
email = [fake.email() for _ in range(N_USERS)]
# dtype=np.int64 is required: the bounds are ~1.7e18 and overflow the 32-bit
# default integer that np.random.randint uses on some platforms.
signup_date = [
    pd.to_datetime(int(np.random.randint(signup_window_start, signup_window_end, dtype=np.int64)))
    .normalize()
    .strftime('%Y-%m-%d')
    for _ in range(N_USERS)
]
# Plan mix is skewed towards the cheaper tiers; int() turns the NumPy scalar into a plain int.
plan_id = [
    int(np.random.choice([1, 2, 3, 4, 5], p=[0.33, 0.27, 0.18, 0.16, 0.06]))
    for _ in range(N_USERS)
]
region = [
    fake.random_element(elements=('North America', 'Europe', 'Asia', 'South America', 'Africa', 'Oceania'))
    for _ in range(N_USERS)
]
referral_source = [
    str(np.random.choice(
        ['web search', 'paid ads', 'social media', 'referral'],
        p=[0.20, 0.45, 0.10, 0.25]
    ))
    for _ in range(N_USERS)
]

# Validate every generated row through the pydantic model.
users.extend(
    User(
        user_id=uid,
        first_name=first,
        last_name=last,
        email=mail,
        signup_date=signed_up,
        plan_id=pid,
        region=reg,
        referral_source=source,
    )
    for uid, first, last, mail, signed_up, pid, reg, source in zip(
        user_id, first_name, last_name, email, signup_date, plan_id, region, referral_source
    )
)

In [3]:
# Flatten the validated User models into one DataFrame row each, then preview the table.
user_records = [u.model_dump() for u in users]
user = pd.DataFrame(user_records)
user

Unnamed: 0,user_id,first_name,last_name,email,signup_date,plan_id,region,referral_source
0,de053a60,Aaron,Gray,yspencer@example.org,2024-08-13,1,South America,paid ads
1,bb30326e,Alice,Pearson,stevendavis@example.net,2025-01-03,5,Oceania,social media
2,719e8018,Renee,Scott,rmaldonado@example.net,2025-02-11,5,Asia,paid ads
3,15478867,Jeffrey,Paul,markdunn@example.com,2025-07-20,2,Oceania,paid ads
4,964ef26a,Francisco,Velasquez,vpratt@example.com,2025-02-14,1,Oceania,web search
...,...,...,...,...,...,...,...,...
995,051e83f9,Michael,Watts,stevenhanson@example.net,2025-05-31,1,North America,paid ads
996,f789ee0b,Melissa,Avila,shellypena@example.net,2025-10-02,2,Europe,paid ads
997,b5f0b757,Joshua,Smith,drakeamanda@example.net,2025-01-07,1,Asia,paid ads
998,b585f90f,John,Brown,jeffreycollins@example.org,2024-12-26,5,Africa,paid ads


In [30]:
# Plan details
class Plan(BaseModel):
    """Validated record for one subscription plan (pricing tier)."""
    plan_id: int
    plan_name: str
    monthly_fee: float
    max_users: str  # kept as a string because some tiers use 'unlimited'
    features: List[str]  # human-readable feature descriptions
    
# One tuple per tier: (plan_id, plan_name, monthly_fee, max_users, features),
# ordered by ascending monthly fee.
# NOTE(review): the 'Free' tier carries a monthly_fee of 10 — confirm this is intended.
_PLAN_SPECS = [
    (1, 'Free', 10, '2',
     ["3 projects limit", "2 users/workspace", "100MB storage", "basic dashboard"]),
    (2, 'Standard', 20, '10',
     ["10 projects limit", "10 users/workspace", "5GB storage", "Slack & Google Drive integration"]),
    (3, 'Premium', 35, '25',
     ["unlimited projects", "25 users/workspace", "50GB storage", "task automation", "Zapier & Jira integration", "2FA security"]),
    (4, 'Enterprise', 70, 'unlimited',
     ["unlimited projects/users", "1TB storage", "account manager", "SSO/SAML", "audit logs", "RBAC", "API access", "99.9% uptime SLA"]),
    (5, 'Ultimate', 100, 'unlimited',
     ["all enterprise features", "unlimited storage", "AI analytics", "custom integrations", "white labeling", "dedicated engineer", "private deployment"]),
]

# Build the validated Plan models from the spec table.
plans = [
    Plan(
        plan_id=spec_id,
        plan_name=spec_name,
        monthly_fee=spec_fee,
        max_users=spec_seats,
        features=spec_features,
    )
    for spec_id, spec_name, spec_fee, spec_seats, spec_features in _PLAN_SPECS
]

In [31]:
# Tabular view of the plan catalogue: one row per Plan model.
plan_records = [p.model_dump() for p in plans]
plan = pd.DataFrame(plan_records)

In [6]:
from datetime import timedelta

# Subscription details

class Subscription(BaseModel):
    """Validated record for one subscription period of a user on a plan."""
    subscription_id: str  # first 8 characters of a Faker UUID4
    user_id: str
    plan_id: int
    start_date: str  # 'YYYY-MM-DD'
    end_date: str  # 'YYYY-MM-DD', or 'N/A' for Free-plan subscriptions
    payment_method: str  # 'credit card', 'paypal', 'bank transfer', or 'N/A' for the Free plan
    status: str  # 'active' or 'expired'
    
    
# Tunable parameters of the subscription simulation.
PAYMENT_METHODS = ['credit card', 'paypal', 'bank transfer']
PAYMENT_METHOD_WEIGHTS = [0.60, 0.30, 0.10]
RENEWAL_FRACTION = 0.75        # share of expired / Free-plan subscriptions that get a follow-up
SAME_PLAN_PROBABILITY = 0.7    # chance that the follow-up subscription keeps the same plan


def _new_subscription(user_id, plan_id, plan_name, start, today):
    """Build one Subscription for `user_id` on `plan_id`, starting at `start`.

    Free-plan subscriptions have no end date and no payment method (both
    'N/A') and are always 'active'. Every other plan runs for one month, gets
    a randomly drawn payment method, and is 'expired' once its end date lies
    before `today`.

    `start` and `today` are midnight-normalised pandas Timestamps.
    """
    if plan_name == "Free":
        payment_method, end_date, status = 'N/A', 'N/A', 'active'
    else:
        payment_method = str(np.random.choice(PAYMENT_METHODS, p=PAYMENT_METHOD_WEIGHTS))
        end = start + pd.DateOffset(months=1)
        end_date = end.strftime('%Y-%m-%d')
        status = 'expired' if end < today else 'active'
    return Subscription(
        subscription_id=fake.uuid4()[:8],
        user_id=user_id,
        plan_id=plan_id,
        start_date=start.strftime('%Y-%m-%d'),
        end_date=end_date,
        payment_method=payment_method,
        status=status,
    )


subscriptions: List[Subscription] = []

# Evaluate "today" once so every subscription is classified against the same date.
today = pd.Timestamp(datetime.now().date())

# Lookup by id. A local name other than `plan` is used on purpose: `plan` is the
# plan DataFrame and must not be overwritten by this cell. An unknown plan id
# raises KeyError here.
plan_name_by_id = {p.plan_id: p.plan_name for p in plans}

# Generate the initial subscription for each user, starting on the sign-up date.
for uid, pid, signup in zip(user['user_id'], user['plan_id'], user['signup_date']):
    pid = int(pid)
    subscriptions.append(
        _new_subscription(uid, pid, plan_name_by_id[pid], pd.to_datetime(signup), today)
    )

# Pick RENEWAL_FRACTION of the subscriptions that are expired or on the Free plan
# (plan_id 1) and give each of those users a follow-up subscription.
# NOTE(review): a Free-plan subscription stays 'active' with end_date 'N/A' even
# after its follow-up is created — confirm that overlapping records are intended.
expired_user_indices = [i for i, sub in enumerate(subscriptions) if sub.status == 'expired' or sub.plan_id == 1]
users_to_renew = np.random.choice(
    expired_user_indices,
    size=int(len(expired_user_indices) * RENEWAL_FRACTION),
    replace=False,
)

for idx in users_to_renew:
    old_subscription = subscriptions[idx]

    # Paid plans: the follow-up starts 1-7 days after the old one ended.
    # Free plan (no end date): it starts 30-39 days after the old start date.
    if old_subscription.end_date == 'N/A':
        new_start = pd.to_datetime(old_subscription.start_date) + pd.DateOffset(days=int(np.random.randint(30, 40)))
    else:
        new_start = pd.to_datetime(old_subscription.end_date) + pd.DateOffset(days=int(np.random.randint(1, 8)))

    # Keep the same plan with SAME_PLAN_PROBABILITY, otherwise switch to any other plan.
    if np.random.random() < SAME_PLAN_PROBABILITY:
        new_plan_id = old_subscription.plan_id
    else:
        other_plan_ids = [p.plan_id for p in plans if p.plan_id != old_subscription.plan_id]
        new_plan_id = int(np.random.choice(other_plan_ids))

    subscriptions.append(
        _new_subscription(
            old_subscription.user_id,
            new_plan_id,
            plan_name_by_id[new_plan_id],
            new_start,
            today,
        )
    )

In [7]:
# One DataFrame row per Subscription model, initial subscriptions first, follow-ups after.
subscription_records = [s.model_dump() for s in subscriptions]
subscription = pd.DataFrame(subscription_records)

In [19]:
class Usage(BaseModel):
    """Validated record for one usage event of a user."""
    usage_id: str  # first 8 characters of a Faker UUID4
    user_id: str
    date: str  # 'YYYY-MM-DD'
    actions_performed: int
    storage_used_mb: float
    api_calls: int
    active_minutes: int
    plan_id: int  # plan of the user's latest subscription when the event was generated
    
# Number of synthetic usage events to generate.
N_USAGE_EVENTS = 5000

# Per-plan usage profile: (storage_base_mb, storage_span_mb, api_base, api_span).
# A Beta(2, 5) draw (right-skewed, so most values are low) is scaled by the span
# and added to the base.
_USAGE_PROFILE = {
    1: (10, 90, 10, 40),
    2: (100, 4900, 50, 150),
    3: (5000, 15000, 200, 150),
    4: (20000, 30000, 350, 100),
}
# Used for plan 5 and for any plan id not listed above.
_DEFAULT_USAGE_PROFILE = (50000, 50000, 450, 50)

usages: List[Usage] = []

usage_id = [fake.uuid4()[:8] for _ in range(N_USAGE_EVENTS)]
user_id = np.random.choice(user.user_id, size=N_USAGE_EVENTS)
# NOTE(review): `date` rebinds the name imported by `from datetime import date`.
date = []
actions_performed = [np.random.randint(1, 130) for _ in range(N_USAGE_EVENTS)]
storage_used_mb = []
api_calls = []
active_minutes = [np.random.randint(1, 500) for _ in range(N_USAGE_EVENTS)]

# Evaluate "today" once; it is the upper bound for subscriptions without an end date.
today = pd.to_datetime(datetime.now().date())

# Latest subscription per user, computed once instead of filtering the frame for
# every event. Rows are in creation order, so keep='last' selects the newest one.
latest_subscription = (
    subscription
    .drop_duplicates(subset='user_id', keep='last')
    .set_index('user_id')
)

for i in range(N_USAGE_EVENTS):
    uid = user_id[i]
    if uid not in latest_subscription.index:
        continue
    sub = latest_subscription.loc[uid]

    # Keep the event date inside the subscription period.
    start_date = pd.to_datetime(sub['start_date'])
    end_date = today if sub['end_date'] == 'N/A' else pd.to_datetime(sub['end_date'])

    if start_date >= end_date:
        # Empty or inverted window (e.g. a subscription starting today or later):
        # fall back to the start date.
        random_date = start_date
    else:
        # dtype=np.int64 is required: nanosecond timestamps overflow the 32-bit
        # default integer that np.random.randint uses on some platforms.
        random_date = pd.to_datetime(
            int(np.random.randint(start_date.value, end_date.value, dtype=np.int64))
        ).normalize()

    event_date = random_date.strftime('%Y-%m-%d')
    date.append(event_date)

    sub_plan_id = int(sub['plan_id'])
    storage_base, storage_span, api_base, api_span = _USAGE_PROFILE.get(sub_plan_id, _DEFAULT_USAGE_PROFILE)
    storage = round(storage_base + np.random.beta(2, 5) * storage_span, 2)
    api = int(api_base + np.random.beta(2, 5) * api_span)

    storage_used_mb.append(storage)
    api_calls.append(api)

    usages.append(
        Usage(
            usage_id=usage_id[i],
            user_id=uid,
            date=event_date,
            actions_performed=actions_performed[i],
            storage_used_mb=storage,
            api_calls=api,
            active_minutes=active_minutes[i],
            plan_id=sub_plan_id,
        )
    )

In [20]:
# One DataFrame row per generated Usage model.
usage_records = [event.model_dump() for event in usages]
usage = pd.DataFrame(usage_records)

In [21]:
# Preview the generated usage events.
usage

Unnamed: 0,usage_id,user_id,date,actions_performed,storage_used_mb,api_calls,active_minutes,plan_id
0,a303f4b0,0575ee45,2025-01-08,59,91199.99,461,87,5
1,5521f2c6,b6a244e3,2025-04-13,66,22161.50,420,224,4
2,e39f6587,c647ee34,2025-09-24,65,6989.95,229,36,3
3,808e22c4,ea3d0904,2025-05-29,7,29.73,26,190,1
4,b97abea7,491e5212,2025-03-16,58,1284.77,101,271,2
...,...,...,...,...,...,...,...,...
4995,6e46affb,d3374bad,2025-09-27,88,30.69,14,392,1
4996,1eb5d03d,238bf4b3,2024-10-26,70,8668.55,281,273,3
4997,d2d52809,21a38576,2025-10-20,50,70581.70,472,277,5
4998,74d2b91f,acd08193,2025-10-08,111,21.34,12,134,1


In [22]:
# Most active user: the user_id with the most usage events, followed by that event count.
events_per_user = usage['user_id'].value_counts()
print(events_per_user.idxmax())
print(events_per_user.max())


e2425b8b
13


---


In [25]:
# Display the user table.
user

Unnamed: 0,user_id,first_name,last_name,email,signup_date,plan_id,region,referral_source
0,de053a60,Aaron,Gray,yspencer@example.org,2024-08-13,1,South America,paid ads
1,bb30326e,Alice,Pearson,stevendavis@example.net,2025-01-03,5,Oceania,social media
2,719e8018,Renee,Scott,rmaldonado@example.net,2025-02-11,5,Asia,paid ads
3,15478867,Jeffrey,Paul,markdunn@example.com,2025-07-20,2,Oceania,paid ads
4,964ef26a,Francisco,Velasquez,vpratt@example.com,2025-02-14,1,Oceania,web search
...,...,...,...,...,...,...,...,...
995,051e83f9,Michael,Watts,stevenhanson@example.net,2025-05-31,1,North America,paid ads
996,f789ee0b,Melissa,Avila,shellypena@example.net,2025-10-02,2,Europe,paid ads
997,b5f0b757,Joshua,Smith,drakeamanda@example.net,2025-01-07,1,Asia,paid ads
998,b585f90f,John,Brown,jeffreycollins@example.org,2024-12-26,5,Africa,paid ads


In [26]:
# Display the subscription table.
subscription

Unnamed: 0,subscription_id,user_id,plan_id,start_date,end_date,payment_method,status
0,48997eb9,de053a60,1,2024-08-13,,,active
1,cc19b564,bb30326e,5,2025-01-03,2025-02-03,bank transfer,expired
2,dc3ccb83,719e8018,5,2025-02-11,2025-03-11,paypal,expired
3,50fad089,15478867,2,2025-07-20,2025-08-20,paypal,expired
4,6b14de9e,964ef26a,1,2025-02-14,,,active
...,...,...,...,...,...,...,...
1715,775ecc14,73e5e58e,3,2025-06-07,2025-07-07,paypal,expired
1716,54bdee36,0b0a8739,3,2025-03-12,2025-04-12,credit card,expired
1717,047a6455,14cfec75,1,2025-03-31,,,active
1718,59fab281,e7ab1dbd,4,2025-06-14,2025-07-14,credit card,expired


In [27]:
# Display the usage table.
usage

Unnamed: 0,usage_id,user_id,date,actions_performed,storage_used_mb,api_calls,active_minutes,plan_id
0,a303f4b0,0575ee45,2025-01-08,59,91199.99,461,87,5
1,5521f2c6,b6a244e3,2025-04-13,66,22161.50,420,224,4
2,e39f6587,c647ee34,2025-09-24,65,6989.95,229,36,3
3,808e22c4,ea3d0904,2025-05-29,7,29.73,26,190,1
4,b97abea7,491e5212,2025-03-16,58,1284.77,101,271,2
...,...,...,...,...,...,...,...,...
4995,6e46affb,d3374bad,2025-09-27,88,30.69,14,392,1
4996,1eb5d03d,238bf4b3,2024-10-26,70,8668.55,281,273,3
4997,d2d52809,21a38576,2025-10-20,50,70581.70,472,277,5
4998,74d2b91f,acd08193,2025-10-08,111,21.34,12,134,1


In [38]:
# Display the plan table.
plan

Unnamed: 0,plan_id,plan_name,monthly_fee,max_users,features
0,1,Free,10.0,2,"[3 projects limit, 2 users/workspace, 100MB st..."
1,2,Standard,20.0,10,"[10 projects limit, 10 users/workspace, 5GB st..."
2,3,Premium,35.0,25,"[unlimited projects, 25 users/workspace, 50GB ..."
3,4,Enterprise,70.0,unlimited,"[unlimited projects/users, 1TB storage, accoun..."
4,5,Ultimate,100.0,unlimited,"[all enterprise features, unlimited storage, A..."


In [42]:
import dlt
import json

# dlt pipeline that loads the synthetic tables into the Postgres destination.
pipeline = dlt.pipeline(
    pipeline_name="dlt_faker_data_pipeline",
    destination='postgres',
    dataset_name='faker_dlt_dataset',
)

# Hand dlt plain row dictionaries instead of DataFrames.
user_data = user.to_dict('records')
plan_data = plan.to_dict('records')
subscription_data = subscription.to_dict('records')
usage_data = usage.to_dict('records')

# Serialise each plan's feature list to a JSON string so it is loaded as a single text value.
for plan_row in plan_data:
    plan_features = plan_row.get('features')
    if isinstance(plan_features, list):
        plan_row['features'] = json.dumps(plan_features)

# (table name, rows, primary-key column) in load order. "merge" upserts on the
# primary key, so re-running the cell with the same ids does not duplicate rows.
load_jobs = [
    ('plan', plan_data, 'plan_id'),
    ('subscription', subscription_data, 'subscription_id'),
    ('usage', usage_data, 'usage_id'),
    ('user', user_data, 'user_id'),
]
for table_name, rows, key_column in load_jobs:
    load_info = pipeline.run(rows, table_name=table_name, write_disposition="merge", primary_key=[key_column])

# Show the LoadInfo of the last load (the user table) as the cell output.
load_info

LoadInfo(pipeline=<dlt.pipeline(pipeline_name='dlt_faker_data_pipeline', destination='postgres', dataset_name='faker_dlt_dataset', default_schema_name='dlt_faker_data', schema_names=['dlt_faker_data'], first_run=False, dev_mode=False, is_active=True, pipelines_dir='/Users/ayushacharya/.dlt/pipelines', working_dir='/Users/ayushacharya/.dlt/pipelines/dlt_faker_data_pipeline')>, metrics={'1763044135.817195': [{'started_at': DateTime(2025, 11, 13, 14, 28, 55, 880763, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 11, 13, 14, 28, 55, 937837, tzinfo=Timezone('UTC')), 'job_metrics': {'user.daa03c518c.insert_values.gz': LoadJobMetrics(job_id='user.daa03c518c.insert_values.gz', file_path='/Users/ayushacharya/.dlt/pipelines/dlt_faker_data_pipeline/load/normalized/1763044135.817195/started_jobs/user.daa03c518c.0.insert_values.gz', table_name='user', started_at=DateTime(2025, 11, 13, 14, 28, 55, 907682, tzinfo=Timezone('UTC')), finished_at=DateTime(2025, 11, 13, 14, 28, 55, 922592, tzinfo=