In [None]:
import os
import sys
import pandas as pd
import numpy as np
import json
import time

In [None]:
# Notebook start timestamp; later cells display time.time() - stime as elapsed seconds.
stime = time.time()

# db_cache的資料處理，給公務資料使用

In [None]:
# Directories, each kept as a list of path components for os.path.join(*parts)
folder_src = ['farmer_income_survey', 'resources']
folder_input = folder_src + ['input']
folder_output = folder_src + ['output']
year = '109'
#
folder_raw = folder_input + [year, 'raw']
folder_csv = folder_input + [year, 'csv']
folder_db_cache = folder_input + [year, 'db_cache']
# ====================================================================
# File names and save paths of the 12 db_cache CSV files
db_cache_list = [
    'household',  # household registration file
    'fallow_declare',  # fallow / crop-conversion declaration
    'fallow_transfer_subsidy',  # fallow / crop-conversion subsidy
    'small_large_tenant_information_landlord_id',  # small-landlord/large-tenant programme: landlord id
    'small_large_tenant_information_tenant_id',  # small-landlord/large-tenant programme: tenant id
    'small_large_tenant_transfer',  # small-landlord/large-tenant programme: crop-conversion subsidy
    'small_large_landlord_rent',  # small-landlord/large-tenant programme: rent payment
    'small_large_landlord_retire',  # small-landlord/large-tenant programme: retirement reward
    'disaster_subsidy',  # natural disaster relief
    'child_scholarship',  # children's scholarship
    'livestock',  # livestock
    'elder_allowance',  # elderly farmer allowance
]
# For every entry publish two notebook-level variables:
#   db_<name>_csv       -> the CSV file name
#   path_db_<name>_csv  -> the full path inside folder_db_cache
# globals() is used explicitly: writing through locals() only happens to work
# at module scope and silently does nothing inside a function.
for name in db_cache_list:
    csv_fname = f'db_{name}.csv'
    var_name = f'db_{name}_csv'
    path_name = f'path_db_{name}_csv'
    globals()[var_name] = csv_fname
    globals()[path_name] = os.path.join(*folder_db_cache, csv_fname)

In [None]:
# Keyword arguments shared by every pd.read_csv call in this notebook
init = {
    # 'engine': 'python',
    'encoding': 'utf-8',
    'sep': ',',
    'keep_default_na': False,  # do not auto-convert values to NA
}


# Uniform string clean-up for ID-like columns
def SE_upper(se):
    """Return ``se`` stripped, with inner whitespace removed and upper-cased.

    Steps: strip both ends, replace the character 次 with '??', delete ASCII
    and full-width (U+3000) spaces, then upper-case.

    The patterns are character classes, so ``regex=True`` is passed
    explicitly: pandas >= 2.0 defaults ``Series.str.replace`` to literal
    matching, which would silently leave inner spaces in place.

    Parameters
    ----------
    se : pandas.Series
        Column to clean; expected to hold strings.

    Returns
    -------
    pandas.Series
        The cleaned column. If the string accessor cannot be used (e.g. the
        column is numeric), a message is printed and ``se`` is returned
        unchanged (best effort).
    """
    try:
        return (
            se.str.strip()
            .str.replace('[次]', '??', regex=True)
            .str.replace('[ \u3000　]', '', regex=True)  # remove spaces
            .str.upper()  # upper-case
        )
    except Exception:
        # Keep the best-effort behaviour, but no longer swallow
        # KeyboardInterrupt / SystemExit like a bare ``except:`` did.
        print(f'{se.name}: SE_upper有問題，dtype={se.dtype}')
        return se

# 0.調查名冊

In [None]:
# Columns of the survey roster; each one is read as a string
cols = [
    'SEQ', '編號', '層別', '樣本套號', '戶長姓名', '市內電話', '分機', '手機號碼', '地址',
    '總銷售收入組距', '非對象註記', '總處連結編號', 'ID', '連結編號', '主備標記', '縣市', '區鄉鎮名稱',
    'adcode', '農戶編號', '可耕作地面積', '主要經營型態', '戶內15歲以上人口',
    '調查員', '刪除',
    'from', 'tel_u', 'phone_u', 'addr_u', 'row_u', 'tel_new', 'phone_new', 'addr_new', '更新市內', '更新手機', '更新地址'
]
dtype = dict.fromkeys(cols, str)
# Load the roster CSV with the shared read_csv options
csv_survey_path = os.path.join(*folder_csv, '01_調查名冊.csv')
df_survey = pd.read_csv(csv_survey_path, dtype=dtype, **init)
# Show the shape and the first three rows
print(df_survey.shape)
df_survey.head(3)

