In [27]:
from os import getenv
from connect_db import connect_db
con = connect_db(
    username=getenv("ORACLE_USERNAME"),
    password=getenv("ORACLE_PASSWORD"),
    host=getenv("ORACLE_HOST"),
    port=int(getenv("ORACLE_PORT"))
)
cur = con.cursor()

In [45]:
import math
import random
# https://stackoverflow.com/questions/70341989/weighted-random-float-number-with-single-target-and-chance-of-hitting-target

# Linearly interpolate between a and b by t.
def lerp(a, b, t):
    return ((1.0 - t) * a) + (t * b)

def random_float(max_num: float, target: float, strength: float = 1.0) -> float:
    # scale target down to within range of 0 and 1
    target /= max_num

    # Start with a base float between 0 and 1.
    base = random.random()

    # Get another float between 0 and 1, that trends towards 1 with a higher strength value.
    adjust = random.random()
    adjust = 1.0 - math.pow(1.0 - adjust, strength)

    # Lerp the base float towards the target by the adjust amount.
    value = lerp(base, target, adjust)
    
    # scale back value to the entire float range
    return value * max_num 

In [46]:
from typing import NamedTuple, Iterable
from enum import Enum, auto


class BaggageType(Enum):
    CHK = auto()
    CO = auto()

class Baggage(NamedTuple):
    baggage_type: BaggageType
    weight: float
    price: float
    additional_info: str
    
BAGGAGE_TYPE = ["CO", "CHK"]
BAGGAGE_TYPE_WEIGHTS = [5, 6]
def get_random_baggage_type() -> BaggageType:
    return random.sample(BAGGAGE_TYPE, k=1, counts=BAGGAGE_TYPE_WEIGHTS)[0]

def get_random_weight(baggage_type: str) -> float:
    match baggage_type:
        case "CO":
            return random_float(15.0, target=4.5, strength=2.0)
        case "CHK":
            return random_float(45.0, target=10.0, strength=2.0)

CARRY_ON_PRICE_PER_KG = 20
MAX_CARRY_ON_KG = 7
CARRY_ON_PENALTY = 1.2
CHECK_IN_PRICE_PER_KG = 15
MAX_CHECK_IN_KG = 20
CHECK_IN_PENALTY = 1.2
def calculate_price(weight: float, baggage_type: str) -> float:
    match baggage_type:
        case "CO":
            if weight > MAX_CARRY_ON_KG:
                over, weight = divmod(weight, MAX_CARRY_ON_KG)
                over = over * MAX_CARRY_ON_KG
                
                price = weight * CARRY_ON_PRICE_PER_KG
                price += over * CARRY_ON_PRICE_PER_KG * CARRY_ON_PENALTY
            else:
                price = weight * CARRY_ON_PRICE_PER_KG
            return price
        case "CHK":
            if weight > MAX_CHECK_IN_KG:
                over, weight = divmod(weight, MAX_CHECK_IN_KG)
                over = over * MAX_CHECK_IN_KG
                
                price = weight * CHECK_IN_PRICE_PER_KG
                price += over * CHECK_IN_PRICE_PER_KG * CHECK_IN_PENALTY
            else:
                price = weight * CHECK_IN_PRICE_PER_KG
            return price

BAGGAGE_ADDITIONAL_INFOS = ["Fragile", "Combustible", "Contains liquid", None]
BAGGAGE_ADDITIONAL_INFOS_WEIGHTS = [5, 1, 2, 50]
def get_random_additional_info() -> str | None:
    return random.sample(BAGGAGE_ADDITIONAL_INFOS, k=1, counts=BAGGAGE_ADDITIONAL_INFOS_WEIGHTS)[0]

def get_random_number_of_baggages() -> int:
    i = 0
    while random.randint(1, 2**i) == 1:
        i += 1
    return i

def get_random_baggages() -> Iterable[Baggage]:
    for _ in range(get_random_number_of_baggages()):
        baggage_type = get_random_baggage_type()
        weight = get_random_weight(baggage_type)
        yield Baggage(
            baggage_type=baggage_type,
            weight=weight,
            price=calculate_price(weight, baggage_type),
            additional_info=get_random_additional_info()
        )

In [47]:
def calculate_membership_discount(membership_id: str) -> float:
    match membership_id:
        case "GOLD":
            return 0.85
        case "SILV":
            return 0.9
        case "BRNZ":
            return 0.95
        case _:
            return 1
    
