# 01_data_preprocessing

이 노트북은 원본 CSV(앱 사용/설문/수면/수면 다이어리)를 uid×week 단위로 병합해 분석 테이블을 생성합니다. 설문 점수(PHQ-9/GAD-7/Stress) 산출, 앱 카테고리별 사용 시간 피벗, 수면 지표 집계, week 타입 정수 정규화, 결측 대치, 초→시간 파생 컬럼 생성을 거쳐 Parquet(전체)와 CSV 샘플(50행)로 저장합니다. (데이터 비공개)

셀 1 — 설정/임포트/경로

In [5]:
# TL;DR: load CSVs → score/pivot/aggregate/join/impute → derive *_hours columns → save as Parquet
# Cell 1: install dependencies, import libraries, and define input/output paths.
import sys, os
# IPython shell escape (notebook-only syntax): pip-upgrades these packages into the
# interpreter running this kernel (sys.executable) every time the cell runs.
# NOTE(review): versions are unpinned, so reruns can pull newer pyspark/pandas releases — consider pinning.
!{sys.executable} -m pip install -U pyspark packaging setuptools pandas pyarrow

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import (
    col, avg, split, expr, sum as spark_sum  # NOTE(review): expr is not used in the visible cells
)
import ast
import pathlib

# Input paths (adjust to your local environment)
APP_USAGE_CSV    = "./../all_data/filtering_complete_app_usage.csv"
RESPONSE_CSV     = "./../all_data/response_week_mapping_adjusted.csv"
SLEEP_CSV        = "./../all_data/sleep_week_mapped.csv"
SLEEP_DIARY_CSV  = "./../all_data/sleep_diary_week_mapped.csv"

# Output paths (relative to the notebook's working directory); create the parent folder up front
pathlib.Path("results/tables").mkdir(parents=True, exist_ok=True)
PARQUET_OUT_DIR = "results/tables/processed_weekly"  # ← folder name without extension: Spark writes Parquet output as a directory
SAMPLE_OUT  = "results/tables/processed_weekly_sample.csv"




셀 2 — Spark 세션