In [None]:
# The roster supplied 9039 IDs (9027 after de-duplication) to the Ministry of the Interior.
# Keep rows whose ID does not contain 'ZID' and whose farm-household number is not empty.
has_farm_number = df_survey['農戶編號'] != ''
where = ~df_survey['ID'].str.contains('ZID') & has_farm_number
tmp = df_survey.loc[where]
print('提供=', tmp.shape)
# Unique IDs
ID_survey = set(tmp['ID'])
print('去重=', len(ID_survey))

# 1.戶籍檔處理 (db_household.csv)

## 1-1: 讀內政部原始檔

### ---全戶戶籍檔

In [None]:
# Raw household-registration file (received 109/10/22, see folder name)
folder_raw_household = folder_raw + ['20201022_佩芬_內政部戶籍檔']
csv_hh_path = os.path.join(*folder_raw_household, 'coa_d03_10910_utf8.csv')
# 13 columns are read (the 14th, a line-break marker, is skipped)
cols = [
    '表頭', '統號', '出生日期', '戶號', 'addr1', 'addr2', 'addr3', 'addr4', 'addr5',
    '與戶長關係', '個人註記代碼', '遷出類別', '戶別代碼'
]
# Target layout: first 4 columns, the merged address, then the last 4 columns
cols_tar = cols[:4] + ['戶籍地址'] + cols[9:]
dtype = {col: str for col in cols}  # read everything as strings
df_hh = pd.read_csv(csv_hh_path, **init, dtype=dtype, usecols=cols)
# Clean every column
for col in cols:
    df_hh[col] = SE_upper(df_hh[col])
# Merge the five address parts into one column
df_hh['戶籍地址'] = df_hh['addr1']+df_hh['addr2']+df_hh['addr3']+df_hh['addr4']+df_hh['addr5']
# De-duplicate (rows repeat because the roster contained repeated IDs).
# Done as a chained call that returns a new frame instead of
# drop_duplicates(inplace=True) on a column slice, which triggers
# SettingWithCopyWarning.
df_hh = df_hh[cols_tar].drop_duplicates()
# Drop the end-of-file marker row ('@@') and rows with an empty header field
where = ~df_hh['表頭'].isin(['@@', ''])
# .copy() so that later cells can assign columns on df_hh safely
df_hh = df_hh[where].copy()
#
df_hh

### ---個人戶籍,全戶未轉出

In [None]:
# Input files: per-person records (ID in the 2nd column) and the list of
# households that were not exported (a single ID column)
csv_p_path = os.path.join(*folder_raw_household, 'coa_d02_10910_utf8.csv')
csv_hh_dif_path = os.path.join(*folder_raw_household, 'coa_d03_10910_dif.csv')
# Neither file has a header row
df_p = pd.read_csv(csv_p_path, header=None, **init)
df_hh_dif = pd.read_csv(csv_hh_dif_path, header=None, **init)
# Clean the ID columns
df_p[1] = SE_upper(df_p[1])
df_hh_dif[0] = SE_upper(df_hh_dif[0])
# Remove rows whose first column is the '@@' marker or empty
where1 = ~df_p[0].isin(['@@', ''])
where2 = ~df_hh_dif[0].isin(['@@', ''])
df_p, df_hh_dif = df_p.loc[where1], df_hh_dif.loc[where2]

In [None]:
# Unique IDs from the per-person file, the not-exported list and the household file
ID_p, ID_hh_dif, ID_hh = set(df_p[1]), set(df_hh_dif[0]), set(df_hh['統號'])
# Per-person file: 8920 rows, 8908 unique IDs
print(df_p.shape, *(len(ids) for ids in (ID_p, ID_hh_dif, ID_hh, ID_survey)))

In [None]:
# IDs in the per-person file and in the not-exported list should all come from the roster
print(ID_p <= ID_survey, ID_hh_dif <= ID_survey)
# Per-person IDs should be in the household file; not-exported IDs should not be
print(ID_p <= ID_hh, ID_hh_dif.intersection(ID_hh), ID_hh_dif.intersection(ID_p))
# Per-person 8908 + not-exported 119 = 9027
(ID_p | ID_hh_dif) == ID_survey

