In [1]:
import numpy as np
import logging
import psycopg2
import psycopg2.extras as p
import requests
from google.cloud import storage
from time import sleep
from datetime import date
from uuid import UUID, uuid4
from dataclasses import dataclass
from faker import Faker
from contextlib import contextmanager

# Set the logging level and format
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

psycopg2.extras.register_uuid()

@dataclass
class Customer:
    id: UUID
    first_name: str
    last_name: str
    email: str
    phone_number: str
    district_code: int
    date_of_birth: date
    gender: str
    created_at: date


@dataclass
class BillingInfo:
    id: str
    customer_id: UUID
    plan_id: int
    billing_date: date
    total_charges: float
    data_charges: float
    roaming_charges: float
    data_usage: float
    sms_count: int


@dataclass
class SubscriptionStatus:
    id: UUID
    customer_id: UUID
    plan_id: int
    start_date: date
    end_date: date


@dataclass
class CallRecord:
    id: UUID
    customer_id: UUID
    call_date: date
    call_duration: float
    call_type: str
    location_id: int

In [1]:
import csv
plans = [
    {
        "id": 1,
        "plan_name": "Basic",
        "plan_type": "Individual",
        "plan_price": 20.99
    },
    {
        "id": 2,
        "plan_name": "Premium",
        "plan_type": "Individual",
        "plan_price": 50.99
    },
    {
        "id": 3,
        "plan_name": "Family",
        "plan_type": "Family",
        "plan_price": 80.99
    },
    {
        "id": 4,
        "plan_name": "Unlimited",
        "plan_type": "Individual",
        "plan_price": 100.99
    },
    {
        "id": 5,
        "plan_name": "Business",
        "plan_type": "Business",
        "plan_price": 200.99
    },
    {
        "id": 6,
        "plan_name": "Enterprise",
        "plan_type": "Business",
        "plan_price": 300.99
    },
    {
        "id": 7,
        "plan_name": "Small Business",
        "plan_type": "Business",
        "plan_price": 150.99
    },
    {
        "id": 8,
        "plan_name": "Large Business",
        "plan_type": "Business",
        "plan_price": 400.99
    },
    {
        "id": 9,
        "plan_name": "Student",
        "plan_type": "Student",
        "plan_price": 15.99
    },
    {
        "id": 10,
        "plan_name": "Senior",
        "plan_type": "Senior",
        "plan_price": 10.99
    },
    {
        "id": 11,
        "plan_name": "International",
        "plan_type": "International",
        "plan_price": 60.99
    },
    {
        "id": 12,
        "plan_name": "Data Only",
        "plan_type": "Data",
        "plan_price": 30.99
    },
    {
        "id": 13,
        "plan_name": "Content Creator",
        "plan_type": "Business",
        "plan_price": 250.99
    },
    {
        "id": 14,
        "plan_name": "Social Media",
        "plan_type": "Social",
        "plan_price": 40.99
    },
    {
        "id": 15,
        "plan_name": "Gaming",
        "plan_type": "Gaming",
        "plan_price": 70.99
    },
    {
        "id": 16,
        "plan_name": "Family Plus",
        "plan_type": "Family",
        "plan_price": 100.99
    }
]

districts = [
    {"id": 1, "name": "Central and Western"},
    {"id": 2, "name": "Eastern"},
    {"id": 3, "name": "Southern"},
    {"id": 4, "name": "Wan Chai"},
    {"id": 5, "name": "Sham Shui Po"},
    {"id": 6, "name": "Kowloon City"},
    {"id": 7, "name": "Kwun Tong"},
    {"id": 8, "name": "Wong Tai Sin"},
    {"id": 9, "name": "Yau Tsim Mong"},
    {"id": 10, "name": "Islands"},
    {"id": 11, "name": "Kwai Tsing"},
    {"id": 12, "name": "North"},
    {"id": 13, "name": "Sai Kung"},
    {"id": 14, "name": "Sha Tin"},
    {"id": 15, "name": "Tai Po"},
    {"id": 16, "name": "Tsuen Wan"},
    {"id": 17, "name": "Tuen Mun"},
    {"id": 18, "name": "Yuen Long"}
]

def write_dict_list_to_csv(file_obj, dict_list):
    """
    Writes a list of dictionaries to a CSV file.

    Args:
        file_obj (file object): The file object where the CSV data will be written.
        dict_list (list): A list of dictionaries, where each dictionary represents a row of data.

    Returns:
        None
    """
    # Create a CSV writer object
    writer = csv.writer(file_obj)

    # Write the header row
    header = dict_list[0].keys()
    writer.writerow(header)

    # Write each row of data
    for row in dict_list:
        writer.writerow(row.values())

# Open a new CSV file in write mode
# with open('plans.csv', mode='w', newline='') as f1:
#     write_dict_list_to_csv(f1, plans)