stmt = "SELECT ACCOUNT.ACCOUNT_ID, MEMBERSHIP_TYPE_ID FROM MEMBERSHIP JOIN ACCOUNT ON ACCOUNT.ACCOUNT_ID = MEMBERSHIP.ACCOUNT_ID" 
ACCOUNT_MEMBERSHIP: dict[str, str] = {row[0]: row[1] for row in cur.execute(stmt)}

In [48]:
stmt = "SELECT ACCOUNT_ID FROM REFUND JOIN PAYMENT ON REFUND.PAYMENT_ID = PAYMENT.PAYMENT_ID"
REFUNDED_ACCOUNT_ID = set(row[0] for row in cur.execute(stmt))

In [49]:
from datetime import datetime, timedelta

def deviate_time(dt: datetime, min_min: int, max_min: int) -> datetime:
    return dt - timedelta(minutes=random.randint(min_min, max_min), seconds=random.randint(1, 59))

In [50]:
def get_payment_method() -> str:
    return random.choices(("CRCD", "DBCD", "PYPL", "BANK", "CASH"), k=1, weights=[0.3, 0.2, 0.1, 0.2, 0.2])[0]

In [51]:
payment_i = 0
def PAYMENT_ID_GEN():
    global payment_i
    out = f"P{payment_i:08}"
    payment_i += 1
    return out

In [52]:
from utils import paginate_insert_all
from datetime import datetime

with open("3insertBaggage.sql", "w") as baggage_f, \
     open("1insertPayment.sql", "w") as payment_f:
        
    BAGGAGE_SQL = "    INTO BAGGAGE (BAGGAGE_TYPE_ID, PAYMENT_ID, FLIGHT_TICKET_ID, BAGGAGE_WEIGHT_IN_KG, PRICE, ADDITIONAL_INFO) VALUES ('{}', '{}', '{}', {.2f}, {.2f}, {})"
    def insert_baggage(flight_ticket_id: str, payment_id: str, baggage: Baggage):
        if baggage.additional_info:
            additional_info = f"'{baggage.additional_info}'"
        else:
            additional_info = "NULL"
        baggage_f.write(BAGGAGE_SQL.format(baggage.baggage_type.name, payment_id, flight_ticket_id, baggage.weight, baggage.price, additional_info) + '\n')
        
    PAYMENT_SQL = "    INTO PAYMENT (PAYMENT_METHOD_ID, ACCOUNT_ID, CREATED_AT, AMOUNT) VALUES ('{}', {}, TO_TIMESTAMP('{:%Y-%m-%d %H:%M:%S}', 'YYYY-MM-DD HH24:MI:SS'), {:.2f})"
    def insert_payment(payment_method_id: str, account_id: str, timestamp_created: datetime, payment_amount: float):
        payment_f.write(PAYMENT_SQL.format(payment_method_id, account_id, timestamp_created, payment_amount) + '\n')
        
    insert_baggage = paginate_insert_all(f=insert_baggage, fp=baggage_f)
    insert_payment = paginate_insert_all(f=insert_payment, fp=payment_f)
    
    stmt = "SELECT FLIGHT_TICKET.FLIGHT_TICKET_ID, ACCOUNT_ID, DEPARTURE_DATETIME FROM FLIGHT_TICKET JOIN FLIGHT_SEQUENCE ON FLIGHT_TICKET.FLIGHT_TICKET_ID = FLIGHT_SEQUENCE.FLIGHT_TICKET_ID JOIN FLIGHT ON FLIGHT_SEQUENCE.FLIGHT_ID = FLIGHT.FLIGHT_ID"
    for row in cur.execute(stmt):
        account_id = row[1]
        if account_id in REFUNDED_ACCOUNT_ID:
            continue
        
        if random.randint(1, 2) == 1:
            continue
            
        flight_ticket = row[0]
        departure_time = row[2]
        check_in_time = deviate_time(departure_time, 10, 120)
        
        payment_id = PAYMENT_ID_GEN()
        payment_amt: float = 0
        for baggage in get_random_baggages():
            insert_baggage(flight_ticket, payment_id, baggage)
            if account_id in ACCOUNT_MEMBERSHIP:
                payment_amt += baggage.price * calculate_membership_discount(ACCOUNT_MEMBERSHIP[account_id])
            else:
                payment_amt += baggage.price
            
        insert_payment(get_payment_method(), account_id, check_in_time, payment_amt)
            
    baggage_f.write("SELECT 1 FROM DUAL;\n")
    payment_f.write("SELECT 1 FROM DUAL;\n")

AttributeError: 'str' object has no attribute 'name'

In [None]:
con.close()