In [2]:
from dagster import op, job
import pandas as pd
import numpy as np
import os
import re
from dotenv import load_dotenv
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy import create_engine, MetaData, Table, inspect, text
import datetime
import logging

# ✅ Load environment variables
load_dotenv()


True

In [3]:

# ✅ DB source (MariaDB) - เพิ่ม timeout และ connection pool
source_engine = create_engine(
    f"mysql+pymysql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/fininsurance",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=3600,
    connect_args={
        'connect_timeout': 60,
        'read_timeout': 300,
        'write_timeout': 300
    }
)
source_engine_task = create_engine(
    f"mysql+pymysql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/fininsurance_task",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=3600,
    connect_args={
        'connect_timeout': 60,
        'read_timeout': 300,
        'write_timeout': 300
    }
)

# ✅ DB target (PostgreSQL) - เพิ่ม timeout และ connection pool
target_engine = create_engine(
    f"postgresql+psycopg2://{os.getenv('DB_USER_test')}:{os.getenv('DB_PASSWORD_test')}@{os.getenv('DB_HOST_test')}:{os.getenv('DB_PORT_test')}/fininsurance",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=3600,
    connect_args={
        'connect_timeout': 60,
        'options': '-c statement_timeout=300000'  # 5 minutes timeout
    }
)


In [4]:

# ✅ เพิ่มประสิทธิภาพ: ใช้ LIMIT และปรับปรุง query
df_plan = pd.read_sql("""
    SELECT quo_num, type_insure, update_at, id_government_officer, status_gpf, quo_num_old,
            status AS status_fssp, type_car, chanel_key  
    FROM fin_system_select_plan 
    WHERE update_at BETWEEN '2025-01-01' AND '2025-08-06'
        AND type_insure IN ('ประกันรถ', 'ตรอ')
    ORDER BY update_at DESC
""", source_engine)

df_plan

Unnamed: 0,quo_num,type_insure,update_at,id_government_officer,status_gpf,quo_num_old,status_fssp,type_car,chanel_key
0,FQ2508-04780,ประกันรถ,2025-08-05 23:35:43,,no,,wait,,APP-B2C
1,FQ2508-04779,ประกันรถ,2025-08-05 23:29:08,,no,,wait,,APP-B2C
2,FQ2507-23832,ประกันรถ,2025-08-05 23:24:08,สิทธิ์กบข.,no,FQ2408-04026,active,,APP-B2C
3,FQ2508-04563,ประกันรถ,2025-08-05 23:22:53,,no,,active,,B2B
4,FQ2508-04774,ประกันรถ,2025-08-05 23:07:33,,no,,wait,,APP-B2C
...,...,...,...,...,...,...,...,...,...
531529,FV2501-00102,ตรอ,2025-01-01 08:35:30,,,,active,,WEB-VIF
531530,FV2501-00101,ตรอ,2025-01-01 08:25:17,,,,,,WEB-VIF
531531,FQ2501-00223,ประกันรถ,2025-01-01 08:20:06,,no,,active,,B2B
531532,FQ2409-25327,ประกันรถ,2025-01-01 08:08:34,,no,,active,,TELE


In [5]:
# ✅ ดึงเฉพาะข้อมูลที่จำเป็นจาก fin_order และเพิ่ม LIMIT
df_order = pd.read_sql("""
    SELECT quo_num, order_number, chanel, datekey, status AS status_fo
    FROM fin_order
    WHERE quo_num IS NOT NULL
""", source_engine_task)

df_order

Unnamed: 0,quo_num,order_number,chanel,datekey,status_fo
0,FQ20-01-0013-1,FIN1912-00009,Line,2019-12-06 16:43:39,7
1,FQ1912-0070,FIN1912-00035,web นายหน้า,2019-12-13 17:56:50,8
2,FQ1912-0071,FIN1912-00037,web นายหน้า,2019-12-13 18:17:48,6
3,FQ1912-0088,FIN1912-00042,web นายหน้า,2019-12-17 12:29:27,0
4,FQ1912-0101,FIN1912-00043,web นายหน้า,2019-12-18 11:38:58,8
...,...,...,...,...,...
1622223,FV2508-08068,FAV2508-02027,web ตรอ API,2025-08-06 14:27:50,8
1622224,FV2508-08070,FAV2508-02028,web ตรอ API,2025-08-06 14:28:02,8
1622225,FV2508-08044,FAV2508-02029,web ตรอ API,2025-08-06 14:28:06,8
1622226,FV2508-08005,FAV2508-02030,web ตรอ API,2025-08-06 14:28:40,8