# Open a new CSV file in write mode
# with open('districts.csv', mode='w', newline='') as f2:
#     write_dict_list_to_csv(f2, districts)


In [3]:
f = Faker()

In [4]:
def _get_customers(cust_ids: list[UUID]) -> list[dict]:
    """
    Create fake customer profile 
    """
    f = Faker()
    return [
        Customer(
            id = cust_id,
            first_name=f.first_name(),
            last_name=f.last_name(),
            email=f'{cust_id.__str__()[:13]}_{f.email()}',
            phone_number=f.bothify(text="+852 #### ####"),
            district_code=np.random.randint(1, 19),
            date_of_birth=f.date_of_birth(),
            gender=np.random.choice(["male", "female"]),
            created_at=f.date_this_decade(before_today=True, after_today=False)
        ).__dict__
        for cust_id in cust_ids
    ]

def _get_subscription_status(custs: list[dict]) -> list[dict]:
    """
    Create fake subscription status for customer
    """
    f = Faker()
    results = []
    for cust in custs:
        end_date = f.date_this_decade(after_today=True, before_today=False)
        results.append(SubscriptionStatus(
                id = uuid4(),
                customer_id = cust["id"],
                plan_id = np.random.randint(1, 17),
                start_date = f.date_between(start_date=cust["created_at"], end_date=end_date),
                end_date = end_date
        ).__dict__)
    return results

def _get_call_records(cust_ids: list[UUID]) -> list[dict]:
    """
    Create fake call reords for customer
    """
    f = Faker()
    call_types = ["Outgoing Call", "Incoming Call",  "Data Usage Inquiry", "Network Issue", "Billing Inquiry", "Technical Support", "Account Update"]
    return [
        CallRecord(
            id=uuid4(),
            customer_id=np.random.choice(cust_ids),
            call_date=f.date_this_month(),
            call_duration=np.random.exponential(10),
            call_type=np.random.choice(call_types),
            location_id=np.random.randint(1, 51)
        ).__dict__
        for _ in range(1000)
    ]


def _get_billing_infos(active_subscriptions: list[dict]) -> list[dict]:
    """
    Create fake billing statements for customer
    """
    f = Faker()
    results = []
    for s in active_subscriptions:
        fake_billing_date = f.date_between(start_date="now", end_date=s["end_date"])
        fake_data_charges = np.random.uniform(0, 100)
        fake_roaming_charges = np.random.uniform(0, 500)
        fake_total_charges = np.random.uniform(fake_data_charges + fake_roaming_charges, 1000)
        results.append(BillingInfo(
                id=str(s["customer_id"]) + '_' + str(s["plan_id"]) + "_" + fake_billing_date.strftime("%m-%Y"),
                customer_id=str(s["customer_id"]),
                plan_id=str(s["plan_id"]),
                billing_date=fake_billing_date.strftime("%Y-%m-%d"),
                total_charges=fake_total_charges,
                data_charges=fake_data_charges,
                roaming_charges=fake_roaming_charges,
                data_usage=np.random.uniform(0, 20),
                sms_count=np.random.randint(0, 50),
            ).__dict__
        )
    return results

def _customer_data_insert_query() -> str:
    return """
    INSERT INTO Customer (
        id,
        first_name,
        last_name,
        email,
        phone_number,
        district_code,
        date_of_birth,
        gender,
        created_at
    )
    VALUES (
        %(id)s,
        %(first_name)s,
        %(last_name)s,
        %(email)s,
        %(phone_number)s,
        %(district_code)s,
        %(date_of_birth)s,
        %(gender)s,
        %(created_at)s
    )
    """

def _subscription_status_insert_query() -> str:
    return """
    INSERT INTO SubscriptionStatus (
        id,
        customer_id,
        plan_id,
        start_date,
        end_date
    )
    VALUES (
        %(id)s,
        %(customer_id)s,
        %(plan_id)s,
        %(start_date)s,
        %(end_date)s
    )
    """


def create_bucket(name: str) -> storage.bucket.Bucket:
    """Create bucket on GCS if it is not exist"""
    storage_client = storage.Client()
    bucket = storage_client.bucket(name)
    if bucket.exists():
        logging.info('Bucket already exists')
    else:
        # Create the new bucket
        bucket = storage_client.create_bucket(name, location='asia-east2')
        logging.info('Bucket {} created'.format(bucket.name))
    return bucket

class DatabaseConnection:
    def __init__(self):
        # DO NOT HARDCODE !!!
        self.conn_url = (
            "postgresql://postgres:password123@localhost:5432/telecomdb"
        )

    @contextmanager
    def managed_cursor(self, cursor_factory=None):
        self.conn = psycopg2.connect(self.conn_url)
        self.conn.autocommit = True
        self.curr = self.conn.cursor(cursor_factory=cursor_factory)
        try:
            yield self.curr
        finally:
            self.curr.close()
            self.conn.close()

