### 1. 추출한 피처들을 컬럼으로 하는 df만들기

_컬럼 선정이유와 컬럼에 대한 간단한 소개, 형이나 데이터 형식 설명 _추가__(ppt자료)

_펀드명도 익명화되어서 그런건지 다시 한번 알아보고싶다._

In [0]:
%pip install cudf-cu12 rmm-cu12 --extra-index-url=https://pypi.nvidia.com

In [0]:
#  방법 1. gpu 사용을 위해 파이썬 재시작
# %restart_python

In [0]:
# 방법 2. (Databricks 전용)
dbutils.library.restartPython()


In [0]:
import cudf
df = cudf.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
print(df)


In [0]:
!nvidia-smi

In [0]:
# 1) CSV 파일 로드
file_path = "/mnt/raw-data/12.금융상품정보/공모펀드상품.csv"
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .load(file_path)

# 2) 사용할 피처 리스트 정의
selected_cols = [
    # 1. 펀드 식별 및 기본 정보
    "펀드코드", "펀드명", "대표펀드유무", "운용사명", "설정일",
    # 2. 펀드 유형 및 전략
    "대유형", "중유형", "소유형", "투자전략", "펀드키워드",
    # 3. 성과(수익률)
    "펀드성과정보_1년", "유형평균유형성과_1년",
    # 4. 위험/리스크 지표
    "펀드표준편차(연환산)_1년", "펀드수정샤프(연환산)_1년", "MaximumDrawDown_1년",
    # 5. 자금흐름(인기도/트렌드)
    "NetCashFlow(펀드자금흐름)_1년",
    # 6. 규모
    "순자산",
    # 7. 수수료
    "운용보수", "판매보수", "선취수수료", "후취수수료",
    # 8. 테마/특징(필터링용)
    "가치주", "성장주", "중소형주", "글로벌", "자산배분",
    "4차산업", "ESG(사회책임투자형)", "배당주", "FoFs",
    "퇴직연금", "고난도금융상품", "절대수익추구", "레버리지", "퀀트",
    "1년종합등급","1년종합점수","3년종합등급","3년종합점수","5년종합등급","5년종합점수","투자위험등급","판매위험등급"
]

# 3) 선택한 피처만 뽑아서 새로운 DataFrame 생성
df_selected = df.select(*selected_cols)

# 4) 결과 확인
df_selected.printSchema()
df_selected.show(10, truncate=False)


In [0]:
display(df_selected.head(10))

In [0]:
display(df_selected.tail(10))

In [0]:
df_selected.columns

In [0]:
print(len(df_selected.columns))

In [0]:
df_selected.printSchema()

#### 결측치 처리 방안#1
  1. 전체 약 2.4만개의 행 중 랭킹을 메길때 중요한 피처인 펀드 성과정보_1년의 null값을 가진 행은 2754개로 전체의 약 10%로 즉, 적은 비중을 차지 -> 정확한 랭킹을 위해 결측치 제거

### 각 column 마다 Null 값 개수 확인

In [0]:
from pyspark.sql.functions import count, when, isnull
is_null = df_selected.select([count(when(isnull(c), c)).alias(c) for c in df_selected.columns])

display(is_null)

In [0]:
df_selected.count() # 전체 행의 개수

_이정도 %의 결측치는 삭제한다는 예시와 펀드 성과정보의 결측치를 삭제해야한다는 이유 정리_

In [0]:
from pyspark.sql.functions import col

df.filter(col("펀드성과정보_1년").isNull()).count() # 정확히 특정 컬럼 널 값 재확인

In [0]:
df_dropFund=df_selected.na.drop(subset=["펀드성과정보_1년"])

In [0]:
# 원본 행 개수
orig_count = df_selected.count()

# 널 제거 후 행 개수
drop_count = df_dropFund.count()

print(f"원본 행 개수: {orig_count}")
print(f"널 제거 후 행 개수: {drop_count}")
print(f"삭제된 행 수: {orig_count - drop_count}")


In [0]:
from pyspark.sql.functions import col
from functools import reduce
import operator

# 1) 전체 행 수
total_rows = df_dropFund.count()

# 2) 어느 칼럼이라도 널인 행을 찾는 조건 생성
null_condition = reduce(operator.or_, [col(c).isNull() for c in df_dropFund.columns])

# 3) 널이 하나라도 있는 행 수 계산
null_rows = df_dropFund.filter(null_condition).count()

print(f"전체 행 수: {total_rows}")
print(f"널값이 있는 행 수: {null_rows}")


한글 불러오기 설정

In [0]:
import matplotlib.font_manager as fm

for font in fm.findSystemFonts(fontpaths=None, fontext='ttf'):
    if 'Nanum' in font or 'Malgun' in font or 'Gothic' in font or 'Batang' in font:
        print(font)

In [0]:
import matplotlib.pyplot as plt

In [0]:
# 1. 폰트 경로
font_path = "/usr/share/fonts/truetype/nanum/NanumGothic.ttf"

# 2. FontProperties 객체 생성
font_prop = fm.FontProperties(fname=font_path)

# 3. 폰트를 각 그래프 요소에 직접 적용
plt.figure(figsize=(5, 3))
plt.plot([1, 2, 3], [3, 2, 1])
plt.title("한글 제목 테스트", fontproperties=font_prop)
plt.xlabel("X축", fontproperties=font_prop)
plt.ylabel("Y축", fontproperties=font_prop)
plt.show()

### 리스크 탐지를 위한 주요 피처들의 남은 널 값 확인
#### 주요 피처는 다음과 같다
##### 랭킹 피처
    1. 펀드성과정보_1년
    2. 운용보수
    3. NetCashFlow(펀드자금흐름)_1년

##### 리스크 피처
    4. 펀드표준편차(연환산)_1년
    5. 펀드수정샤프(연환산)_1년
    6. MaximumDrawDown_1년
* 여기서 "운용보수" 항목은 널 값이 없으므로 제외


In [0]:
from pyspark.sql.functions import col
from functools import reduce
import operator

# 1) 검사할 컬럼 리스트
numeric_cols = [
    "펀드표준편차(연환산)_1년",
    "펀드수정샤프(연환산)_1년",
    "MaximumDrawDown_1년",
    "NetCashFlow(펀드자금흐름)_1년"
]

# 2) 나머지 네 개 컬럼 중 하나라도 널인 조건 생성
null_any = reduce(operator.or_, [col(c).isNull() for c in numeric_cols])

# 3) 펀드성과정보_1년은 널이 아닌 조건과 결합
condition = col("펀드성과정보_1년").isNotNull() & null_any

# 4) 필터 후 카운트
remaining_null_rows = df_dropFund.filter(condition).count()

print(f"펀드성과정보_1년이 존재하고, 나머지 중 하나라도 널인 행 수: {remaining_null_rows}")


#### 널값을 하나라도 가진 컬럼 확인

*

In [0]:
from pyspark.sql.functions import col, sum as _sum, when
import builtins

# 1) 모든 컬럼 이름 리스트
all_cols = df_dropFund.columns

# 2) 각 컬럼별 null 개수 집계식 생성
null_count_exprs = [
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in all_cols
]

# 3) 집계 실행: 한 행(Row)로 결과가 나옴
null_counts_row = df_dropFund.select(*null_count_exprs).first().asDict()

# 4) 컬럼별 null 개수 출력
print("컬럼별 null 개수:")
for c, cnt in null_counts_row.items():
    print(f"  {c}: {cnt}")

# 5) null을 하나라도 가진 컬럼 개수 계산
num_cols_with_null = builtins.sum(1 for cnt in null_counts_row.values() if cnt > 0)
print(f"\nnull을 하나라도 가진 컬럼 개수: {num_cols_with_null}개")

_근데 이상치를 먼저 자르면 그냥 다 평균값 쓰면 되려나 아무튼 이상치는 컬럼 보고 이상치를 잘라야 하는지 둬야하는지를 좀 고민해보기_

_세 컬럼에서 이상치 시각화  나머지 이상치가 있어서 중앙값, 샤프는 이상치가 좀 적은 경향이라 주로 평균을 사용했다(대유형 그룹핑 기준)_

### 결측치 처리방안#2
#### 아래는 남은 널값을 가진 피처들과 처리방안이다.
| 항목명                                 | 데이터 타입  | 처리 방식                 |
|--------------------------------------|-------------|--------------------------|
| 대표펀드유무                          | string (boolean으로 대체요망)     | 'N'으로 대체 |
| 투자전략                              | string      | pass (변경 없음)         |
| 펀드키워드                            | string      | pass (변경 없음)         |
| 유형평균유형성과_1년                  | double      | 결측치 삭제         |
| 펀드표준편차(연환산)_1년              | double      | 중앙값으로 대체           |
| 펀드수정샤프(연환산)_1년              | double      | 유형별 평균으로 대체       |
| MaximumDrawDown_1년                  | double      | 중앙값으로 대체           |
| NetCashFlow(펀드자금흐름)_1년         | double      | 0으로 대체               |



### 결측치 처리 방안에 대한 근거
| 처리 대상     | 대체 방식  | 정당성 근거        |
| --------- | ------ | ---------------- |
| 표준편차, MDD | 중앙값    | 이상치 방지, 변동성 지표   |
| 자금흐름      | 0 대체   | “비활성 펀드” 간주      |
| 샤프지수  | 유형별 평균 | 동질 펀드 간 성과 비교 보정 |


#### ✅ 펀드자금흐름을 0으로 대체한 이유
1. 자금 유입/유출 흐름이 없거나 데이터 미제공인 경우, 실무에서는 **‘자금 변동 없음’**의 의미로 해석하고 0으로 처리함.

2. 투자자 인식 관점에서도 **0은 “활성화되지 않은 펀드”**로 받아들여짐. 

3. 0이 의미있는 default value로 해석 가능한 경우, 결측 대체로 널리 사용됨.

##### 📚 실제사례

한국펀드평가 & 제로인
* 공시 데이터에서 펀드자금흐름이 비공시된 경우 0으로 대체하고 “비활성” 상태로 분류

#### ✅ 펀드수정샤프(연환산)_1년 → 펀드 대유형별 평균으로 대체한 이유
1. 샤프지수나 IR은 **펀드 유형(예: 주식형, 채권형, 혼합형)**에 따라 편차가 매우 큼.
//펀드 유형에 따른 샤프지수 시각화

2. 같은 유형의 평균을 사용하는 것은 비교 타당성을 확보하면서 보완이 가능. //이름 같은것도 조금 생각해볼만 함