### >>> 全戶戶籍檔所有ID，佩芬要去要勞保資料

In [None]:
# All IDs of the household file plus the not-exported IDs
ID_hh_all = ID_hh | ID_hh_dif
# Build the one-column frame from a sorted list rather than passing the
# unordered set to the DataFrame constructor; the result is the same sorted
# 'ID' column with a fresh 0..n-1 index.
df_id_hh_all = pd.DataFrame({'ID': sorted(ID_hh_all)})
#
df_id_hh_all

In [None]:
# Export the ID list to Excel once; an existing file is never overwritten
xlsx_id_hh_all_path = os.path.join(
    *folder_raw_household,
    f'{year}_全戶戶籍檔ID(資料檔+未轉出={df_id_hh_all.shape[0]}).xlsx',
)
already_saved = os.path.isfile(xlsx_id_hh_all_path)
if not already_saved:
    df_id_hh_all.to_excel(xlsx_id_hh_all_path, index=None)
    print(f'存{xlsx_id_hh_all_path}')

## 1-2: 三個欄位代碼轉文字 (insert_to_household_statment.sql)

In [None]:
# Translate three code columns into text labels.
# The original used .str.replace('^1$', ...) and relied on the pattern being
# treated as a regex; pandas >= 2.0 defaults to literal matching, which turns
# every replacement into a no-op. Series.replace with a dict performs the same
# whole-value substitution without involving regex at all.
code_maps = {
    # personal annotation code
    '個人註記代碼': {'0': '現住人口', '1': '死亡', '2': '除口'},
    # move-out category; empty string (most common) and '0' also occur, meaning unknown, left as is
    '遷出類別': {'1': '國內', '2': '國外', '3': '喪失國籍', '4': '喪失(臺灣地區人民)身分'},
    # household type code
    '戶別代碼': {'1': '共同生活戶', '2': '共同事業戶', '3': '單獨生活戶'},
}
for col, mapping in code_maps.items():
    df_hh[col] = df_hh[col].replace(mapping)
#
df_hh[:3]

## 1-3: 欄名轉換，存 db_household.csv

In [None]:
# Columns needed by the final intermediate file and their output names
cols_src = ['統號', '出生日期', '戶號', '戶籍地址', '與戶長關係', '個人註記代碼']
cols_tar = ['pid', 'birth', 'householdNumber', 'address', 'role', 'annotation']
cols_map = {src: tar for src, tar in zip(cols_src, cols_tar)}
# Rename, then keep only the target columns in the target order
df_hh = df_hh.rename(columns=cols_map).loc[:, cols_tar]
# Save the CSV once; an existing file is never overwritten
csv_exists = os.path.isfile(path_db_household_csv)
if not csv_exists:
    df_hh.to_csv(path_db_household_csv, index=None, header=True)
    print(f'存{path_db_household_csv}')
#
df_hh.head(3)

## 1-4: 與DB比對

In [None]:
# Load the DB export of the household table, all columns as strings
path_db_household_csv_DB = os.path.join(*folder_db_cache, 'db_household_DB.csv')
dtype = dict.fromkeys(cols_tar, str)
df_hh_db = pd.read_csv(path_db_household_csv_DB, dtype=dtype, **init)
# Apply the same clean-up to every target column
for col in cols_tar:
    cleaned = SE_upper(df_hh_db[col])
    df_hh_db[col] = cleaned
#
df_hh_db

In [None]:
# Remove literal '?' characters from both frames before comparing.
# The pattern is written as a raw string: '\?' in a normal string literal is
# an invalid escape sequence (a warning today, an error in a future Python).
df_hh = df_hh.replace(r'\?', '', regex=True)
# The DB export uses '除戶' where the notebook uses '除口'; align the label
df_hh_db = df_hh_db.replace(r'\?', '', regex=True).replace('除戶', '除口')
# Sort both frames by pid and renumber the rows
df_hh = df_hh.sort_values(by='pid').reset_index(drop=True)
df_hh_db = df_hh_db.sort_values(by='pid').reset_index(drop=True)

In [None]:
# Compare the two frames as a whole; on mismatch, print each differing column
if df_hh.equals(df_hh_db):
    print('戶籍檔完全一樣')
else:
    for col in df_hh.columns:
        same = df_hh[[col]].equals(df_hh_db[[col]])
        if not same:
            print(col)

