<a href="https://colab.research.google.com/github/NINGTANG1124/UPF-HFI/blob/main/notebooks/HFI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# connect to googledrive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd

# === 读取 survey 数据 ===
survey_path = "/content/drive/MyDrive/UPF-HFI/Bradford_original data/4. SurveyMasterfile_clean.xlsx"
df = pd.read_excel(survey_path)

# 1. HFI定义

In [3]:
# Step 1 —— 输入校验与类型标准化
import numpy as np

# 必要列
REQ = ["UserID_clean","insecurity1","insecurity2","insecurity3","insecurity3a","insecurity4","insecurity5"]
missing = [c for c in REQ if c not in df.columns]
assert not missing, f"缺少必要列: {missing}"

# 类型统一（保留 NaN）
hfi_cols = ["insecurity1","insecurity2","insecurity3","insecurity3a","insecurity4","insecurity5"]
df[hfi_cols] = df[hfi_cols].apply(pd.to_numeric, errors="coerce").astype("Int64")

# 编码范围校验
valid_vals = {
    "insecurity1": {1,2,3,4,5},   # HH3
    "insecurity2": {1,2,3,4,5},   # HH4
    "insecurity3": {1,2,3},       # AD1
    "insecurity3a":{1,2,3,4},     # AD1a
    "insecurity4": {1,2,3},       # AD2
    "insecurity5": {1,2,3},       # AD3
}
viol = {}
for c, ok in valid_vals.items():
    bad_mask = df[c].notna() & ~df[c].isin(ok)
    if bad_mask.any():
        viol[c] = sorted(df.loc[bad_mask, c].unique().tolist())

print("超范围编码：", viol if viol else "{}")

# 快速查看每列取值分布（含 NaN）
for c in hfi_cols:
    print(f"\n== {c} ==")
    print(df[c].value_counts(dropna=False))

超范围编码： {}

== insecurity1 ==
insecurity1
3       201
2        97
1        13
5         8
4         5
<NA>      2
Name: count, dtype: Int64

== insecurity2 ==
insecurity2
3       203
2        90
1        18
5         8
<NA>      4
4         3
Name: count, dtype: Int64

== insecurity3 ==
insecurity3
2       244
1        71
3         9
<NA>      2
Name: count, dtype: Int64

== insecurity3a ==
insecurity3a
4       146
<NA>     83
2        40
3        32
1        25
Name: count, dtype: Int64

== insecurity4 ==
insecurity4
2       231
1        78
3        14
<NA>      3
Name: count, dtype: Int64

== insecurity5 ==
insecurity5
2       273
1        39
3        10
<NA>      4
Name: count, dtype: Int64


In [4]:
# Step 2 —— 逐题映射成得分（1/0/NaN）
# 映射规则
map_hh_often_sometimes = {1:1, 2:1, 3:0, 4:np.nan, 5:np.nan}  # HH3/HH4
map_yes_no              = {1:1, 2:0, 3:np.nan}                 # AD1/AD2/AD3
map_ad1a                = {1:1, 2:1, 3:0, 4:np.nan}            # AD1a

# HH3/HH4
df["hh3_score"] = df["insecurity1"].map(map_hh_often_sometimes)
df["hh4_score"] = df["insecurity2"].map(map_hh_often_sometimes)

# AD1/AD2/AD3
df["ad1_score"] = df["insecurity3"].map(map_yes_no)
df["ad2_score"] = df["insecurity4"].map(map_yes_no)
df["ad3_score"] = df["insecurity5"].map(map_yes_no)

# AD1a：只有 AD1=Yes(1) 才映射；AD1=No(0) → 0；AD1缺失 → NaN
df["ad1a_score"] = np.where(
    df["ad1_score"].eq(1),
    df["insecurity3a"].map(map_ad1a),
    np.where(df["ad1_score"].eq(0), 0, np.nan)
).astype(float)

# 检查映射后的取值
score_cols = ["hh3_score","hh4_score","ad1_score","ad1a_score","ad2_score","ad3_score"]
for c in score_cols:
    print(f"\n{c} 取值分布：")
    print(df[c].value_counts(dropna=False))