3. 실제 펀드평가사 내부 분석 알고리즘 및 퀀트 투자모델에서 널리 사용하는 방식.

##### 📚 실제사례

KIS Value (한국투자증권 자산운용 평가 시스템)
* 유형별 평균 수익률/리스크 지표를 기준으로 결측값을 보완하여 모델 포트폴리오 구성

#### ✅ 펀드표준편차(연환산)_1년, MaximumDrawDown_1년 → 중앙값(median)대체 이유
1. 표준편차와 MDD는 극단값(이상치)의 영향을 많이 받는 분포(positive skewed distribution)가 흔함.

2. 중앙값은 평균보다 이상치의 영향을 적게 받기 때문에 일반적으로 리스크 지표의 결측 보완에 가장 안정적.

📚 실제사례

한국펀드평가(KFR) 내부 가이드라인
* "이상치 제거 후의 중간값을 활용한 내부 벤치마킹은 퀀트 분석 및 샘플 모델링 시 일반적 접근임."

## 즉 다음과 같은 효과를 얻을 것이라 예상
1. 중앙값 계산 (approxQuantile)
  > 널을 자동으로 제외하고 실제 관측치 분포에서 중앙값을 뽑아내므로, 널이 있어도 왜곡 없이 중앙값을 산출.

2. 단일값 대체 (fillna)
  > 표준편차·MDD 는 중앙값, 자금흐름은 0 으로 일괄 채워서 이후 스케일링·모델링 시 널로 인한 에러를 방지.

3. 그룹별 평균 대체
  > 대유형별로 분포 차이가 있는 펀드수정샤프만 해당 그룹의 평균으로 채우도록 해 일관성을 유지.

4. 범주형 대체
  > 대표펀드유무 는 결측→‘N’ 으로 채워, 나중에 One-Hot 등 인코딩 시 오류를 없앨 수 있다.

#### 먼저 중앙값 계산시 널 자동제외가 되지만, 결측이 무작위인지 확인해보자
1. 결측이 랜덤(random missing): 단순 누락·전산 오류 등이 원인이라면, 관측치만으로 뽑은 중앙값이 전체 분포의 중앙값을 잘 대변함.

2. 결측이 편향적(non-random missing): 예를 들어, 리스크가 큰 극단치 구간에서만 결측이 발생했다면, 중앙값이 과소(혹은 과대) 추정될 수 있음.

* 즉, 2번인 결측의 편향성을 알아보고자 함

In [0]:
from pyspark.sql.functions import col

# 결측 비율 확인
total = df_dropFund.count()
missing_std = df_dropFund.filter(col("펀드표준편차(연환산)_1년").isNull()).count()
print(f"펀드표준편차 널 비율: {missing_std/total:.2%}")


##### 결론
1. 결측 비율이 작고(예: 1–5% 미만), 그리고 랜덤한 누락이라면 중앙값 편향 걱정은 거의 없음.

2. 결측 비율이 크거나(예: 20% 이상), 특정 구간에 몰려 있다면, 더 면밀한 검토(예: 결측 패턴 시각화, Missing-not-at-random 진단 등)가 필요.

> 따라서, 널 비율은 1%미만이므로 편향되진 않음을 확인

_면밀한 검토에 대한 각 컬럼 _시각화__

#### 중앙값이 크게 달라지는지 직접 비교해 보기
명시적으로 널을 제거한 뒤와, 라이브러리 기본 처리(널 자동 제외) 결과가 동일한지 확인

In [0]:
from pyspark.sql.functions import col

# 1) 널을 자동 제외한 중앙값
median_auto = df_dropFund.approxQuantile("펀드표준편차(연환산)_1년", [0.5], 0.01)[0]

# 2) 널을 명시적으로 제외한 DataFrame → 중앙값
median_explicit = (
    df_dropFund
      .filter(col("펀드표준편차(연환산)_1년").isNotNull())
      .approxQuantile("펀드표준편차(연환산)_1년", [0.5], 0.01)[0]
)

print(f"automatic: {median_auto}, explicit: {median_explicit}")


##### 결론
> 두 값이 같기에, 널 자동 제외 과정이 정상적으로 작동해 해석에 차이가 없다를 의미.

### 실제 결측치 처리mean_sharp 임시 칼럼

#### mean_sharp 임시 칼럼 작동원리 설명!
1. grp_mean_sharp 을 조인

2. when(...isNull(), global_mean_sharp).otherwise(mean_sharp) 로 널 그룹 평균 보정

3. 이를 다시 펀드수정샤프(연환산)_1년 널인 행에만 적용.

4. 나머지 결측 처리

    1. 중앙값 및 0 대체 (fillna) → 그룹 평균 임퓨테이션(비어있는 값들을 채우는 것) → 범주형 ‘N’ 대체 순서로 처리

    2. 모든 주요 피처의 널이 제거된 df_clean 을 얻는다!


_이거 최종코드가 나온 1차: 부호정리, 2차: 대분류의 부재 를 발표에 넣으면 좋겠습니다._

_그리고 대유형의 특성을 따른다는 논리적 자료가 더 보충 필요_

In [0]:
from pyspark.sql.functions import col, sum, count

# '대유형'별로 그룹화 후 널값 개수 계산
null_counts = df_dropFund.groupBy("대유형").agg(
    sum(col("펀드수정샤프(연환산)_1년").isNull().cast("int")).alias("null_count"),  # 널값 개수
    count("*").alias("total_count")  # 전체 행 수 (옵션)
).orderBy("대유형")  # 정렬 (옵션)

# 결과 출력 (truncate=False: 생략 없이 전체 출력)
null_counts.show(truncate=False)

_“국내미분류”가 다른 국내 유형과 유사하다는 도메인적 근거가 있다면, 전체 평균보다 국내 그룹핑이 더 합리적임._

_아니면 샤프정보가 표준편차와 수익에 큰 영향을 받는 컬럼인만큼  랜덤포레스트, KNN, 회귀 등 모델에 다른 변수(예: 펀드 크기, 설정일, 운용사 등)와의 관계까지 반영해 결측치 예측 가능.다만 모델 방식은 정보수에 큰 영향을 받는 만큼 국내 미분류에 적용하기에는 31개라는 수치가 음... 좀 효과가 별로 일수도 있다는 생각이 들고, 오히려 다른 대유형에 적용해보면 좋을 것같습니다._

In [0]:
from pyspark.sql.functions import col, when, avg, expr, lit
from pyspark.sql import DataFrame

# 최종 수정본 - 대유형별로 그룹핑시 샤프값이 존재하지 않는 경우인 31개의 행이 존재함 이를 중앙값을 이용해 해결

# ─────────────────────────────────────────────
# 0) 초기 df_dropFund: '펀드성과정보_1년'만 제거된 상태
# ─────────────────────────────────────────────

# 1) '펀드성과정보_1년' 결측(Null) 행 제거 why? 모델 학습의 Target 값(성능 평가 지표)이므로 결측은 버림 처리
df_clean: DataFrame = df_dropFund.filter(col("펀드성과정보_1년").isNotNull())

# 2) 그룹별 통계: 대유형별 median/std, median/mdd, mean/sharp
group_stats = (
    df_clean
      .groupBy("대유형")
      .agg(
          expr("percentile_approx(`펀드표준편차(연환산)_1년`, 0.5)").alias("med_std"),
          expr("percentile_approx(`MaximumDrawDown_1년`, 0.5)").alias("med_mdd"),
          avg("펀드수정샤프(연환산)_1년").alias("mean_sharp")
      )
)

# 3) 전체 샤프 중앙값 계산 (fallback)
# 즉, 샤프 비율 전체 데이터의 중앙값을 구해 mean_sharp가 없는 경우 사용할 최종 fallback 값으로 저장.
global_med_sharp = df_clean.approxQuantile(
    "펀드수정샤프(연환산)_1년", [0.5], 0.01
)[0]

# 4) 그룹별 impute + 남은 샤프 결측은 global_med_sharp으로
df_imputed = (
    df_clean
      .join(group_stats, on="대유형", how="left")
      # 4-1) 표준편차 결측 → 그룹 median
      .withColumn(
          "펀드표준편차(연환산)_1년",
          when(
            col("펀드표준편차(연환산)_1년").isNull(),
            col("med_std")
          ).otherwise(col("펀드표준편차(연환산)_1년"))
      )
      # 4-2) MDD 결측 → 그룹 median
      .withColumn(
          "MaximumDrawDown_1년",
          when(col("MaximumDrawDown_1년").isNull(), col("med_mdd"))
           .otherwise(col("MaximumDrawDown_1년"))
      )
      # 4-3) 샤프 결측 → 그룹 mean, 여전히 널이면 global median
      .withColumn(
          "펀드수정샤프(연환산)_1년",
          when(col("펀드수정샤프(연환산)_1년").isNull(),
               when(col("mean_sharp").isNotNull(), col("mean_sharp"))
               .otherwise(lit(global_med_sharp))
          ).otherwise(col("펀드수정샤프(연환산)_1년"))
      )
      # 4-4) 불필요 임시 칼럼 제거 -> 분석에만 사용된 중간 통계 컬럼들은 제거하여 모델 입력 정리
      .drop("med_std", "med_mdd", "mean_sharp")
)

# 5) 나머지 결측: 자금흐름→0, 대표펀드유무→'N'
df_final = df_imputed.fillna({
    "NetCashFlow(펀드자금흐름)_1년": 0,
    "대표펀드유무": "N"
})

# 6) 검증: 모든 칼럼의 널이 0인지 확인
final_nulls = {
    c: df_final.filter(col(c).isNull()).count()
    for c in df_final.columns
}
print("처리 후 컬럼별 널 개수:", final_nulls)


In [0]:
display(df_final.head(10))

In [0]:
len(df_final.columns)

## 📊 최종 수정본의 설명 - 결측치 처리 전략

### 1. 샤프 비율(Sharpe Ratio): 그룹별 평균 → 글로벌 중앙값 이중 보완

#### ✅ 1차 처리: 대유형별 평균
- 대부분의 결측치는 **대유형별 평균값**으로 대체
- → 펀드 그룹의 고유한 **경향성(trend)** 을 최대한 반영

#### ✅ 2차 처리: 글로벌 중앙값(Global Median)
- 일부 소수(31개)는 샤프 비율 값이 **전무한 그룹**에 속함
- → 해당 그룹은 글로벌 중앙값으로 대체
- → 전체 분포의 **중심값을 보존**