In [None]:
# Seconds elapsed since stime was set
time.time() - stime

# 2.老農津貼 (db_elder_allowance.csv)

## 2-1: 讀檔 (今年公務資料用去年整年的)
資料裡的身分證字號欄位曾被置換過：107 年資料需將第 2、3 碼互換、第 8、9 碼互換；108 年資料不需互換。

In [None]:
# Source columns and their output names
cols_map = {'身分證號': 'pid', '核付金額': 'amount'}
folder_raw_EA = folder_raw + ['20201026_老農津貼']
csv_EA_path = os.path.join(*folder_raw_EA, '10811核付資料.csv')
# Read only the two mapped columns, then rename them
df_EA = pd.read_csv(csv_EA_path, usecols=cols_map.keys(), **init)
df_EA = df_EA.rename(columns=cols_map)
# Clean the pid column
df_EA['pid'] = SE_upper(df_EA['pid'])
# Keep only IDs present in the household file
# (ID_hh_all is not needed: the extra 119 IDs would not be shown in the Excel output anyway)
where = df_EA['pid'].isin(ID_hh)
df_EA = df_EA.loc[where].sort_values('pid').reset_index(drop=True)
# The 108 data needs no digit swap. For 107 data digits 2,3 and 8,9 had to be swapped
# (split the 10 characters into seven groups, swap groups 2-3 and 5-6):
# df_EA['pid'] = df_EA['pid'].str.replace('^(.)(.)(.)(....)(.)(.)(.)$', r'\1\3\2\4\6\5\7')
#
df_EA

In [None]:
# Save the CSV once; an existing file is never overwritten
csv_exists = os.path.isfile(path_db_elder_allowance_csv)
if not csv_exists:
    df_EA.to_csv(path_db_elder_allowance_csv, header=True, index=None)
    print(f'存{path_db_elder_allowance_csv}')

## 2-2: 與DB比對

In [None]:
# Load the DB export of the elder-allowance table and clean its pid column
path_db_elder_allowance_csv_DB = os.path.join(*folder_db_cache, 'db_elder_allowance_DB.csv')
df_EA_db = pd.read_csv(path_db_elder_allowance_csv_DB, **init)
cleaned_pid = SE_upper(df_EA_db['pid'])
df_EA_db['pid'] = cleaned_pid
# Show the shape and the first three rows
print(df_EA_db.shape)
df_EA_db.head(3)

In [None]:
# Compare the two frames as a whole; on mismatch, print each differing column
if df_EA.equals(df_EA_db):
    print('老農津貼完全一樣')
else:
    for col in df_EA.columns:
        same = df_EA[[col]].equals(df_EA_db[[col]])
        if not same:
            print(col)

# 3.畜牧 (db_livestock.csv)

## 3-1: 讀檔

In [None]:
folder_raw_LS = folder_raw + ['20201027_畜牧']
# File content: 2020 (Q1-Q4 + M5) plus 2019 (M5, M11)
csv_LS_path_2020 = os.path.join(*folder_raw_LS, '主力農家所得-2020.csv')
dtype = {'FieldId': str}
df_LS_2020 = pd.read_csv(csv_LS_path_2020, **init, dtype=dtype)
# Clean the text columns
for col in ['FarmerId', 'InvSeason', 'FieldName', 'NAME', 'FieldId']:
    df_LS_2020[col] = SE_upper(df_LS_2020[col])