In [6]:
df_pay = pd.read_sql("""
    SELECT quo_num, update_at, numpay, show_price_ins, show_price_prb, show_price_total,
            show_price_check, show_price_service, show_price_taxcar, show_price_fine,
            show_price_addon, show_price_payment, distax, show_ems_price, show_discount_ins,
            discount_mkt, discount_government, discount_government_fin,
            discount_government_ins, coupon_addon, status AS status_fsp, id_cus
    FROM fin_system_pay 
    WHERE update_at BETWEEN '2025-01-01' AND '2025-08-06'
        AND type_insure IN ('ประกันรถ', 'ตรอ')
    ORDER BY update_at DESC
""", source_engine)

df_pay

Unnamed: 0,quo_num,update_at,numpay,show_price_ins,show_price_prb,show_price_total,show_price_check,show_price_service,show_price_taxcar,show_price_fine,...,distax,show_ems_price,show_discount_ins,discount_mkt,discount_government,discount_government_fin,discount_government_ins,coupon_addon,status_fsp,id_cus
0,FQ2508-04780,2025-08-05 23:37:06,6,6600,645.21,7245.21,,,,,...,0,40,826.51,,,,,0,sendpay,FNG25-138267
1,FQ2503-07376,2025-08-05 23:33:19,6,5800,0,5800.00,,,,,...,53.29,40,0,,0,0.00,0.00,40,success-waitinstall,FNG20-0264
2,FQ2508-04779,2025-08-05 23:29:08,,,,,,,,,...,,,0,,,,,,wait,FNG25-138267
3,FQ2508-04776,2025-08-05 23:13:18,,,,,,,,,...,,,0,,,,,,wait,FNG21-31803
4,FQ2408-14004,2025-08-05 23:10:02,10,19000,0,19000.00,,,,,...,58.06,35,0,,0,0.00,0.00,100,success-waitinstall,FNG21-14153
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1123329,FQ2412-23129,2025-01-01 06:53:21,6,16675.95,0,16675.95,,,,,...,3.88,35,38.73,,0,0.00,0.00,0,success-waitinstall,FNG22-069783
1123330,FQ2412-32420,2025-01-01 06:53:21,10,5800,0,5800.00,,,,,...,41.84,35,759.98,,0,0.00,0.00,0,success-waitinstall,FNG22-082303
1123331,FQ2411-00530,2025-01-01 02:12:08,3,5899.98,0,5899.98,,,,,...,54.92,35,0,,0,0.00,0.00,0,success-waitinstall,FNG21-46597
1123332,FQ2411-06114,2025-01-01 00:49:07,6,6800,0,6800.00,,,,,...,37.98,35,,,0,0.00,0.00,0,success-waitinstall,FNG22-069416


In [7]:
df_risk =pd.read_sql("""
    SELECT quo_num, type 
    FROM fin_detail_plan_risk  
    WHERE type = 'คอนโด'
""", source_engine)

df_risk

Unnamed: 0,quo_num,type
0,FN2110-02035,คอนโด
1,FN2110-02303,คอนโด
2,FN2111-02632,คอนโด
3,FN2111-03006,คอนโด
4,FN2201-03377,คอนโด
...,...,...
57,FN2507-03861,คอนโด
58,FN2508-00163,คอนโด
59,FN2508-00176,คอนโด
60,FN2508-00448,คอนโด


In [11]:
df_pa =pd.read_sql("""
    SELECT quo_num, special_package
    FROM fin_detail_plan_pa   
    WHERE special_package= 'CHILD'
""", source_engine)

df_pa

Unnamed: 0,quo_num,special_package
0,FN2408-00102,CHILD
1,FN2408-00103,CHILD
2,FN2408-00104,CHILD
3,FN2408-00105,CHILD
4,FN2408-00111,CHILD
...,...,...
19792,FN2508-00698,CHILD
19793,FN2508-00700,CHILD
19794,FN2508-00701,CHILD
19795,FN2508-00706,CHILD


In [12]:
df_health =pd.read_sql("""
    SELECT quo_num, special_package 
    FROM fin_detail_plan_health   
    WHERE special_package = 'CHILD'
""", source_engine)

df_health

Unnamed: 0,quo_num,special_package
0,FN2403-00173,CHILD
1,FN2403-00411,CHILD
2,FN2403-00599,CHILD
3,FN2403-00750,CHILD
4,FN2403-00904,CHILD
...,...,...
4466,FN2508-00697,CHILD
4467,FN2508-00702,CHILD
4468,FN2508-00703,CHILD
4469,FN2508-00704,CHILD


In [23]:
df_wp = pd.read_sql("""
    SELECT cuscode as id_cus, display_permission
    FROM wp_users 
    where display_permission IN ('สำนักงานฟิน', 'หน้าร้านฟิน')
""", source_engine)