#### ❗ 글로벌 평균(Global Mean)을 사용하지 않은 이유
- 샤프 비율은 고성능 펀드에 의해 **분포가 비대칭적으로 왜곡되기 쉬움**
- 평균은 이러한 **극단치(outlier)** 에 민감함
- 중앙값은 분포의 중심을 **더 안정적으로 반영**함

---

### 2. 리스크 지표: 표준편차(Standard Deviation) · 최대 낙폭(MDD)

#### ✅ 그룹별 중앙값(Median) 대체
- 리스크 지표는 본질적으로 **편차가 크고 극단치가 많음**
- → 평균보다 중앙값이 **분포의 중간값**을 정확히 포착
- 예: **주식형 vs 채권형** 등, 그룹별로 값의 스케일이 크게 다름
- → 그룹 기준 중앙값 사용 시 **유형 특성을 왜곡하지 않음**

---

### 3. 기타 컬럼 결측 처리

| 컬럼명           | 대체 값 | 처리 이유                                |
|------------------|----------|--------------------------------------------|
| 자금흐름         | `0`      | “정보 없음”으로 간주 → 변동 없음 처리        |
| 대표펀드유무     | `'N'`    | 결측은 “대표 아님”으로 해석 → 분류 무리 없음  |

---

### ✅ 요약 정리

- **샤프 비율**: `대유형별 평균 → 글로벌 중앙값` 이중 보완
- **리스크 지표(표준편차, MDD)**: 그룹별 중앙값으로 처리
- **자금흐름**: `0`으로 대체
- **대표펀드유무**: `'N'`으로 대체
- **중앙값 사용의 핵심 이유**: 분포 왜곡을 방지하고 대표성을 유지하기 위함


결측치 한거 안한거 표 만들기 (scatter), 이상치랑 어것 저것 확인

In [0]:
display(df_final.head(10))

In [0]:
display(df_final.tail(10))

In [0]:
df_final.count()

### 대표펀드 유무 컬럼의 널 값을 'N'으로 대체하는 또다른 방법
##### 추가지식!
#### 옵션 1: withColumn + when/otherwise -> 조건을 세밀하게 조정하고 싶을때 사용하자
```python
from pyspark.sql.functions import col, when, lit
# ------------------------------------------------------------------
# 옵션 : withColumn + when/otherwise
# ------------------------------------------------------------------
df = df.withColumn(
    '대표펀드유무',
    when(col('대표펀드유무').isNull(), lit('N'))
      .otherwise(col('대표펀드유무'))
)

# (선택) 처리 결과 확인
display(df.filter(col('대표펀드유무').isNull()).limit(5))
```

### 피처에 대한 설명
| 지표       | 의미       | 실제 활용       | 추천 알고리즘 활용       |
| -------- | -------- | ----------- | ---------------- |
| 유형평균유형성과 | 동종 평균 성과 | 비교/초과 성과 측정 | 초과수익 지표로 활용      |
| 펀드표준편차   | 수익률 변동성  | 위험 등급 산정    | 리스크 기반 필터링       |
| 수정샤프지수   | 위험대비 수익  | 성과 비교 핵심 지표 | 가중치 기반 종합점수      |
| MDD      | 최대 하락폭   | 안정성 판단 기준   | 군집화/스코어링         |
| 자금흐름     | 인기/유행 추정 | 트렌드 상품 구분   | 가중치 조정, 종료 위험 감지 |


### 피처에 대한 자세한 명세 및 활용방안
1. 유형평균유형성과_1년
  * 정의: 동일 유형(예: 주식형, 채권형, 혼합형 등)에 속한 펀드들의 1년 평균 수익률

    * 실제 활용 사례:

      펀드 비교 기준: 삼성자산운용이나 신한자산운용에서는 펀드 수익률이 같은 유형 평균보다 높은지를 투자설명서에 명시하여 성과를 평가.

      마케팅 전략: 유형 평균을 초과한 펀드는 ‘초과 수익 달성’ 펀드로 소개되어 마케팅 요소로 사용가능.

    * 데이터 과학 활용:

      파생 피처 생성: 펀드성과 - 유형평균 으로 만든 초과성과 지표는 랭킹, 클러스터링, 분류 모델의 입력 변수로 사용.

      이상치 감지: 유형 대비 지나치게 낮은 성과는 비정상 펀드 탐지에 사용.

2. 펀드표준편차(연환산)_1년
* 정의: 1년간 수익률의 변동성을 나타내는 지표 (높을수록 리스크 큼)

    * 실제 활용 사례:

      금융투자협회의 펀드공시 기준에서 변동성 리스크 등급화에 활용됨

      투자설명서에서 "이 펀드는 고위험·고수익형" 등으로 분류할 때 기준이 됩니다.

    * 데이터 과학 활용:

      리스크 기반 필터링: 투자 성향이 ‘보수형’인 고객에게는 표준편차가 낮은 펀드만 필터링

      펀드 스코어 계산: 위험 대비 성과 평가 시, 표준편차는 분모로 들어갑니다. (예: 샤프지수)

3. 펀드수정샤프(연환산)_1년
* 정의: 위험 대비 수익률을 나타내는 지표 (Sharpe Ratio 보정 버전)

    * 실제 활용 사례:

      펀드평가사 에프앤가이드에서는 투자 성과 평가 시 절대수익률보다 중요하게 여겨짐

      금융기관의 추천시스템에서 랭킹 필터 기준으로 사용됨 
      (샤프지수(+)일시 -> 높을수록 투자성과 성공적)
      (샤프지수(-)일시 -> 수정 샤프 지수가 음수라면 해당 펀드는 위험 조정 성과가 매우 좋지 않다)

    * 데이터 과학 활용:

      가중치 기반 랭킹: 펀드수정샤프 * 0.4 + MDD 점수 * 0.3 + 표준편차 점수 * 0.3 식으로 종합 점수 산출

      비교 시각화: 동일 유형 펀드 내에서 샤프지수를 기준으로 상위 10%를 하이라이팅

4. Maximum DrawDown_1년
* 정의: 1년 중 최대 낙폭 (최고점 대비 최저점 하락률) → 펀드가 얼마나 큰 손실을 경험했는지를 보여줌

    * 실제 활용 사례:

      국민은행, 미래에셋 등에서는 MDD가 낮은 안정형 펀드를 장기투자용으로 권장

      ETF 평가 시에도 매우 중요한 기준입니다 (예: 타이거 미국S&P500 ETF의 마케팅 포인트 중 하나)

    * 데이터 과학 활용:

      보수적 투자자 분류 기준: MDD < -10% 펀드는 보수형 고객 대상에서 제외

      리스크 기반 군집화: 리스크 유사 펀드끼리 클러스터링 (KMeans, DBSCAN 등)

5. NetCashFlow(펀드자금흐름)_1년
* 정의: 최근 1년간 순자금 유입/유출 금액 (총 유입 - 총 유출)

    * 실제 활용 사례:

      고객 선호도나 트렌드 판단 지표로 활용됨. 최근 자금 유입이 많은 펀드는 ‘인기 펀드’로 간주

      신규 가입 유도 마케팅에도 활용: "자금 유입 1위 펀드" 등
    * 인기도 기반 추천 시스템:  `if NetCashFlow > 0:
      추천가중치 += 0.2`
      펀드 수명 주기 판단: 자금 유출이 지속되면 종료 예정 펀드로 분류 가능

### 널 값을 해결한 테이블을 통해 이상치 제거 계획안
  1. IQR 즉 사분위 방식으로 이상치 탐지
  
  2. Zscore를 이용해 이상치 탐지
> 이후 1번과 2번을 비교하고 이상치 제거

#### IQR방식으로 이상치 탐지

#### Zscore 방식으로 이상치 탐지 

_가중치 조합에 대한 실제적 근거 -> 논문이나 음 국제 쵸준 뭐 이런거 긁어서 그럴듯하게 자료만들기_

### 실제 가중치 조합 예시
#####● 실무적 가중치 배분
  1. 변동성(연환산 표준편차): 70~80%
  2. 최대낙폭(MDD): 10~20%
  3. 샤프지수/IR 등 위험조정성과: 10~20%
  * 예시 공식:
  펀드위험점수=0.7×표준편차(연환산)+0.2×최대낙폭(MDD)+0.1×(1−샤프지수(연환산))펀드위험점수=0.7×표준편차(연환산)+0.2×최대낙폭(MDD)+0.1×(1−샤프지수(연환산)) -> '수정샤프지수'로 해석시에는 오류
##### 표준편차가 위험의 절대적 척도이므로 가장 큰 비중을 차지
##### **최대낙폭(MDD)**는 극단적 리스크(최악의 손실) 반영
##### 샤프지수는 위험 대비 성과가 낮을수록(1-샤프) 위험점수에 가산 -> 오류 why? 수정샤프지수는 음수 해석까지 필요로 함

##### ● 근거
  1. 규제기관, 협회, 글로벌 운용사 모두 “변동성”을 공식 위험등급 산정의 핵심으로 사용
  (EU KID, 국내 금융투자협회 표준, 글로벌 펀드설명서 등)
  2. 샤프·MDD 등은 투자자 설명, 비교, 맞춤형 추천에서 보조적으로 활용
  (실제 피델리티 등 글로벌 운용사 공식 펀드설명서 참고5)

이상치 하기 전에 랭킹을 만들어보자

In [0]:
df_final.printSchema()

In [0]:
import pandas as pd

In [0]:
from sklearn.preprocessing import MinMaxScaler

In [0]:
from pyspark.sql.functions import col, expr 
# expr은 문자열 형태의 sql식을 그대로 사용하기 위해 사용 즉, expr("sql 표현식")형태로 기술
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
# VectorAssembler -> 여러개의 숫자형 컬럼을 벡터 컬럼으로 변환 why? > ML알고리즘은 input으로 feature라는 벡터컬럼을 사용 

# MinMaxScaler -> 벡터 컬럼 내 각 값들을 0과 1사이의 값으로 정규화 
# 즉, scaled_i = x_i - min(x) / max(x) - min(x) 단 상대적 위치만을 보고 스케일링 하므로 의도된 방향성을 설정하는 것이 중요
from pyspark.ml import Pipeline
# 코드의 유지 보수 및 재사용성을 위해 파이프라인 사용. 즉, 여러단계를 순차적 연결해 실행위함
from pyspark.sql.window import Window
# sql의 윈도우 함수 사용하기 위함 ex) partitioned by 등등 
import pyspark.sql.functions as F