# Keep every 2020 row plus 2019/M11, restricted to IDs of the household file ________________
where = df_LS_2020['InvYear'] == 2020
where |= (df_LS_2020['InvYear'] == 2019) & (df_LS_2020['InvSeason'] == 'M11')
# One extra farmer ID that was looked up separately
extra_ID = {"T102811151"}
where &= df_LS_2020['FarmerId'].isin(ID_hh | extra_ID)
# .copy(): columns are assigned below, and assigning on a filtered slice of
# df_LS_2020 raises SettingWithCopyWarning (and may not stick).
df_LS = df_LS_2020[where].copy()
# Convert the survey year to the ROC calendar (year - 1911) ================================
df_LS['InvYear'] = (df_LS['InvYear'] - 1911).astype(str)
# Sort order for dairy cattle / goat items; every other item gets 9
name_order = {
    '泌乳牛': 0, '乾乳牛': 1, '未產女牛': 2,
    '泌乳羊': 3, '乾乳羊': 4, '未產女羊': 5,
}
df_LS['nameOrder'] = df_LS['NAME'].map(name_order).fillna(9).astype('int64')
# Column renaming (source name -> output name)
cols_src = [
    'FarmerId', 'InvYear', 'InvSeason', 'FieldName',
    'nameOrder',
    'NAME',
    'RaiseCount', 'SlaughterCount', 'MilkCount', 'AntlerCount', 'EggCount',
    'FieldId',
]
cols_tar = [
    'farmerId', 'investigateYear', 'investigateSeason', 'fieldName',
    'nameOrder',
    'animalName',
    'raiseCount', 'slaughterCount', 'milkCount', 'antlerCount', 'eggCount',
    'fieldId',
]
# Sort / group keys
cols_by = [
    'farmerId', 'fieldName',
    # 'fieldId', # one farm may have several ids, so it cannot be a grouping key
    'investigateYear', 'investigateSeason',
    'nameOrder',
    'animalName',
]
#
cols_map = dict(zip(cols_src, cols_tar))
# Rename, select the target columns, sort and renumber
df_LS = df_LS.rename(columns=cols_map)[cols_tar].sort_values(by=cols_by).reset_index(drop=True)
df_LS

In [None]:
# Build one row per (farmerId, fieldName) with its distinct fieldIds spread over
# the columns fieldId_0 .. fieldId_{n-1} (a farm name may carry several ids).
#
# The ids are collected per group as a sorted list and expanded directly,
# instead of assigning on a slice of df_LS, summing lists through groupby and
# round-tripping through '_'.join / split (which mis-splits ids containing '_').
field_ids = (
    df_LS.groupby(by=['farmerId', 'fieldName'])['fieldId']
    .apply(lambda ids: sorted(set(ids)))
)
# Ragged lists become columns; groups with fewer ids are padded with ''
field_split = pd.DataFrame(field_ids.tolist(), index=field_ids.index).fillna('')
n = field_split.shape[1]  # largest number of ids on one farm; used by later cells
field_split.columns = [f'fieldId_{i}' for i in range(n)]
tmp = field_split.reset_index()
#
tmp

## 3-2: 加總

In [None]:
# Sum within each group, renumber the rows and round eggCount to 3 decimals
summed = df_LS.groupby(by=cols_by, as_index=False).sum()
df_LS = summed.reset_index(drop=True).round({'eggCount': 3})
# Attach the fieldId_* columns and restore the group-key ordering
df_LS = df_LS.merge(tmp, how='left', on=['farmerId', 'fieldName'])
df_LS = df_LS.sort_values(by=cols_by)
# Replace nameOrder with the parent item name: dairy cattle, dairy goat, or '' otherwise
is_dairy_cattle = df_LS['animalName'].isin(['泌乳牛', '乾乳牛', '未產女牛'])
where = df_LS['animalName'].isin(['泌乳羊', '乾乳羊', '未產女羊'])
df_LS['nameOrder'] = np.select([is_dairy_cattle, where], ['乳牛', '乳羊'], default='')
#
df_LS

## 3-3: 讀取父品項【乳牛】【乳羊】的屠宰量

In [None]:
# Slaughter counts recorded on the parent items (dairy cattle / dairy goat)
usecols = ['InvSeason', 'FieldId', 'FieldName', 'Name', 'SlaughterCount']
dtype = {'FieldId': str}
csv_LS_path_2020_0205 = os.path.join(*folder_raw_LS, '主力農家所得-2020_乳牛羊.csv')
df_LS_2020_0205 = pd.read_csv(csv_LS_path_2020_0205, usecols=usecols, dtype=dtype, **init)
# Clean the three text key columns
for col in ('FieldId', 'FieldName', 'Name'):
    df_LS_2020_0205[col] = SE_upper(df_LS_2020_0205[col])
#
df_LS_2020_0205

In [None]:
right_on = ['InvSeason', 'FieldId', 'FieldName', 'Name']
df_LS['SC'] = 0
# Accumulate the parent-item slaughter count into SC, once per fieldId_<i> column
for i in range(n):
    left_on = ['investigateSeason', f'fieldId_{i}', 'fieldName', 'nameOrder']
    # right_on already contains 'Name', so it is not listed a second time
    df_LS = df_LS.merge(df_LS_2020_0205, left_on=left_on, right_on=right_on, how='left').drop(columns=right_on)
    # Rows without a match have NaN; count them as 0
    df_LS['SC'] = df_LS['SC'] + df_LS['SlaughterCount'].fillna(0).astype(int)
    df_LS = df_LS.drop(columns=[f'fieldId_{i}', 'SlaughterCount'])