In [14]:
# Reuse the active SparkSession if there is one, otherwise build it; the SQL
# session time zone is pinned to UTC so timestamp handling does not depend on the host.
spark = (
    SparkSession.builder
    .appName("AppUsageMentalHealth-Preprocess")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

셀 3 — 데이터 로드(+타입)

In [25]:
def _read_header_csv(path):
    """Read a CSV whose first line is a header row.

    No schema inference is requested, so every column is loaded as a string.
    """
    return spark.read.option("header", True).csv(path)


# Raw inputs; only `duration` is numeric-cast here because it is summed later.
app_usage_df = _read_header_csv(APP_USAGE_CSV).withColumn(
    "duration", col("duration").cast("double")
)
raw_response_df = _read_header_csv(RESPONSE_CSV)
sleep_df = _read_header_csv(SLEEP_CSV)
sleep_diary_df = _read_header_csv(SLEEP_DIARY_CSV)

셀 4 — 설문 점수 함수

In [27]:
def calc_score(answer_list, weight_list):
    """Return the weight of the first selected option of one question.

    Args:
        answer_list: Per-option answer flags (presumably one truthy flag for the
            chosen option — TODO confirm against the raw questionnaire data).
        weight_list: Score assigned to each option position.

    Returns:
        ``weight_list[i]`` for the first truthy entry ``i`` of *answer_list*.
        Returns 0 in three cases: *answer_list* is not a list/tuple, no option
        is selected, or the selected position has no weight (malformed answer).
    """
    if not isinstance(answer_list, (list, tuple)):
        return 0
    for idx, selected in enumerate(answer_list):
        if selected:
            # A selection past the end of the weight table is malformed. Score it 0
            # rather than raising IndexError, which would abort the caller's total.
            return weight_list[idx] if idx < len(weight_list) else 0
    return 0

def parse_questionnaire(q_list_str, weight_list, filter_options=None):
    """Sum the scores of all questions in one serialized questionnaire.

    Args:
        q_list_str: String holding a Python-literal list of question dicts, each
            with an 'answers' key and optionally an 'options' key.
        weight_list: Per-option weights passed to ``calc_score``.
        filter_options: If given (and non-empty), only questions whose
            'options' list equals it exactly are scored.

    Returns:
        The summed score. Missing or unparsable input (e.g. None) yields 0, so a
        missing questionnaire is indistinguishable from a genuine score of 0.
        Malformed individual entries are skipped; the remaining questions
        still count.
    """
    try:
        # literal_eval only evaluates literals, so it is safe on untrusted text.
        q_list = ast.literal_eval(q_list_str)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # None / non-literal text: treat the questionnaire as unscored.
        return 0
    if not isinstance(q_list, (list, tuple)):
        return 0

    total = 0
    for q in q_list:
        if not isinstance(q, dict):
            continue  # skip a malformed entry instead of dropping every later question
        if filter_options and q.get('options') != filter_options:
            continue
        total += calc_score(q.get('answers'), weight_list)
    return total


셀 5 — 설문 점수 DataFrame 생성 (RDD→DF)

In [28]:
def _to_score_row(r):
    """Convert one raw response row into a uid/week Row of questionnaire totals."""
    stress_options = ['전혀 그렇지 않다','약간 그렇다','웬만큼그렇다','상당히그렇다','아주 그렇다']
    return Row(
        uid=r['uid'],
        week=int(r['week']),
        PHQ9_score=parse_questionnaire(r['PHQ-9'], [0, 1, 2, 3]),
        GAD7_score=parse_questionnaire(r['GAD-7'], [0, 1, 2, 3]),
        Stress_score=parse_questionnaire(
            r['Stress Questionnaire'], [0, 1, 2, 3, 4],
            filter_options=stress_options,
        ),
    )


# Score every response row, then keep one row per (uid, week).
response_rdd = raw_response_df.rdd.map(_to_score_row)
response_df = spark.createDataFrame(response_rdd).dropDuplicates(["uid", "week"])
response_df.orderBy("uid", "week").show(5, truncate=False)


                                                                                

+----------------------------+----+----------+----------+------------+
|uid                         |week|PHQ9_score|GAD7_score|Stress_score|
+----------------------------+----+----------+----------+------------+
|05U1A5bmcnUUycZ6SYAdqhuu3ck2|0   |1         |0         |2           |
|05U1A5bmcnUUycZ6SYAdqhuu3ck2|1   |0         |0         |0           |
|05U1A5bmcnUUycZ6SYAdqhuu3ck2|2   |1         |0         |2           |
|05U1A5bmcnUUycZ6SYAdqhuu3ck2|3   |0         |0         |0           |
|05U1A5bmcnUUycZ6SYAdqhuu3ck2|4   |0         |0         |2           |
+----------------------------+----+----------+----------+------------+
only showing top 5 rows


셀 6 — 앱 사용 집계 → 카테고리 피벗

In [35]:
# Collapse raw app-usage records into total duration per (uid, week, category),
# then pivot categories into one column each (categories with no usage -> 0.0).
#
# Only exact duplicate records are removed here. De-duplicating on the
# (uid, week, category) key before the sum would keep a single arbitrary
# record per key and silently discard the rest of that category's usage.
# NOTE(review): if the CSV is already one row per (uid, week, category), both forms are equivalent.
app_usage_df = app_usage_df.dropDuplicates()

app_summary_df = (app_usage_df
                  .groupBy("uid", "week", "category")
                  .agg(spark_sum("duration").alias("duration")))

app_pivot_df = (app_summary_df
                .groupBy("uid", "week")
                .pivot("category")
                .sum("duration")
                .na.fill(0.0))


셀 7 — 수면 집계 (meanConfidence 평균, 수면 다이어리 HH:MM:SS → 분 변환 후 평균)

In [36]:
# Per-(uid, week) mean of the sleep data's meanConfidence.
# groupBy already yields exactly one row per (uid, week), so the former trailing
# dropDuplicates(["uid","week"]) only added a shuffle and is omitted.
sleep_agg_df = (sleep_df
    .filter(col("week").isNotNull())
    .withColumn("meanConfidence", col("meanConfidence").cast("double"))
    .groupBy("uid", "week")
    .agg(avg("meanConfidence").alias("mean_confidence_sleep")))

# Sleep diary: convert "HH:MM:SS" strings in midawake_duration to minutes.
# NOTE(review): values not in three-part numeric form yield NULL (skipped by avg)
# with ANSI mode off; with ANSI mode on Spark may raise instead — confirm.
sleep_diary_min = (sleep_diary_df
    .filter(col("week").isNotNull())
    .withColumn("parts", split(col("midawake_duration"), ":"))
    .withColumn("midawake_duration_min",
                col("parts").getItem(0).cast("double") * 60 +
                col("parts").getItem(1).cast("double") +
                col("parts").getItem(2).cast("double") / 60.0)
)
# Mean mid-sleep awake minutes per (uid, week); keys are unique after groupBy.
sleep_diary_agg_df = (sleep_diary_min
    .groupBy("uid", "week")
    .agg(avg("midawake_duration_min").alias("midawake_duration_sleep")))


In [None]:
# Diagnostic: `week` had different types across the DataFrames, so Spark failed to cast
# STRING '8.0' -> BIGINT when joining. Fixed by normalizing every frame's week to int.
# This cell only prints each frame's week type and a few sample values.
week_sources = (
    ("app_pivot", app_pivot_df),
    ("response", response_df),
    ("sleep", sleep_agg_df),
    ("sleep_diary", sleep_diary_agg_df),
)
for label, frame in week_sources:
    week_type = frame.schema["week"].dataType
    print(label, week_type)
    frame.select("week").limit(5).show()


app_pivot StringType()
+----+
|week|
+----+
| 8.0|
| 7.0|
| 6.0|
| 1.0|
| 4.0|
+----+

response IntegerType()
+----+
|week|
+----+
|   2|
|   0|
|   8|
|   4|
|   6|
+----+

sleep StringType()
+----+
|week|
+----+
| 8.0|
| 7.0|
| 6.0|
| 1.0|
| 4.0|
+----+

sleep_diary StringType()
+----+
|week|
+----+
| 5.0|
| 2.0|
| 1.0|
| 7.0|
| 8.0|
+----+



셀 8 — week 정수 정규화 → 조인 & 결측 대치 → *_hours 생성 → 저장

In [38]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType

def normalize_week_int(df):
    """Return *df* with its ``week`` column coerced to IntegerType.

    The source frames disagree on the type of ``week``: STRING values such as
    '8.0' in the CSV-derived frames, integers in ``response_df``. Joining them on
    ``week`` made Spark fail casting '8.0' to BIGINT, so every frame is
    normalized to the same int column before the joins.

    Steps:
        1. Cast to string.
        2. Trim surrounding whitespace.
        3. Strip a trailing fractional part made only of zeros
           ('8.0', '8.00', '8.' -> '8').
        4. Cast to int.

    Any other value (e.g. '8.5', 'abc') is left to Spark's string->int cast.
    Depending on the session's ANSI setting it may be truncated, become NULL,
    or raise — NOTE(review): confirm which applies.
    """
    from pyspark.sql.functions import trim  # trim is not imported at notebook level

    week_text = trim(col("week").cast("string"))
    week_text = regexp_replace(week_text, r"\.0*$", "")
    # Replacing `week` in place keeps the column position; no temp column can clobber user data.
    return df.withColumn("week", week_text.cast(IntegerType()))

# Must run before the joins: give every frame the same integer `week` join key.
app_pivot_df, response_df, sleep_agg_df, sleep_diary_agg_df = (
    normalize_week_int(frame)
    for frame in (app_pivot_df, response_df, sleep_agg_df, sleep_diary_agg_df)
)


In [39]:
# Join all weekly sources on (uid, week). The inner join keeps only weeks with both
# app usage and questionnaire scores; the sleep metrics are optional (left joins).
final_df = (app_pivot_df
            .join(response_df, ["uid", "week"], "inner")
            .join(sleep_agg_df, ["uid", "week"], "left")
            .join(sleep_diary_agg_df, ["uid", "week"], "left"))

# Mean-impute missing sleep metrics. Both means come from one aggregation, so the
# joined plan is evaluated once here instead of once per metric.
# Falls back to 0.0 when a metric is NULL in every row.
sleep_means = final_df.agg(
    avg("mean_confidence_sleep").alias("mean_conf"),
    avg("midawake_duration_sleep").alias("mean_mid"),
).first()
mean_conf, mean_mid = sleep_means["mean_conf"], sleep_means["mean_mid"]
final_df = final_df.na.fill({
    "mean_confidence_sleep": float(mean_conf) if mean_conf is not None else 0.0,
    "midawake_duration_sleep": float(mean_mid) if mean_mid is not None else 0.0,
})

# 1) Replace every "." in column names with "_". Dotted names need backtick
#    quoting in Spark column expressions. toDF renames all columns in a single
#    projection instead of one withColumnRenamed per column.
final_df = final_df.toDF(*[c.replace(".", "_") for c in final_df.columns])

# 2) Add hour-valued copies of the per-category usage columns (seconds / 3600).
#    A single select avoids stacking one withColumn projection per category.
sec_cols = [c for c in final_df.columns if c.startswith("AppCategory_")]
final_df = final_df.select(
    "*", *[(col(c) / 3600.0).alias(f"{c}_hours") for c in sec_cols]
)

# Keep one row per (uid, week)
final_df = final_df.dropDuplicates(["uid", "week"])

# Save: full table as Parquet (written as a directory), plus a 50-row CSV sample via pandas.
final_df.write.mode("overwrite").parquet(PARQUET_OUT_DIR)
final_df.limit(50).toPandas().to_csv(SAMPLE_OUT, index=False)

print("✅ Saved:", os.path.abspath(PARQUET_OUT_DIR))

                                                                                

✅ Saved: /home/biot/github/AppUsageMentalHealthAnalysis/notebooks/results/tables/processed_weekly