# 1. 숫자형 변환
for c in ['펀드성과정보_1년', '펀드수정샤프(연환산)_1년', 'NetCashFlow(펀드자금흐름)_1년', '순자산', '운용보수']:
    df_final = df_final.withColumn(c, col(c).cast('double'))
    # cast() -> 문자열로 읽힌 숫자 데이터를 spark sql 데이터 타입인 double로 변환


#### Question : 꼭 cast로 형변환이 필요한가요?
1. CSV나 JSON 등 외부 데이터를 로드할 때, 숫자형 컬럼이 기본적으로 문자열(string) 타입으로 읽힐 수 있다.

2. ML 파이프라인(VectorAssembler, MinMaxScaler 등)과 수치 계산(집계, 윈도우 함수 등)은 숫자형 컬럼에서만 정상 동작함.

> 따라서 .cast('double')로 미리 타입을 통일해 주지 않으면 이후 스케일링, 합·평균 계산, 모델 학습 단계에서 에러가 발생합니다.

_비율이랑 값이 다 스케일러 적용해도 되나 싸그리??_

#### MinMaxScaler 스케일링 사용시 주의사항
| 질문                       | 답변                                                                                   |
| ------------------------ | ------------------------------------------------------------------------------------ |
| 음수값을 절댓값 또는 부호 반전해야 하나요? | **네, 반드시 필요합니다.**                                                                    |
| 이유는 무엇인가요?               | **MinMaxScaler는 값이 클수록 점수가 커지므로**, 음수 그대로 쓰면 위험이 큰 데이터가 오히려 점수가 작게 나오는 역전 현상이 발생. |
| 추천 방식은?                  | `-1 * MDD` 또는 `.abs()` 처리 후 `MinMaxScaler`로 점수화가 필요함.                                  |

> [자세한 내용](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html)


#### 아래코드의 전체 workflow
1. 원본 데이터 로드

2. 바로 직전 코드에서 숫자형 컬럼을 캐스팅

3. (VectorAssembler -> MinMaxScaler) => Pipelien화 즉 자동화를 위한 하나의 묶음으로 만듬

4. 즉, 하나의 파이프라인을 정의한 함수를 이용해 랭킹점수를 산정하기위한 
  1. 정의단계: (assembler, scaler정의 => pipeline정의) 
  2. 학습단계 : (fit()으로 이전 단계에서 정의한 스케일러를 학습 즉, 내부적으로 저장)
  3. 변환단계 : (transform()으로 스케일링된 값을 추출하는 과정)
  > 이후 assemble과 scaler에서 지정한 컬럼에 값이 부과된 채로 생성됨.

#### 사전 지식 : fit() 그리고 transform()
1. fit() : 앞서 정의한 통계치 또는 모델 파라미터를 학습(필요한 정보 저장)후 출력으로 transformer객체 반환
> ex. MinMaxScaler.fit(data) -> input col의 min, max계산후 내부적으로 저장

2. transform() : fit()에서 학습된 정보를 이용해 transform수행 즉, 얻어진 새로운 데이터를 변환 후 dataframe을 반환
> ex. scaler_model.transform(data)는 각 값을 (x - min) / (max - min) 즉, MinMaxScaler의 공식으로 정규화; 다시말해 실제 계산을 하여 'df' 형태로 반환.

In [0]:
# 2. MinMaxScaler를 위한 벡터화 및 정규화하는 함수 scale_column() 정의

def scale_column(df, input_col, output_col, fill_value=0):
    # df : 정규화 할 원본 데이터프레임
    # input_col : 0~1사이로 스케일링할 원본 컬럼 이름(문자열타입)
    # output_col : 스케일링된 값을 저장할 컬럼의 이름지정(문자열타입)
    # fill_value : input_col 컬럼에 결측값이 있을 때 대체할 값(숫자타입) 기본값은 '0'

    assembler = VectorAssembler(inputCols=[input_col], outputCol=f"{output_col}_vec")
    # 벡터 컬럼 생성(ML알고리즘은 입력을 벡터형태로 받기에 벡터형태로 변환해야 함)
    # 결과는 {output_col}_vec이라는 새 벡터 컬럼으로 나옴.
    # 즉 학습없는 단순변환 정의

    scaler = MinMaxScaler(inputCol=f"{output_col}_vec", outputCol=f"{output_col}_scaled")
    # 벡터 컬럼을 0~1사이로 스케일링(압축)하고 이 값을 저장할 컬럼명 지정

    pipeline = Pipeline(stages=[assembler, scaler])
    # 위의 두 단계 assembler(객체)와 scaler(객체)를 한번에 두 단계를 연속실행하는 파이프라인객체를 만듦

    df_filled = df.na.fill({input_col: fill_value})
    # 결측값이 있는 경우 fill_value로 대체함 why? : 스케일러는 Null 값 존재시 에러 발생. 따라서 반드시 Null을 해결해야함.

    model = pipeline.fit(df_filled)
    df_scaled = model.transform(df_filled)
    # 실제 스케일링된값들을 생성한 컬럼에 부여해서 반환

    # 벡터화시킨 컬럼에서 스칼라 값 추출
    from pyspark.sql.functions import udf # user defined function -> sql을 사용하기위함 알리아스를 udf로 정의
    from pyspark.sql.types import DoubleType
    
    extract_first = udf(lambda v: float(v[0]) if v is not None else 0.0, DoubleType())
    # extract_first라는 함수 정의 입력값 v는 보통 벡터 타입, 즉 DenseVector([value])
    # 1. v가 None이 아닌 경우 → v[0]을 float형으로 추출
    # 2. v가 None인 경우 → 0.0으로 대체
    # 여기서 반환값은 DoubleType으로 명시.
    # 이 함수를 왜 사용하는가? : PySpark의 MinMaxScaler, VectorAssembler 결과는 벡터 형식(DenseVector)으로 리턴. 하지만 모델링이나 데이터 분석 시에는 일반 숫자값으로 다시 바꾸는 경우가 많다. 따라서 위와 같이 벡터값을 일반 숫자값으로 변환하는 함수를 정의.

    return df_scaled.withColumn(output_col, extract_first(col(f"{output_col}_scaled")))
    # 새 컬럼 추가, 첫 파라미터는 추가할 컬럼명, 두번째 파라미터는 앞서 정의한 컬럼(첫번째 파라미터) 에 들어갈 값으로, 벡터값을 일반 숫자값으로 변환한 값을 추가하는 함수.

    # 예를 들어, output_col_scaled 값이 DenseVector([0.25])라면 output_col에는 0.25가 저장.

    # <총 흐름을 정리>
    # 예로 input_col = "MDD_1년", output_col = "MDD점수"
    #
    # 1. "MDD_1년" 값을 정규화 → "MDD점수_scaled" (벡터형)
    # 2. "MDD점수_scaled"의 첫 번째 값 추출 → "MDD점수" (실수형)
    # 3. 결과적으로 모델 학습에 바로 사용할 수 있는 실수형 컬럼이 생성됨

df_final = scale_column(df_final, '펀드성과정보_1년', '수익률점수')
#df_final = scale_column(df_final, '펀드수정샤프(연환산)_1년', '샤프점수') 
#df_final = df_final.withColumn('샤프점수', expr('1 - `샤프점수`'))
df_final = scale_column(df_final, 'NetCashFlow(펀드자금흐름)_1년', '자금유입점수')
# max_fee = df_final.agg({'운용보수': 'max'}).collect()[0][0]
df_final = scale_column(df_final, '운용보수', '수수료점수')
# , fill_value=max_fee
df_final = df_final.withColumn('수수료점수', expr('1 - `수수료점수`'))

In [0]:
# 3. 최종점수 계산
w1, w2, w3= 0.5, 0.3, 0.2
df_final = df_final.withColumn(
    '최종점수',
    w1 * col('수익률점수') +
    w2 * col('자금유입점수') +
    w3 * col('수수료점수')
)

In [0]:
# 4. 랭킹 생성
windowSpec = Window.orderBy(col('최종점수').desc())
df_final = df_final.withColumn('추천랭킹', F.row_number().over(windowSpec))

In [0]:
display(df_final)

In [0]:
# 상위 20개 펀드만 보기
display(df_final[['추천랭킹', '펀드코드', '펀드명', '운용사명', '최종점수', '펀드성과정보_1년', '펀드수정샤프(연환산)_1년', 'NetCashFlow(펀드자금흐름)_1년', '운용보수']].head(20))

In [0]:
# 5. 컬럼명 한글화 및 출력
styled_df = df_final.select(
    col('추천랭킹').alias('랭킹'),
    col('펀드코드').alias('코드'),
    col('펀드명').alias('펀드명'),
    col('운용사명').alias('운용사'),
    col('최종점수').alias('최종점수'),
    col('펀드성과정보_1년').alias('1년 수익률(%)'),
    col('펀드수정샤프(연환산)_1년').alias('1년 샤프지수(연환산)'),
    col('NetCashFlow(펀드자금흐름)_1년').alias('1년 자금유입'),
    col('운용보수').alias('운용보수(%)')
)

styled_df.display(20, truncate=False)

### 리스크 컬럼 생성 - 올바른 해석_수정본

In [0]:
# from pyspark.sql.functions import col, min, max

# df_final.select(
#     min('MaximumDrawDown_1년').alias('min_변동성'),
#     max('MaximumDrawDown_1년').alias('max_변동성'),
#     _sum((col('MaximumDrawDown_1년') < 0).cast('int')).alias('num_negative'),
#     _sum((col('MaximumDrawDown_1년') == 0).cast('int')).alias('num_zero'),
#     _sum((col('MaximumDrawDown_1년') > 0).cast('int')).alias('num_positive')
# ).show()

In [0]:
# from pyspark.sql.functions import col, min, max

# df_final.select(
#     min('펀드표준편차(연환산)_1년').alias('min_표준편차'),
#     max('펀드표준편차(연환산)_1년').alias('max_표준편차'),
#     _sum((col('펀드표준편차(연환산)_1년') < 0).cast('int')).alias('num_negative'),
#     _sum((col('펀드표준편차(연환산)_1년') == 0).cast('int')).alias('num_zero'),
#     _sum((col('펀드표준편차(연환산)_1년') > 0).cast('int')).alias('num_positive')
# ).show()

In [0]:
# from pyspark.sql.functions import col, min, max

# df_final.select(
#     min('펀드수정샤프(연환산)_1년').alias('min_수정샤프'),
#     max('펀드수정샤프(연환산)_1년').alias('max_수정샤프'),
#     _sum((col('펀드수정샤프(연환산)_1년') < 0).cast('int')).alias('num_negative'),
#     _sum((col('펀드수정샤프(연환산)_1년') == 0).cast('int')).alias('num_zero'),
#     _sum((col('펀드수정샤프(연환산)_1년') > 0).cast('int')).alias('num_positive')
# ).show()