In [5]:
def generate_data(iteration: int, calls_bucket: str = "telecom_de") -> None:
    cust_ids = [uuid4() for _ in range(1000)]
    customers = _get_customers(cust_ids)
    subpscription = _get_subscription_status(customers)
    call_records = _get_call_records(cust_ids)
    billing_infos = _get_billing_infos(subpscription)

    # Send call data to GCS
    # Create bucket if not exists
    bucket = create_bucket(calls_bucket)
    # Upload csv file 
    blob = bucket.blob(f"src_calls/data_{str(iteration)}.csv")
    with blob.open("w", newline='') as f:
        write_dict_list_to_csv(f, call_records)
        logging.info(f"CSV file 'src_calls/data_{str(iteration)}.csv' was successfully uploaded to bucket '{calls_bucket}'")
    
    with DatabaseConnection().managed_cursor() as curr:
        # send customers data to telecomdb
        p.execute_batch(curr, _customer_data_insert_query(), customers)
        
        # send subscritions data to telecomdb
        p.execute_batch(curr, _subscription_status_insert_query(), subpscription)

    # send billing data to hosted fast api 
    res = requests.post("http://localhost:8000/billings", json={"data":billing_infos})
    
    logging.info(res.json()["message"])

In [6]:
generate_data(1)

2023-07-24 19:32:18 INFO Bucket already exists
2023-07-24 19:32:18 INFO CSV file 'src_calls/data_1.csv' was successfully uploaded to bucket 'telecom_de'


{'data': [{'id': 'c6234dd9-c4ad-40cd-87d9-4f1422ce095d_7_09-2027', 'customer_id': 'c6234dd9-c4ad-40cd-87d9-4f1422ce095d', 'plan_id': '7', 'billing_date': '2027-09-01', 'total_charges': 392.9756290745926, 'data_charges': 4.030568215878716, 'roaming_charges': 359.0717944304753, 'data_usage': 5.050993408514898, 'sms_count': 39}, {'id': '2144c24a-3cef-4f30-a102-f7d9e234f7af_13_06-2024', 'customer_id': '2144c24a-3cef-4f30-a102-f7d9e234f7af', 'plan_id': '13', 'billing_date': '2024-06-01', 'total_charges': 992.813496680805, 'data_charges': 57.333326244867536, 'roaming_charges': 182.19941408025946, 'data_usage': 15.587497871506118, 'sms_count': 25}, {'id': '8f918739-b7aa-4b04-a4e6-9b818230809e_2_07-2023', 'customer_id': '8f918739-b7aa-4b04-a4e6-9b818230809e', 'plan_id': '2', 'billing_date': '2023-07-29', 'total_charges': 744.6278505121263, 'data_charges': 24.08280893735395, 'roaming_charges': 43.479951321416024, 'data_usage': 11.044828818150556, 'sms_count': 25}, {'id': '3b603cc5-830d-4c20-8ed

2023-07-24 19:32:21 INFO Billing records are successfully inserted


In [7]:
if __name__ == "__main__":
    itr = 1
    while True:
        generate_data(itr)
        sleep(30)
        itr += 1

2023-07-24 19:32:30 INFO Bucket already exists
2023-07-24 19:32:30 INFO CSV file 'src_calls/data_1.csv' was successfully uploaded to bucket 'telecom_de'


{'data': [{'id': '6a1e332d-851d-452a-b827-718ef928c523_11_03-2027', 'customer_id': '6a1e332d-851d-452a-b827-718ef928c523', 'plan_id': '11', 'billing_date': '2027-03-24', 'total_charges': 768.6883884900244, 'data_charges': 22.716282824823885, 'roaming_charges': 402.22981325743496, 'data_usage': 3.69689606096103, 'sms_count': 39}, {'id': 'd7e0b369-8f21-47d7-8d8d-15f41f7b2a68_15_06-2024', 'customer_id': 'd7e0b369-8f21-47d7-8d8d-15f41f7b2a68', 'plan_id': '15', 'billing_date': '2024-06-20', 'total_charges': 480.6020851932085, 'data_charges': 3.223711278104824, 'roaming_charges': 25.018767091960992, 'data_usage': 6.991996015065787, 'sms_count': 8}, {'id': '3749b0b3-5812-4e09-9e57-8a48391513b6_10_08-2024', 'customer_id': '3749b0b3-5812-4e09-9e57-8a48391513b6', 'plan_id': '10', 'billing_date': '2024-08-21', 'total_charges': 670.3208185058131, 'data_charges': 73.42291985114888, 'roaming_charges': 453.0772323756062, 'data_usage': 4.094961745550229, 'sms_count': 9}, {'id': 'a3f9debf-18f0-46f2-b4d

2023-07-24 19:32:33 INFO Billing records are successfully inserted