# Lactating items with no own slaughter count take the parent-item count ____________________
where = df_LS['animalName'].isin(['泌乳牛', '泌乳羊'])
where &= df_LS['slaughterCount'] == 0
where &= df_LS['SC'] > 0
df_LS['slaughterCount'] = np.where(where, df_LS['SC'], df_LS['slaughterCount'])
# Non-lactating dairy items with a zero count are shown as blank.
# Mixing '' with numbers in np.where turned the whole column into strings
# implicitly; the conversion is now spelled out and gives the same values.
where = df_LS['animalName'].isin(['乾乳牛', '未產女牛', '乾乳羊', '未產女羊'])
where &= df_LS['slaughterCount'] == 0
df_LS['slaughterCount'] = df_LS['slaughterCount'].astype(str).mask(where, '')
# Rebind instead of cols_tar.remove(...): the cell no longer fails with
# ValueError when cols_tar has already been trimmed.
cols_tar = [col for col in cols_tar if col not in ('nameOrder', 'fieldId')]
df_LS = df_LS.drop(columns=['nameOrder', 'SC'])[cols_tar]
#
df_LS

In [None]:
# Save the CSV once; an existing file is never overwritten
csv_exists = os.path.isfile(path_db_livestock_csv)
if not csv_exists:
    df_LS.to_csv(path_db_livestock_csv, header=True, index=None)
    print(f'存{path_db_livestock_csv}')

## 3-4: 與DB比對

In [None]:
# Load the DB export of the livestock table; the survey year stays a string
dtype = {'investigateYear': str}
path_db_livestock_csv_DB = os.path.join(*folder_db_cache, 'db_livestock_DB.csv')
df_LS_db = pd.read_csv(path_db_livestock_csv_DB, dtype=dtype, **init)
df_LS_db = df_LS_db.round({'eggCount': 3})
# The raw API data has lower-case letters in farmerId and spaces in fieldName
for col in ('farmerId', 'fieldName'):
    df_LS_db[col] = SE_upper(df_LS_db[col])
# Same ordering as the notebook-built frame
df_LS_db = df_LS_db.sort_values(by=cols_tar)
df_LS_db = df_LS_db.reset_index(drop=True)
#
df_LS_db

In [None]:
# Build the frame to compare with the DB: drop the extra farmer ID and
# reset the slaughter count of dairy items back to 0
where = df_LS['farmerId'].isin(extra_ID)
tmp = df_LS.loc[~where].sort_values(by=cols_tar)
tmp = tmp.reset_index(drop=True)
# Dairy items are those whose name contains one of these three prefixes
where = tmp['animalName'].str.contains("泌乳|乾乳|未產女")
zeroed = np.where(where, 0, tmp['slaughterCount'])
tmp['slaughterCount'] = zeroed.astype('int64')
#
tmp

In [None]:
# Compare the two frames as a whole; on mismatch, print each differing column
if tmp.equals(df_LS_db):
    print('畜牧完全一樣')
else:
    print('df不一樣，可能欄位順序或dtype不一樣')
    for col in tmp.columns:
        same = tmp[[col]].equals(df_LS_db[[col]])
        if not same:
            print(col)

## 3-5: 檢查【畜牧場名稱】有無學校 & 乳品項的屠宰量

In [None]:
# Count farm names that contain '大學' (university) or '高中' (senior high school)
where = df_LS['fieldName'].str.contains('大學|高中')
df_LS.loc[where, 'fieldName'].value_counts()

In [None]:
# Raw-data check: items whose name contains '乳' and whose slaughter count is > 0
where = df_LS_2020['NAME'].str.contains('乳')
tmp = df_LS_2020.loc[where]
where = tmp['SlaughterCount'] > 0
tmp2 = tmp.loc[where]
# How often each such item name occurs
tmp2['NAME'].value_counts()

In [None]:
# 碰完戶籍檔，【乳XXX】屠宰量大於0的畜牧場
where = df_LS.animalName.isin(['泌乳牛'])
tmp = df_LS[where]
where = tmp.slaughterCount.isin([0,''])
tmp2 = tmp[~where]
#
tmp2#.fieldName.value_counts()

In [None]:
# Seconds elapsed since stime was set
time.time() - stime