In [0]:
# from pyspark.ml import Pipeline
# from pyspark.ml.feature import VectorAssembler, MinMaxScaler
# from pyspark.sql.functions import col, abs
# #리스크 피처들의 올바른 해석을 통한 리스크 점수 계산 - 수정본

# # 1) 사전 변환: 위험도 방향 맞추기
# #    - 변동성      : 값이 클수록 위험 ↑ (그대로)
# #    - 최대손실폭  : 원본이 음수이므로 절댓값(abs) 또는 부호 반전(-1) → 양수로
# #    - 샤프지수    : 값이 클수록 성과 우수 → “위험” 관점으로는 반전(-1)

# df_risk = df_final \
#     .withColumn('변동성_리스크',        col('펀드표준편차(연환산)_1년')) \
#     .withColumn('MDD_리스크',         abs(col('MaximumDrawDown_1년'))) \
#     .withColumn('샤프_리스크',        -col('펀드수정샤프(연환산)_1년'))

# # 2) 스케일링 함수 (벡터화 → MinMax → 단일 컬럼 추출)
# def scale_column(df, input_col, output_col, fill_value=0):
#     assembler = VectorAssembler(inputCols=[input_col], outputCol=f"{output_col}_vec")
#     scaler    = MinMaxScaler(inputCol=f"{output_col}_vec",    outputCol=f"{output_col}_scaled")
#     pipeline  = Pipeline(stages=[assembler, scaler])

#     df_filled = df.na.fill({input_col: fill_value})
#     model     = pipeline.fit(df_filled)
#     df_scaled = model.transform(df_filled)

#     # 스케일된 스칼라 값만 꺼내기
#     from pyspark.sql.functions import udf
#     from pyspark.sql.types import DoubleType
#     extract_first = udf(lambda v: float(v[0]) if v is not None else 0.0, DoubleType())

#     return df_scaled.withColumn(output_col, extract_first(col(f"{output_col}_scaled")))

# # 3) 각 리스크 지표 스케일링
# df_scaled = scale_column(df_risk, '변동성_리스크', '변동성점수')
# df_scaled = scale_column(df_scaled, 'MDD_리스크',    '최대손실폭점수')
# df_scaled = scale_column(df_scaled, '샤프_리스크',   '샤프위험점수')

# # 4) 최종 리스크 점수 결합
# w1, w2, w3 = 0.7, 0.2, 0.1
# df_final = df_scaled.withColumn(
#     '최종리스크점수',
#     w1 * col('변동성점수') +
#     w2 * col('최대손실폭점수') +
#     w3 * col('샤프위험점수')
# )

In [0]:
# # 5. 컬럼명 한글화 및 출력
# styled_df = df_final.select(
#     col('추천랭킹').alias('랭킹'),
#     col('최종리스크점수').alias('최종리스크점수'),
#     col('펀드코드').alias('코드'),
#     col('펀드명').alias('펀드명'),
#     col('운용사명').alias('운용사'),
#     col('최종점수').alias('최종점수'),
#     col('펀드성과정보_1년').alias('1년 수익률(%)'),
#     col('펀드수정샤프(연환산)_1년').alias('1년 샤프지수(연환산)'),
#     col('NetCashFlow(펀드자금흐름)_1년').alias('1년 자금유입'),
#     col('운용보수').alias('운용보수(%)')
# )

# styled_df.display(20, truncate=False)

In [0]:
# display(df_final.tail(20))

In [0]:
# # 1) 모든 컬럼 이름 리스트
# all_cols = df_final.columns

# # 2) 각 컬럼별 null 개수 집계식 생성
# null_count_exprs = [
#     _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
#     for c in all_cols
# ]

# # 3) 집계 실행: 한 행(Row)로 결과가 나옴
# null_counts_row = df_final.select(*null_count_exprs).first().asDict()

# # 4) 컬럼별 null 개수 출력
# print("컬럼별 null 개수:")
# for c, cnt in null_counts_row.items():
#     print(f"  {c}: {cnt}")

# # 5) null을 하나라도 가진 컬럼 개수 계산
# num_cols_with_null = builtins.sum(1 for cnt in null_counts_row.values() if cnt > 0)
# print(f"\nnull을 하나라도 가진 컬럼 개수: {num_cols_with_null}개")

_투자전략은 뭐 그냥 상품설명으로 걸어둔다 치고_

In [0]:
display(df_final.groupBy('투자전략').count())

_펀드키워드는 필터링 용으로 필요할 수 있겠다는 생각이 드네요_

In [0]:
display(df_final.groupBy('펀드키워드').count())

In [0]:
from pyspark.sql.functions import split, explode, trim, col

# 1) 쉼표로 분리 → explode로 행으로 펼침 → trim으로 공백 제거
keywords_df = (
    df_final.select(explode(split(col("펀드키워드"), ",")).alias("keyword"))
      .withColumn("keyword", trim(col("keyword")))
)

# 2) 키워드별 개수 집계
keyword_counts = (
    keywords_df.groupBy("keyword")
    .count()
    .orderBy(col("count").desc())
)

# 3) 결과 확인
display(keyword_counts)

In [0]:
keyword_with_bank = keyword_counts.filter(col("keyword").contains("은행"))
keyword_with_bank.show(truncate=False)

In [0]:
a = df_final.columns

In [0]:
a

In [0]:

# condition = col("keyword") == a[0]
# for word in a[1:]:
#     condition = condition | (col("keyword") == word)
# condition = ~condition

# filtered_keywords = keyword_counts.filter(condition)
# display(filtered_keywords)

In [0]:
# 이상치 탐지를 위한 수치형 컬럼들만 추출

# from pyspark.sql.types import NumericType

# # 1) 스키마에서 NumericType 인스턴스만 필터링
# numeric_cols = [
#     field.name
#     for field in df_final.schema.fields
#     if isinstance(field.dataType, NumericType)
# ]

# # 2) 숫자형 컬럼들만 선택
# df_numeric = df_final.select(numeric_cols)

# # 결과 확인
# print("수치형 컬럼들:", numeric_cols)
# display(df_numeric)
# len(df_numeric.columns)


## 이런 저런 처리를 위해 형을 되도록 실수형 변환


일단 남은 결측치를 확인

In [0]:
df_final = df_final.filter(col("순자산").isNotNull())
#7개 행 삭제

In [0]:
from pyspark.sql.functions import col, sum, when

null_counts = df_final.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_final.columns
])

display(null_counts)
# 결측치가 있는 컬럼 :  투자전략, 펀드키워드, 유형평균성과_1년

본격적으로 형 변환

In [0]:
# cols = [['대유형',
#  '펀드코드',
#  '펀드명',
#  '대표펀드유무',
#  '운용사명',
#  '설정일',
#  '중유형',
#  '소유형',
#  '투자전략',
#  '펀드키워드',
#  '펀드성과정보_1년',
#  '유형평균유형성과_1년',
#  '펀드표준편차(연환산)_1년',
#  '펀드수정샤프(연환산)_1년',
#  'MaximumDrawDown_1년',
#  'NetCashFlow(펀드자금흐름)_1년',
#  '순자산',
#  '운용보수',
#  '판매보수',
#  '선취수수료',
#  '후취수수료',
#  '가치주',
#  '성장주',
#  '중소형주',
#  '글로벌',
#  '자산배분',
#  '4차산업',
#  'ESG(사회책임투자형)',
#  '배당주',
#  'FoFs',
#  '퇴직연금',
#  '고난도금융상품',
#  '절대수익추구',
#  '레버리지',
#  '퀀트']]

# # 선택
# df_final_selected = df.select(*cols)

In [0]:
target_cols = [
    '펀드성과정보_1년',
    '펀드표준편차(연환산)_1년',
    '펀드수정샤프(연환산)_1년',
    'MaximumDrawDown_1년',
    'NetCashFlow(펀드자금흐름)_1년',
    '운용보수'
]

In [0]:
df_final.printSchema()

순자산은 숫자형태의 string이라 cast만 실수형변환

In [0]:
from pyspark.sql.functions import when, col
from pyspark.sql.types import FloatType

mcols = ['순자산']
for c in mcols:
    df_final = df_final.withColumn(
        c,
        col(c).cast(FloatType())
    )

아래 항목들은 Y/N 형식으로 작성된 string 컬럼들이라 1,0으로 변경 

In [0]:
yn_cols = ['대표펀드유무','가치주', '성장주', '중소형주', '글로벌', '자산배분', '4차산업', 'ESG(사회책임투자형)', '배당주', 'FoFs', '퇴직연금', '고난도금융상품', '절대수익추구', '레버리지', '퀀트']
for c in yn_cols:
    df_final = df_final.withColumn(
        c,
        when(col(c) == 'Y', 1.0)
        .when(col(c) == 'N', 0.0)
        .otherwise(None)
        .cast(FloatType())
    )


아래컬럼의 경우, 이상치 탐지에 중요한 컬럼이라고 생각이 되나 string이라 범주화를 시켜주고 이 값에 대해 넘버를 붙여주는 형식으로 실수화 시켰습니다.

In [0]:
from pyspark.ml.feature import StringIndexer

cat_cols = ['운용사명', '대유형', '중유형', '소유형']
indexed_cols = [c + '_idx' for c in cat_cols]

for orig, idx in zip(cat_cols, indexed_cols):
    indexer = StringIndexer(inputCol=orig, outputCol=idx, handleInvalid="keep")
    df_final = indexer.fit(df_final).transform(df_final)

# 결과 확인
df_final.select(cat_cols + indexed_cols).show(5)

In [0]:
display(df_final['운용사명', '대유형', '중유형', '소유형','운용사명_idx', '대유형_idx', '중유형_idx', '소유형_idx'])

In [0]:
display(df_final)

### 상관계수 구해보기

In [0]:
all_cols = df_final.columns

In [0]:
all_cols

In [0]:
numeric_types = ('IntegerType', 'DoubleType', 'FloatType', 'LongType', 'ShortType')
numeric_cols = [f.name for f in df_final.schema.fields if f.dataType.typeName() in [t.replace('Type', '').lower() for t in numeric_types]]

# 2. 분석 대상 컬럼만 남김
target_cols = [c for c in target_cols if c in numeric_cols]
all_cols = [c for c in all_cols if c in numeric_cols]
print(target_cols)
print(all_cols)