hh3_score 取值分布：
hh3_score
0.0    201
1.0    110
NaN     15
Name: count, dtype: int64

hh4_score 取值分布：
hh4_score
0.0    203
1.0    108
NaN     15
Name: count, dtype: int64

ad1_score 取值分布：
ad1_score
0.0    244
1.0     71
NaN     11
Name: count, dtype: int64

ad1a_score 取值分布：
ad1a_score
0.0    264
1.0     49
NaN     13
Name: count, dtype: int64

ad2_score 取值分布：
ad2_score
0.0    231
1.0     78
NaN     17
Name: count, dtype: int64

ad3_score 取值分布：
ad3_score
0.0    273
1.0     39
NaN     14
Name: count, dtype: int64


In [5]:
# Step 3 —— 原始总分 + “最少有效作答”规则
score_cols = ["hh3_score","hh4_score","ad1_score","ad1a_score","ad2_score","ad3_score"]

# 1) 有效作答数
df["HFI_valid_items"] = df[score_cols].notna().sum(axis=1)

# 2) 总分（少于5项则返回 NaN）
df["HFI_raw_score"] = df[score_cols].sum(axis=1, min_count=5)

# 3) 合法性：0..6 的整数（或 NaN）
mask = df["HFI_raw_score"].notna()
assert df.loc[mask, "HFI_raw_score"].between(0,6).all(), "HFI_raw_score 越界"
df.loc[mask, "HFI_raw_score"] = df.loc[mask, "HFI_raw_score"].round().astype(int)

# 4) 快速查看
print("有效作答<5 的人数/占比：",
      int((df['HFI_valid_items']<5).sum()),
      (df['HFI_valid_items']<5).mean().round(3))
print(df["HFI_valid_items"].value_counts().sort_index())
print(df["HFI_raw_score"].describe())
print(df["HFI_raw_score"].value_counts(dropna=False).sort_index())

有效作答<5 的人数/占比： 18 0.055
HFI_valid_items
0      2
1      2
2      4
3      3
4      7
5     24
6    284
Name: count, dtype: int64
count    308.000000
mean       1.415584
std        2.069481
min        0.000000
25%        0.000000
50%        0.000000
75%        2.000000
max        6.000000
Name: HFI_raw_score, dtype: float64
HFI_raw_score
0.0    177
1.0     34
2.0     24
3.0     12
4.0     13
5.0     22
6.0     26
NaN     18
Name: count, dtype: int64


In [6]:
# Step 4 —— 三分类（有序）+ 二分类
# Step 4: 分类变量

# 三分类函数
def classify_hfi(score):
    if pd.isna(score):
        return None
    if score <= 1:
        return "Food secure"
    elif 2 <= score <= 4:
        return "Low food security"
    else:
        return "Very low food security"

df["HFI_category"] = df["HFI_raw_score"].apply(classify_hfi)

# 设置有序类别
cat_dtype = pd.api.types.CategoricalDtype(
    categories=["Food secure","Low food security","Very low food security"],
    ordered=True
)
df["HFI_category"] = df["HFI_category"].astype(cat_dtype)

# 二分类（0/1/NaN）
df["HFI_binary"] = df["HFI_raw_score"].apply(
    lambda x: (1 if (pd.notna(x) and x>=2) else (0 if (pd.notna(x) and x<=1) else np.nan))
).astype("Int64")

# 检查分布
print("三分类分布：\n", df["HFI_category"].value_counts(dropna=False))
print("\n二分类分布：\n", df["HFI_binary"].value_counts(dropna=False))

三分类分布：
 HFI_category
Food secure               211
Low food security          49
Very low food security     48
NaN                        18
Name: count, dtype: int64

二分类分布：
 HFI_binary
0       211
1        97
<NA>     18
Name: count, dtype: Int64


In [8]:
# 保存带 HFI 变量的 survey 数据
output_path = "/content/drive/MyDrive/UPF-HFI/Model/outcome/survey_with_HFI.xlsx"
df.to_excel(output_path, index=False)
print(f"已保存到: {output_path}")

已保存到: /content/drive/MyDrive/UPF-HFI/Model/outcome/survey_with_HFI.xlsx