df_wp

Unnamed: 0,id_cus,display_permission
0,FNG19-0069,สำนักงานฟิน
1,FNG20-0067,สำนักงานฟิน
2,FNG20-0089,หน้าร้านฟิน
3,FNG20-0107,สำนักงานฟิน
4,FNG20-0200,สำนักงานฟิน
...,...,...
1814,FNG25-140369,สำนักงานฟิน
1815,FNG25-140374,หน้าร้านฟิน
1816,FNG25-140380,หน้าร้านฟิน
1817,FNG25-140383,หน้าร้านฟิน


In [None]:
# 🔄 Merge ตามลำดับ เริ่มจาก df_plan เป็นหลัก
df_merged = df_plan.copy()

# Merge ทีละตารางด้วย 'quo_num'
df_merged = df_merged.merge(df_order, on='quo_num', how='left', suffixes=('', '_order'))
df_merged = df_merged.merge(df_pay, on='quo_num', how='left', suffixes=('', '_pay'))
df_merged = df_merged.merge(df_risk, on='quo_num', how='left', suffixes=('', '_risk'))
df_merged = df_merged.merge(df_pa, on='quo_num', how='left', suffixes=('', '_pa'))
df_merged = df_merged.merge(df_health, on='quo_num', how='left', suffixes=('', '_health'))

# ถ้ามีแล้วสามารถ merge ได้เลย
df_merged = df_merged.merge(df_wp, on='id_cus', how='left', suffixes=('', '_wp'))
df_merged

Unnamed: 0,quo_num,type_insure,update_at,id_government_officer,status_gpf,quo_num_old,status_fssp,type_car,chanel_key,order_number,...,discount_mkt,discount_government,discount_government_fin,discount_government_ins,coupon_addon,status_fsp,id_cus,type,special_package,special_package_health
0,FQ2508-04780,ประกันรถ,2025-08-05 23:35:43,,no,,wait,,APP-B2C,,...,,,,,0,sendpay,FNG25-138267,,,
1,FQ2508-04779,ประกันรถ,2025-08-05 23:29:08,,no,,wait,,APP-B2C,,...,,,,,,wait,FNG25-138267,,,
2,FQ2507-23832,ประกันรถ,2025-08-05 23:24:08,สิทธิ์กบข.,no,FQ2408-04026,active,,APP-B2C,FAM2508-00878,...,,,,,,,,,,
3,FQ2508-04563,ประกันรถ,2025-08-05 23:22:53,,no,,active,,B2B,FIN2508-01307,...,,,,,,,,,,
4,FQ2508-04774,ประกันรถ,2025-08-05 23:07:33,,no,,wait,,APP-B2C,,...,,,,,,wait,FNG24-130207,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
531572,FV2501-00102,ตรอ,2025-01-01 08:35:30,,,,active,,WEB-VIF,VIN2501-00101,...,,,,,,success,FNG22-055946,,,
531573,FV2501-00101,ตรอ,2025-01-01 08:25:17,,,,,,WEB-VIF,,...,,,,,,wait,Admin-VIF,,,
531574,FQ2501-00223,ประกันรถ,2025-01-01 08:20:06,,no,,active,,B2B,FIN2501-00410,...,,0,0.00,0.00,0,success-waitinstall,FNG22-064602,,,
531575,FQ2409-25327,ประกันรถ,2025-01-01 08:08:34,,no,,active,,TELE,FIN2409-09375,...,,,,,,,,,,


In [22]:
# ถ้ามีแล้วสามารถ merge ได้เลย
df_merged = df_merged.merge(df_wp, on='id_cus', how='left', suffixes=('', '_wp'))
df_merged

Unnamed: 0,quo_num,type_insure,update_at,id_government_officer,status_gpf,quo_num_old,status_fssp,type_car,chanel_key,order_number,...,discount_government,discount_government_fin,discount_government_ins,coupon_addon,status_fsp,id_cus,type,special_package,special_package_health,display_permission
0,FQ2508-04780,ประกันรถ,2025-08-05 23:35:43,,no,,wait,,APP-B2C,,...,,,,0,sendpay,FNG25-138267,,,,
1,FQ2508-04779,ประกันรถ,2025-08-05 23:29:08,,no,,wait,,APP-B2C,,...,,,,,wait,FNG25-138267,,,,
2,FQ2507-23832,ประกันรถ,2025-08-05 23:24:08,สิทธิ์กบข.,no,FQ2408-04026,active,,APP-B2C,FAM2508-00878,...,,,,,,,,,,
3,FQ2508-04563,ประกันรถ,2025-08-05 23:22:53,,no,,active,,B2B,FIN2508-01307,...,,,,,,,,,,
4,FQ2508-04774,ประกันรถ,2025-08-05 23:07:33,,no,,wait,,APP-B2C,,...,,,,,wait,FNG24-130207,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
531572,FV2501-00102,ตรอ,2025-01-01 08:35:30,,,,active,,WEB-VIF,VIN2501-00101,...,,,,,success,FNG22-055946,,,,
531573,FV2501-00101,ตรอ,2025-01-01 08:25:17,,,,,,WEB-VIF,,...,,,,,wait,Admin-VIF,,,,
531574,FQ2501-00223,ประกันรถ,2025-01-01 08:20:06,,no,,active,,B2B,FIN2501-00410,...,0,0.00,0.00,0,success-waitinstall,FNG22-064602,,,,
531575,FQ2409-25327,ประกันรถ,2025-01-01 08:08:34,,no,,active,,TELE,FIN2409-09375,...,,,,,,,,,,