In [0]:
# from pyspark.ml.stat import Correlation
# from pyspark.ml.feature import VectorAssembler

# # 1. 널 제거
# df_clean = df_encoded.select(all_cols).dropna()

# # 2. 벡터화
# assembler = VectorAssembler(inputCols=all_cols, outputCol="features")
# df_vector = assembler.transform(df_clean)

# # 3. 상관행렬 계산 (전체 컬럼에 대해)
# corr_matrix = Correlation.corr(df_vector, "features", "pearson").head()[0].toArray()


# for target in target_cols:
#     i = all_cols.index(target)
#     print(f"\n📌 [{target}] 기준 상관관계:")
#     for j, other in enumerate(all_cols):
#         if i != j:
#             print(f"{other}: {corr_matrix[i][j]:.4f}")


## 이상치 처리 하기

target cols에 대해 삭제/유지/대체로 나누기 위해 이상치를 시각화 해보겠습니다

In [0]:
display(df_final)

## 사분위수IQR에 대한 설명
###사분위수(Quantile)란?
* _사분위수는 데이터를 네 구간(quarters)으로 나누는 경계 값_.
  1. Q1 (제1사분위수, 25%): 하위 25%의 값이 이 값보다 작음.

  2. Q2 (제2사분위수, 50%, 즉 중앙값): 데이터의 중간값.
  3. Q3 (제3사분위수, 75%): 하위 75%의 값이 이 값보다 작음.

* IQR의 정의
  1. IQR = Q3 - Q1 -> 데이터의 중간값인 '50% 범위'의미 => '일반적 분포'에서 벗어난 값의 식별 기준이 됨.
  
  2. 이 방식에서는 Low/Upper bound 범위 밖에 있는 값을 Outlier로 간주.
      - 예로 대부분 값이 10(lower)~20(upper)사이인데 50이란 값이 나오면 이는 iqr범위 밖, 이상치로 인식.
  3. 이때 1.5배를 기준으로 하는 이유는 범용적으로 잘 포착해주기 떄문.


In [0]:
from pyspark.sql.functions import col, count, when
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType

# 1. 수치형 컬럼만 필터링
numeric_cols = [field.name for field in df_final.schema.fields if isinstance(field.dataType, NumericType)]
# -> why? : IQR방식은 수치형 데이터에만 의미가 있음

# 2. 각 수치형 컬럼에 대해 이상치 판단 기준 (IQR) 계산
outlier_exprs = []
total_rows = df_final.count()

for col_name in numeric_cols:
    # 각 컬럼마다 IQR 적용
    # approxQuantile(col_name, [0.25, 0.75], 0.05)의 의미는 0.25, 0.75위치에서 q1과 q3의 값을 근사적으로 추출함 => 결과를 리스트 형식으로 반환, 0.05는 허용오차범위 의미.
    quantiles = df_final.approxQuantile(col_name, [0.25, 0.75], 0.05)
    if len(quantiles) < 2:
        continue  # 사분위 계산 안 되면 스킵

    q1, q3 = quantiles
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # 이상치 조건식 추가
    outlier_exprs.append(
        F.sum(when((col(col_name) < lower_bound) | (col(col_name) > upper_bound), 1).otherwise(0)).alias(col_name)
    )
# -> IQR 방법은 데이터 분포의 중앙 50%를 기준으로 크게 벗어난 값(극단값)을 찾는데 범용적으로 사용

# 3. 이상치 개수 집계
outlier_counts_row = df_final.agg(*outlier_exprs).collect()[0].asDict()
# -> outlier_exprs 객체를 사용하여 각 컬럼별 이상치 개수를 집계하고, 이를 '컬럼 : 값'딕셔너리로 변환. => 모든 컬럼에 대해 한 번에 이상치 개수를 구해 비교가능.

# 4. 이상치 비율 계산
outlier_ratios = {
    col: round((count / total_rows) * 100, 2) for col, count in outlier_counts_row.items()
}

# 5. Pandas로 정리
import pandas as pd

pd_outliers = pd.DataFrame(list(outlier_ratios.items()), columns=['컬럼명', 'IQR이상치비율(%)'])
pd_outliers = pd_outliers.sort_values(by='IQR이상치비율(%)', ascending=False)
# -> 내림차순으로 정렬

display(pd_outliers)


## IQR 방법의 특징
  1. 비모수적 방법: 데이터 분포에 특정 가정이 필요없고, 극단값 영향에 덜 민감함.

  2. 박스플롯(상자그림)에서 사용: 시각적으로도 많이 활용됨.

  3. 실사용 : 대량의 로그, 금융, 센서 데이터 등 다양한 영역에서 통계적 기준으로 널리 쓰임.
  > 즉, IQR 방식은 데이터의 전형적(중간) 분포를 기준으로, 너무 크거나 너무 작은 값(극단값)을 식별하는 데 최적화된 방법임.

In [0]:
from pyspark.sql.functions import col

# 1. mdd가 -90보다 작은 행의 개수 세기
count_mdd_less_than_90 = df_final.filter(col("MaximumDrawDown_1년") < -90).count()

# 2. mdd가 -90보다 작은 행 추출
rows_mdd_less_than_90 = df_final.filter(col("MaximumDrawDown_1년") < -90)

# 결과 출력 (개수와 샘플 데이터)
print(f"Count of rows where MaximumDrawDown_1년 < -90: {count_mdd_less_than_90}")
df_final = df_final.filter(col("MaximumDrawDown_1년") >= -90)


이상치가 있는 것들 중 처리가 필요한 것들을 살펴보기


In [0]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

이진 변수에서 생기는 불균형에 의한 이상치
| 소수 클래스 비율(%)	| 불균형 정도	|실무적 처리 필요성 |
|-----------------------------|--------------|-----|
20~40%|Mild(경미)|	별도 처리 없이도 충분히 학습 가능|
1~20%	|Moderate(중간)	|불균형 처리 권장 (샘플링, 가중치, 임계값 조정 등)|
1% 미만	|Extreme(극단적)|	불균형 처리 필수 (SMOTE, 언더샘플링, 클래스 가중치 등)|
https://developers.google.com/machine-learning/crash-course/overfitting/imbalanced-datasets?hl=ko

| 컬럼명                      | 이상치비율(%) |  실무적 처리 필요성 |
|-----------------------------|--------------|--------------|
| FoFs                        | 28.57        | 그냥 유지 |
| 글로벌                      | 28.34        | 그냥 유지 |
| 대표펀드유무                | 19.55        | 그냥 유지 |
| 퇴직연금                    | 14.84        | 불균형 처리 |
| 자산배분                    | 8.37         | 불균형 처리 |
| 배당주                      | 5.92         | 불균형 처리 |
| 가치주                      | 4.94         | 불균형 처리 |
| 중소형주                    | 4.51         | 불균형 처리 |
| 고난도금융상품              | 3.76         | 불균형 처리 |
| ESG(사회책임투자형)         | 3.38         | 불균형 처리 |
| 성장주                      | 3.31         | 불균형 처리 |
| 4차산업                     | 2.36         | 불균형 처리 |
| 절대수익추구                | 1.14         | 불균형 처리 |
| 레버리지                    | 1.07         | 불균형 처리 |
| 퀀트                        | 0.7          | 삭제 |


| 컬럼명                      | 이상치비율(%) |
|---|---|
| 중유형_idx                  | 5.67         |
| 대유형_idx                  | 5.51         |
| 소유형_idx                  | 4.44         |
| 운용사명_idx                | 4.4          |

여러 점수를 뺀 컬럼들 
| 컬럼명                      | 이상치비율(%) | 음양여부 |
|-----------------------------|--------------|----|
| NetCashFlow(펀드자금흐름)_1년 | 24.76       | 음양
| 펀드수정샤프(연환산)_1년     | 19.67        | 음양
| 순자산                      | 18.51        | 양
| 선취수수료                  | 17.52        | 양
| 펀드성과정보_1년            | 12.92        | 음양
| 유형평균유형성과_1년        | 10.71        | 음양
| 후취수수료                  | 5.56         | 양
| MaximumDrawDown_1년         | 3.38         | 음
| 펀드표준편차(연환산)_1년     | 2.9          | 양
| 판매보수                    | 2.04         | 양
| 운용보수                    | 0.04         | 양


음수 양수에 따라 다른 이상치를 적용 해보려고 한다. 
- -only양수 -> 로그변환
- only음수 -> mdd밖에 없는데 이는 절댓값 씌워서 로그변환하는게 나을듯
- 양/음수 -> Yeo-Johnson 변환

In [0]:
df_final.printSchema

In [0]:
import pandas as pd

target_cols = ["NetCashFlow(펀드자금흐름)_1년", "순자산", "MaximumDrawDown_1년"]

def add_outlier_columns(df, target_cols, method='iqr', z_thresh=3.0):
    df = df.copy()
    for col in target_cols:
        if method == 'iqr':
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower = Q1 - 1.5 * IQR
            upper = Q3 + 1.5 * IQR
            df[f"{col}_outlier"] = ((df[col] < lower) | (df[col] > upper)).astype(int)
        elif method == 'zscore':
            mean = df[col].mean()
            std = df[col].std()
            df[f"{col}_outlier"] = (abs((df[col] - mean) / std) > z_thresh).astype(int)
        else:
            raise ValueError("지원하지 않는 method입니다: 'iqr' 또는 'zscore'만 허용")
    return df


In [0]:
pdf_sample = df_final.toPandas().copy()

In [0]:
target_cols = ["NetCashFlow(펀드자금흐름)_1년", "순자산", "MaximumDrawDown_1년"]
pdf_sample = add_outlier_columns(pdf_sample, target_cols, method='iqr')


In [0]:
import matplotlib.font_manager as fm

for font in fm.findSystemFonts():
    print(font)

In [0]:
font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

# 2. FontProperties 객체 생성
font_prop = fm.FontProperties(fname=font_path)

In [0]:
sample_size = 10000
# Spark DataFrame → pandas DataFrame 변환 및 샘플링
pdf_sample = df_final.toPandas().sample(n=sample_size, random_state=42) if df_final.count() > sample_size else df_final.toPandas()

# 이상치 컬럼 생성
pdf_sample = add_outlier_columns(pdf_sample, target_cols, method='iqr')
fig, axes = plt.subplots(3, len(target_cols), figsize=(5*len(target_cols), 15))



for i, c in enumerate(target_cols):
    # 산점도
    sns.scatterplot(
        x=pdf_sample.index,
        y=pdf_sample[c].astype('float32'),
        hue=pdf_sample[f"{c}_outlier"],
        palette={0: 'blue', 1: 'red'},
        legend=False,
        ax=axes[0, i]
    )
    axes[0, i].set_title(f'산점도: {c}', fontproperties=font_prop)
    axes[0, i].set_xlabel('인덱스', fontproperties=font_prop)
    axes[0, i].set_ylabel(c, fontproperties=font_prop)

    # 박스플롯
    sns.boxplot(x=pdf_sample[c], color='lightgray', ax=axes[1, i])
    axes[1, i].set_title(f'박스플롯: {c}', fontproperties=font_prop)
    axes[1, i].set_xlabel(c, fontproperties=font_prop)

    # 히스토그램 (bin 수 제한)
    sns.histplot(pdf_sample[c], bins=50, kde=False, color='skyblue', ax=axes[2, i])
    axes[2, i].set_title(f'히스토그램: {c}', fontproperties=font_prop)
    axes[2, i].set_xlabel(c, fontproperties=font_prop)

plt.tight_layout()
plt.show()

In [0]:
import matplotlib.font_manager as fm

for font in fm.findSystemFonts():
    print(font)

In [0]:
from pyspark.sql.functions import col

netcashflow_zero_count = df_final.filter(col("NetCashFlow(펀드자금흐름)_1년") == 0).count()
순자산_zero_count = df_final.filter(col("순자산") == 0).count()
maxdrawdown_zero_count = df_final.filter(col("MaximumDrawDown_1년") == 0).count()

print(netcashflow_zero_count)
print(순자산_zero_count)
print(maxdrawdown_zero_count)


In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import scipy.stats as stats

# Yeo-Johnson Pandas UDF -> netcash
@pandas_udf(DoubleType())
def improved_yeo_johnson_udf(s):
    try:
        transformed, _ = stats.yeojohnson(s)
        return pd.Series(transformed)
    except Exception:
        return pd.Series([None] * len(s))

# Signed log1p Pandas UDF -> mdd
@pandas_udf(DoubleType())
def improved_signed_log_udf(s):
    try:
        return np.sign(s) * np.log1p(np.abs(s))
    except Exception:
        return pd.Series([None] * len(s))

# 순자산  
@pandas_udf(DoubleType())
def improved_log_udf(s: pd.Series) -> pd.Series:
    result = np.where(s > 0, np.log(s), 0)
    result = np.nan_to_num(result, nan=0.0, neginf=0.0, posinf=0.0)
    return pd.Series(result)


In [0]:
df_final = df_final.withColumn("NetCashFlow_YeoJohnson", improved_yeo_johnson_udf(col("NetCashFlow(펀드자금흐름)_1년")))
df_final = df_final.withColumn("MaximumDrawDown_log_signed", improved_signed_log_udf(col("MaximumDrawDown_1년")))
df_final = df_final.withColumn("순자산_log", improved_log_udf(col("순자산")))

In [0]:
display(df_final)

In [0]:
pdf_sample = df_final.toPandas().copy()

In [0]:
target_cols = ["NetCashFlow_YeoJohnson", "순자산_log", "MaximumDrawDown_log_signed"]
pdf_sample = add_outlier_columns(pdf_sample, target_cols, method='iqr')

In [0]:
sample_size = 10000
# Spark DataFrame → pandas DataFrame 변환 및 샘플링
pdf_sample = df_final.toPandas().sample(n=sample_size, random_state=42) if df_final.count() > sample_size else df_final.toPandas()

# 이상치 컬럼 생성
pdf_sample = add_outlier_columns(pdf_sample, target_cols, method='iqr')
fig, axes = plt.subplots(3, len(target_cols), figsize=(5*len(target_cols), 15))



for i, c in enumerate(target_cols):
    # 산점도
    sns.scatterplot(
        x=pdf_sample.index,
        y=pdf_sample[c].astype('float32'),
        hue=pdf_sample[f"{c}_outlier"],
        palette={0: 'blue', 1: 'red'},
        legend=False,
        ax=axes[0, i]
    )
    axes[0, i].set_title(f'산점도: {c}', fontproperties=font_prop)
    axes[0, i].set_xlabel('인덱스', fontproperties=font_prop)
    axes[0, i].set_ylabel(c, fontproperties=font_prop)

    # 박스플롯
    sns.boxplot(x=pdf_sample[c], color='lightgray', ax=axes[1, i])
    axes[1, i].set_title(f'박스플롯: {c}', fontproperties=font_prop)
    axes[1, i].set_xlabel(c, fontproperties=font_prop)

    # 히스토그램 (bin 수 제한)
    sns.histplot(pdf_sample[c], bins=50, kde=False, color='skyblue', ax=axes[2, i])
    axes[2, i].set_title(f'히스토그램: {c}', fontproperties=font_prop)
    axes[2, i].set_xlabel(c, fontproperties=font_prop)

plt.tight_layout()
plt.show()

In [0]:
from pyspark.sql.functions import col, count, when
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType

# 1. 수치형 컬럼만 필터링
numeric_cols = [field.name for field in df_final.schema.fields if isinstance(field.dataType, NumericType)]

# 2. 각 수치형 컬럼에 대해 이상치 판단 기준 (IQR) 계산
outlier_exprs = []
total_rows = df_final.count()

for col_name in numeric_cols:
    # IQR 구하기
    quantiles = df_final.approxQuantile(col_name, [0.25, 0.75], 0.05)
    if len(quantiles) < 2:
        continue  # 사분위 계산 안 되면 스킵

    q1, q3 = quantiles
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # 이상치 조건식 추가
    outlier_exprs.append(
        F.sum(when((col(col_name) < lower_bound) | (col(col_name) > upper_bound), 1).otherwise(0)).alias(col_name)
    )

# 3. 이상치 개수 집계
outlier_counts_row = df_final.agg(*outlier_exprs).collect()[0].asDict()

# 4. 이상치 비율 계산
outlier_ratios = {
    col: round((count / total_rows) * 100, 2) for col, count in outlier_counts_row.items()
}

# 5. Pandas로 정리
import pandas as pd

pd_outliers = pd.DataFrame(list(outlier_ratios.items()), columns=['컬럼명', '이상치비율(%)'])
pd_outliers = pd_outliers.sort_values(by='이상치비율(%)', ascending=False)

display(pd_outliers)


### 결론 : 결측치 해결이 필요한 컬럼

In [0]:
df_final = df_final.withColumn("펀드수정샤프(연환산)_1년_YeoJohnson", improved_yeo_johnson_udf(col("펀드수정샤프(연환산)_1년")))
df_final = df_final.withColumn("펀드성과정보_1년_YeoJohnson", improved_yeo_johnson_udf(col("펀드성과정보_1년")))
df_final = df_final.withColumn("펀드표준편차(연환산)_1년_YeoJohnson", improved_yeo_johnson_udf(col("펀드표준편차(연환산)_1년")))
df_final = df_final.withColumn("판매보수_log", improved_log_udf(col("판매보수")))
df_final = df_final.withColumn("운용보수_log", improved_log_udf(col("운용보수")))
df_final = df_final.withColumn("선취수수료_log", improved_log_udf(col("선취수수료")))
df_final = df_final.withColumn("후취수수료_log", improved_log_udf(col("후취수수료")))

#널 없게 처리

In [0]:
display(df_final)

본격적으로 투자위험등급 0을 예측해서 채워보기

In [0]:
from pyspark.sql.functions import when, col

# '투자위험등급'이 0이면 null로 변환
df_final = df_final.withColumn(
    "투자위험등급",
    when(col("투자위험등급") == 0, None).otherwise(col("투자위험등급"))
)


In [0]:
display(df_final)

In [0]:
df_final.printSchema

In [0]:
#유형평균유형성과_1년 때문에 문제기 나는 것같아 일단 삭제
df_final = df_final.drop(col("유형평균유형성과_1년"))


In [0]:
feature_cols = [f.name for f in df_final.schema.fields if isinstance(f.dataType, NumericType)]

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# 결측치가 없는 데이터셋 분리
train_df = df_final.filter(df_final["투자위험등급"].isNotNull())
# 학습/평가 데이터 분리 (예: 80% 학습, 20% 평가)
train_data, val_data = train_df.randomSplit([0.8, 0.2], seed=42)

test_df = df_final.filter(df_final["투자위험등급"].isNull())

# 숫자형 피처 결측치 대체 (평균값)
numeric_cols = [col for col in feature_cols if col != "투자위험등급"]
imputer = Imputer(inputCols=numeric_cols, outputCols=[f"{c}_imputed" for c in numeric_cols])

# 특징 벡터화
assembler = VectorAssembler(
    inputCols=[f"{c}_imputed" for c in numeric_cols],
    outputCol="features"
)

In [0]:
# 모델 정의
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="투자위험등급",
    numTrees=50,
    maxDepth=10,
    seed=42
)

# 파이프라인 구성 및 학습
pipeline = Pipeline(stages=[imputer, assembler, rf])
model = pipeline.fit(train_data)

# 결측치 예측 및 채우기
rf_predicted = model.transform(val_data)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 정확도
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="투자위험등급",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(rf_predicted)
print(f"Accuracy: {accuracy:.4f}")

# F1-score
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="투자위험등급",
    predictionCol="prediction",
    metricName="f1"
)
f1 = f1_evaluator.evaluate(rf_predicted)
print(f"F1 Score: {f1:.4f}")


In [0]:
from pyspark.ml.classification import LogisticRegression

# 파이프라인 재사용 (동일 imputer/assembler)
lr = LogisticRegression(
    featuresCol="features",
    labelCol="투자위험등급",
    family="multinomial"
)

pipeline = Pipeline(stages=[imputer, assembler, lr])
model = pipeline.fit(train_data)

predicted_lr = model.transform(val_data)


In [0]:
# 정확도
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="투자위험등급",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predicted_lr)
print(f"Accuracy: {accuracy:.4f}")

# F1-score
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="투자위험등급",
    predictionCol="prediction",
    metricName="f1"
)
f1 = f1_evaluator.evaluate(predicted_lr)
print(f"F1 Score: {f1:.4f}")

In [0]:
%pip install xgboost


In [0]:
# 1~6 → 0~5로 변환
df_final = df_final.withColumn(
    "투자위험등급_xgb",
    (col("투자위험등급") - 1).cast("int")
)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# 결측치가 없는 데이터셋 분리
train_df = df_final.filter(df_final["투자위험등급"].isNotNull())
# 학습/평가 데이터 분리 (예: 80% 학습, 20% 평가)
train_data, val_data = train_df.randomSplit([0.8, 0.2], seed=42)
# train_data, val_data = train_df.randomSplit([0.8, 0.2])