In [None]:
🔍 Policy register analysis:
   - Total records in policy_register: 1208
   - Unique cuscode in policy_register: 1208
   - Career values: career
พนักงานบริษัท/เอกชน           391
ตัวแทนประกันชีวิต/วินาศภัย    330
เจ้าของธุรกิจ                 307
นายหน้าอิสระ/สังกัดบริษัท      89
Sale ขายรถ/ไฟแนนซ์/ธนาคาร      33
อู่ซ่อมรถ                      23
หน้าร้าน/สำนักงาน              20
ตรอ                             5
MLM/MGM                         3
Name: count, dtype: int64
🔍 Cuscode relationship analysis:
   - Unique cuscode in wp_users: 140478
   - Unique cuscode in policy_register: 1208
   - Cuscode in both tables: 1179
   - Cuscode only in wp_users: 139299
   - Cuscode only in policy_register: 29
🔍 Sample cuscode only in wp_users:
             cuscode                    name
33759    FNG21-33815          ญาณิศา กังน้อย
38772    FNG21-38830         ธฤษวรรณ ยิ้มศรี
109289  FNG24-110956         ปรารถนา เจตชุ่ม
122055  FNG24-122676  สิริย์ภัค วีระกิจพานิช
135329  FNG25-135752   จันทร์ทิมา ชวเลิศสกุล
📦 df_main: (140478, 20)
📦 df_career: (1208, 2)
📦 df_merged: (140478, 21)
🔍 Career data analysis:
   - Total records: 140478
   - Records with career: 1173
   - Records without career (NaN): 139305
   - Percentage with career: 0.84%
🔍 Sample records without career:
        cuscode                   name
0    FIN0000000            ปวีณา ชมสวน
1  FNG1911-0037                  ปัญญา
2  FNG1912-0003  ณัฏฐา รังษีกุลพิพัฒน์
3  FNG1912-0004                ปัญญากร
4  FNG1912-0005     เจษฎาพงษ์  ชาติศรี
🔍 Sample records with career:
         cuscode                 name                      career
129   FNG20-0054    สุปราณี รัชรินทร์   นายหน้าอิสระ/สังกัดบริษัท
490   FNG20-0419        อรทัย บุญอิ่ม  ตัวแทนประกันชีวิต/วินาศภัย
2642  FNG20-2574  สุภัทรพงษ์ ทิพย์ทอง   นายหน้าอิสระ/สังกัดบริษัท
3851  FNG20-3842         คมกริช สมรูป   นายหน้าอิสระ/สังกัดบริษัท
4470  FNG20-4463    ปิยวรรณ กล่ำกลิ่น  ตัวแทนประกันชีวิต/วินาศภัย
🔍 Cleaning career data...
🔍 Career values after cleaning:
career
nan                           139299
พนักงานบริษัท/เอกชน              368
ตัวแทนประกันชีวิต/วินาศภัย       327
เจ้าของธุรกิจ                    307
นายหน้าอิสระ/สังกัดบริษัท         88
Sale ขายรถ/ไฟแนนซ์/ธนาคาร         33
อู่ซ่อมรถ                         23
หน้าร้าน/สำนักงาน                 19
None                               6
ตรอ                                5
Name: count, dtype: int64
✅ Extracted logs: (140478, 21)

📊 Cleaning completed
✅ Cleaned columns: Index(['agent_id', 'agent_name', 'agent_rank', 'hire_date', 'type_agent',
       'agent_email', 'store_name', 'subdistrict', 'district', 'province',
       'current_province', 'current_area', 'zipcode', 'mobile_number',
       'date_active', 'job', 'agent_region', 'agent_main_region',
       'defect_status', 'is_experienced', 'agent_address'],
      dtype='object')