test_df = df_final.filter(df_final["투자위험등급"].isNull())

# 숫자형 피처 결측치 대체 (평균값)
numeric_cols = [col for col in feature_cols if col != "투자위험등급"]
imputer = Imputer(inputCols=numeric_cols, outputCols=[f"{c}_imputed" for c in numeric_cols])

# 특징 벡터화
assembler = VectorAssembler(
    inputCols=[f"{c}_imputed" for c in numeric_cols],
    outputCol="features"
)

In [0]:
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.feature import VectorAssembler

# XGBoost 분류기
# 기존 파이프라인에서 label_col만 "투자위험등급_xgb"로 변경
xgb = SparkXGBClassifier(
    max_depth=5,
    features_col="features",
    label_col="투자위험등급_xgb",
    prediction_col="pred_투자위험등급",
    missing=float("nan"),
    num_workers=2,
    num_class=6 #(0~5)
)

# 학습
pipeline = Pipeline(stages=[imputer, assembler, xgb])
xgb_model = pipeline.fit(train_data)

# 예측
predicted_xg = xgb_model.transform(val_data)


In [0]:

predicted_xg = predicted_xg.withColumn(
    "prediction",
    (col("pred_투자위험등급") + 1).cast("double")
)

In [0]:
# 정확도
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="투자위험등급",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predicted_xg)
print(f"Accuracy: {accuracy:.4f}")

# F1-score
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="투자위험등급",
    predictionCol="prediction",
    metricName="f1"
)
f1 = f1_evaluator.evaluate(predicted_xg)
print(f"F1 Score: {f1:.4f}")

In [0]:
val_data.groupBy("투자위험등급_xgb").count().orderBy("count", ascending=False).show()

In [0]:
train_data.groupBy("투자위험등급_xgb").count().orderBy("count", ascending=False).show()

In [0]:
# 학습 완료 후
booster = xgb_model.stages[-1].get_booster()  # 파이프라인 마지막 스테이지가 XGBoost라면

# 중요도 딕셔너리 반환
importances = booster.get_score(importance_type='weight')  # 'gain', 'cover' 등도 가능

# 중요도 순으로 정렬
sorted_importances = sorted(importances.items(), key=lambda x: x[1], reverse=True)
for feat, score in sorted_importances:
    print(f"{feat}: {score}")

In [0]:
for i, col in enumerate(feature_cols):
    print(f"f{i}: {col}")

In [0]:
predicted_xg = xgb_model.transform(test_df)

In [0]:
predicted_xg = predicted_xg.drop("투자위험등급") 

In [0]:
display(predicted_xg)

In [0]:
from pyspark.sql.functions import col

predicted_xg = predicted_xg.withColumn(
    "투자위험등급",
    (col("pred_투자위험등급") + 1).cast("double")
)

In [0]:
predicted_xg = predicted_xg.select(train_df.columns)

In [0]:
filled_df = train_df.union(predicted_xg) 

In [0]:
df_final.count()

In [0]:
filled_df.count()

In [0]:
filled_df.groupBy("투자위험등급").count().orderBy("count", ascending=False).show()

In [0]:
filled_df.columns

### 문득 raw에 대해 의문이 들어 보다가 종합등급, 위험등급이라는 컬럼이 있다는 것을 '깨'달았다....

해봤는데 판매위험등급이 0(이상치 혹은 결측치)일때 ["MaximumDrawDown_1년""펀드수정샤프(연환산)_1년","펀드표준편차(연환산)_1년"]에서 이상치가 있는 행은 1500개,

투자위험등급이 0(이상치 혹은 결측치)일때 ["MaximumDrawDown_1년""펀드수정샤프(연환산)_1년","펀드표준편차(연환산)_1년"]에서 이상치가 있는 행은 212개였다.


둘의 개념적인 차이점은 투자위험등급은 "상품 자체의 위험", 판매위험등급은 "판매사가 투자자 보호 관점에서 산정한 위험"이기에 어느것을 사용해도 큰 위험은 없다고 느꼈다.

그래서 아예 위험도컬럼을 

In [0]:
from pyspark.sql.functions import col

# 유니크 값과 개수 집계
unique_counts = (
    df_selected.groupBy("투자위험등급")
    .count()
    .orderBy("투자위험등급")
)

unique_counts.show()

## 짠 데이터 구조대로 데이터프레임 정규화하기

## 1. 펀드 기본 정보 테이블 (`funds_info`)

| 제목 | 1열 | 2열 |
| --- | --- | --- |
| 컬럼명 | 타입 | 설명 |
| `펀드코드` | STRING (PK) | 펀드 고유 식별자 |
| `펀드명` | STRING | 펀드 이름 |
| `대표펀드유무` | BOOLEAN | 대표 펀드 여부 |
| `설정일` | DATE | 펀드 설정일 |
| `운용사명` | STRING (FK) | 운용사 테이블 참조 |
| `투자전략` | STRING | 텍스트 설명 |
| `펀드키워드` | STRING | 태그, 키워드 등 |

---

## 2. 유형 분류 테이블 (`fund_types`)

| 제목 | 1열 | 2열 |
| --- | --- | --- |
| 컬럼명 | 타입 | 설명 |
| 펀드코드 | STRING(FK) |  |
| `대유형` | STRING  |  |
| `중유형` | STRING  |  |
| `소유형` | STRING  |  |

> 중유형, 소유형은 각각 대유형, 중유형에 종속되므로 정규화 시 상위키를 FK로 포함시켜도 됨
> 

---

## 3. 성과 지표 테이블 (`fund_performance`)

| 제목 | 1열 | 2열 |
| --- | --- | --- |
| 컬럼명 | 타입 | 설명 |
| `펀드코드` | STRING (FK) |  |
| `펀드성과정보_1년` | DOUBLE |  |
| `펀드표준편차(연환산)_1년` | DOUBLE |  |
| `펀드수정샤프(연환산)_1년` | DOUBLE |  |
| `MaximumDrawDown_1년` | DOUBLE |  |
| `NetCashFlow(펀드자금흐름)_1년` | DOUBLE |  |
| `순자산` | FLOAT |  |

---

## 4. 랭킹 테이블 (`fund_rank`)

| 제목 | 1열 | 2열 |
| --- | --- | --- |
| 컬럼명 | 타입 | 설명 |
| `펀드코드` | STRING (FK) |  |
| `수익률점수` | DOUBLE |  |
| `자금유입점수` | DOUBLE |  |
| `수수료점수` | DOUBLE |  |
| `최종점수` | DOUBLE |  |
| `추천랭킹` | INT |  |

## 5. 수수료/비용 테이블 (`fund_fees`)

| 컬럼명 | 타입 |
| --- | --- |
| `펀드코드` | STRING (FK) |
| `운용보수` | DOUBLE |
| `판매보수` | DOUBLE |
| `선취수수료` | DOUBLE |
| `후취수수료` | DOUBLE |

---

## 6. 테마/성격 테이블 (`fund_tags`)

| 컬럼명 | 타입 | 설명 |
| --- | --- | --- |
| `펀드코드` | STRING (FK) |  |
| `가치주` | FLOAT |  |
| `성장주` | FLOAT |  |
| `중소형주` | FLOAT |  |
| `글로벌` | FLOAT |  |
| `자산배분` | FLOAT |  |
| `4차산업` | FLOAT |  |
| `ESG(사회책임투자형)` | FLOAT |  |
| `배당주` | FLOAT |  |
| `FoFs` | FLOAT |  |
| `퇴직연금` | FLOAT |  |
| `고난도금융상품` | FLOAT |  |
| `절대수익추구` | FLOAT |  |
| `레버리지` | FLOAT |  |
| `퀀트` | FLOAT |  |

---

## 7. 위험등급 테이블 (`fund_risk_grades`)

| 컬럼명 | 타입 |
| --- | --- |
| `펀드코드` | STRING (FK) |
| `투자위험등급` | DOUBLE |
| `판매위험등급` | DOUBLE |

In [0]:
# 컬럼명 매핑 정의
perf_column_map = {
    "펀드표준편차(연환산)_1년": "펀드표준편차연환산_1년",
    "펀드수정샤프(연환산)_1년": "펀드수정샤프연환산_1년",
    "NetCashFlow(펀드자금흐름)_1년": "NetCashFlow펀드자금흐름_1년",
    "ESG(사회책임투자형)":"ESG"
}

# 컬럼명 변경
for old_col, new_col in perf_column_map.items():
    filled_df = filled_df.withColumnRenamed(old_col, new_col)


In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS funds_data")


In [0]:
def save_table(df, table_name):
    full_table_name = f"funds_data.{table_name}"
    df.write.mode("overwrite").saveAsTable(full_table_name)
    print(f"Saved: {full_table_name}")


In [0]:
funds_df = filled_df.select(
    "펀드코드", "펀드명", "대표펀드유무", "설정일", "운용사명", "투자전략", "펀드키워드"
)

save_table(funds_df, "funds_info")


In [0]:
fund_types_df = filled_df.select(
    "펀드코드", "대유형", "중유형", "소유형"
)

save_table(fund_types_df, "fund_types")


In [0]:
fund_perf_df = filled_df.select(
    "펀드코드", 
    "펀드성과정보_1년", 
    "펀드표준편차연환산_1년", 
    "펀드수정샤프연환산_1년", 
    "MaximumDrawDown_1년", 
    "NetCashFlow펀드자금흐름_1년", 
    "순자산"
)

save_table(fund_perf_df, "fund_performance")


In [0]:
fund_rank_df = filled_df.select(
    "펀드코드", "수익률점수", "자금유입점수", "수수료점수", "최종점수", "추천랭킹"
)

save_table(fund_rank_df, "fund_rank")


In [0]:
fund_fees_df = filled_df.select(
    "펀드코드", "운용보수", "판매보수", "선취수수료", "후취수수료"
)

save_table(fund_fees_df, "fund_fees")


In [0]:
fund_tags_df = filled_df.select(
    "펀드코드", "가치주", "성장주", "중소형주", "글로벌", "자산배분", 
    "4차산업", "ESG", "배당주", "FoFs", "퇴직연금", 
    "고난도금융상품", "절대수익추구", "레버리지", "퀀트"
)

save_table(fund_tags_df, "fund_tags")


In [0]:
fund_risk_df = filled_df.select(
    "펀드코드", "투자위험등급", "판매위험등급"
)

save_table(fund_risk_df, "fund_risk_grades")
