## Overview
- 0. dim_date_ym                                        : dimension data (master) 
- 1. mac_addr  first_act_date                          : raw data
- 2. joinA : dim_date_ym  mac_addr   first_act_date
-                 2024.01     A           2024.01
-                 2024.02     A           2024.01
-                   ...
-                 2025.03     A           2024.01
- 3. mac_addr  date_ym  -> last(dpv)                   : pivot data
-         A      2024.01      8.0
-         A      2024.02      9.0
- 4. joinB : joinA + pivot_data
-     dim_date_ym  mac_addr  first_act_date  mac_addr  date_ym  last(dpv)   rn
-        2024.01     A          2024.01         A        2024.01     8.0      
-        2024.02     A          2024.01         A        2024.01     8.0       2 (조건2, orderBy mac_addr/dim_date/date_ym desc)
-         ...        A          2024.01         A        2024.01     8.0
-        2025.03     A          2024.01         A        2024.01     8.0
- 
-        2024.01     A          2024.01         A        2024.02     9.0       - (조건1, dim_date_ym < date_ym)
-        2024.02     A          2024.01         A        2024.02     9.0   
-         ...        A          2024.01         A        2024.02     9.0
-        2025.03     A          2024.01         A        2024.02     9.0
- 5. joinB 필터 적용
      - 조건1) dim_date_ym >= date_ym
      - 조건2) rn = 1, (order by dim_date_ym, date_ym desc) as rn
- 6. 중간 날짜 로그 없는 경우, 이전 날짜 값으로 치환 
- 7. joinC : joinB(mac_addr) + 컬럼 추가
      - 테이블1) use_mac_user_master : mac_addr, country_code, platform_code, sales_model_code 
      - 테이블2) 인치 정보
      - 하드코딩) platform_version
- 8. 검토 :
      - 조건1) inch가 잘 조인되었는지 : sales_model_code별 inch가 1개 조회되는지 (where platform_version  not like 'WEE%')
- 9. 집계 : count_ud
-     dim_date_ym  country_code  platform_version  platform_code  sales_model_code  inch  dpv  count_ud






## -1. 파라미터 입력

In [0]:
# Run parameters: target country and platform version.
p_cn = 'FR' # country code
p_pv = str(dbutils.widgets.get("p_pv")) # platform version, read from the "p_pv" notebook widget
print(p_pv, type(p_pv))

# Source log table for each supported platform version.
normal_table_by_pv = {
    'webOSTV 22': 'normal_log_webos22',
    'webOSTV 23': 'normal_log_webos23',
    'webOSTV 24': 'normal_log_webos24',
}

# Fail here with a clear message instead of leaving p_normal_table undefined,
# which would otherwise only surface later as a NameError when the table name is used.
if p_pv not in normal_table_by_pv:
    raise ValueError(
        f"Unsupported p_pv {p_pv!r}; expected one of {sorted(normal_table_by_pv)}"
    )
p_normal_table = normal_table_by_pv[p_pv]

## 0. 날짜범위 구하기 : date_ym dimension 만들기 

In [0]:
%python
from pyspark.sql.types import StructType, StructField, StringType

# Month dimension: a single nullable string column holding 'yyyy-MM' values
schema = StructType([StructField('dim_date_ym', StringType(), True)])
data = [
    (ym,)
    for ym in ('2024-10', '2024-11', '2024-12', '2025-01', '2025-02', '2025-03')
]

# Build the dimension DataFrame and show it
df_dim = spark.createDataFrame(data, schema)
df_dim.display()

## 1. 타겟 mac 구하기 : 대상 국가(p_cn) mac_addr 별 min(first_activation_date) 구하기

In [0]:
# 1. Earliest activation date per mac_addr for country p_cn, plus that date as 'yyyy-MM'.
#    (Original note on this step: 2,573,502 -- presumably the row count at the time; verify.)
raw_query = f'''
    select   mac_addr, min(first_activation_date) as first_activation_date, date_format(min(first_activation_date), 'yyyy-MM') as first_act_date_ym
    from     eic_data_mart.master_tables.use_mac_user_master
    where    country_code = '{p_cn}'
    group by mac_addr
'''
df_raw = spark.sql(raw_query)
df_raw.limit(10).display()

## 2. [joinA] 타겟 mac_addr별 조사할 date_ym 범위 구하기

In [0]:
# 2. joinA : dim_date_ym  mac_addr  first_act_date
#    One row per (mac_addr, dim_date_ym), starting at the device's activation month.
from pyspark.sql.functions import col

df_join_a = (
    df_raw
    .crossJoin(df_dim)                           # every device x every dimension month
    .where("first_act_date_ym <= dim_date_ym")   # drop months before the activation month
)

#### 검증

In [0]:
# Spot-check joinA for one specific device
df_join_a.filter("mac_addr = '001145c0f035011b10b1f6238b97193d88949f48b450d8923ec81f5302d25881'").display()

## 3. [mac_dpv_1m] 타겟 mac_addr에 대한 월 dpv 값 구하기 
> mac_addr date_ym -> last(dpv) : pivot data

In [0]:
%python 
# Monthly dpv per device: for each (mac_addr, date_ym) keep the X_Device_SDK_VERSION
# of the row with the latest log_create_time among the NL_POWER_STATE logs of tvpowerd.
dpv_query = f'''
    select mac_addr, date_ym, X_Device_SDK_VERSION as dpv
    from   (
        SELECT mac_addr, date_ym, row_number() over(partition by mac_addr, date_ym order by log_create_time desc) as rn, X_Device_SDK_VERSION
        FROM   eic_data_ods.tlamp_private.{p_normal_table}
        WHERE  1=1
           AND X_device_country = '{p_cn}'
           AND context_name = 'tvpowerd'
           AND message_id = 'NL_POWER_STATE'
           AND date_ym >= '2024-10'
           AND date_ym <= '2025-03') t
    WHERE 1=1
      AND rn = 1
'''
df_dpv = spark.sql(dpv_query)

#### 검증

In [0]:
# Total number of rows in the monthly dpv data
df_dpv.count()

In [0]:
# Number of distinct (mac_addr, date_ym) pairs in the monthly dpv data
(
    df_dpv
    .select('mac_addr', 'date_ym')
    .distinct()
    .count()
)

## 4. [joinB] joinA + mac_dpv_1m

In [0]:
# joinB: for every (mac_addr, dim_date_ym) row of joinA, attach each monthly dpv
# reading of the same device whose date_ym is on or before dim_date_ym.
#
# The month condition is part of the LEFT join itself so that a device-month with
# no reading on or before it survives as one row with null date_ym/dpv. Joining on
# mac_addr alone and filtering "dim_date_ym >= date_ym" afterwards removes
# device-months that precede the device's first logged month, even though devices
# with no logs at all are kept.
dpv_on_or_before_month = (
    (df_join_a['mac_addr'] == df_dpv['mac_addr'])
    & (df_join_a['dim_date_ym'] >= df_dpv['date_ym'])
)
df_join_b = (
    df_join_a
    .join(df_dpv, on=dpv_on_or_before_month, how='left_outer')
    .drop(df_dpv['mac_addr'])  # keep a single mac_addr column (joinA's)
)

In [0]:
# Preview the first 100 joinB rows ordered by device, dimension month and log month
df_join_b.sort('mac_addr', 'dim_date_ym', 'date_ym').limit(100).display()

## 5. [joinB] 필터

#### 조건 1

In [0]:
# Condition 1: keep dpv readings whose date_ym is on or before dim_date_ym;
# rows without any reading (date_ym is null) are kept as well.
cond_1 = "(date_ym is not null and dim_date_ym >= date_ym) or (date_ym is null)"
df_join_b1 = df_join_b.filter(cond_1)

#### 조건 2

In [0]:
# Condition 2: keep exactly one row per (mac_addr, dim_date_ym)
from pyspark.sql.functions import row_number, when
from pyspark.sql.window import Window

# null_flag is 1 when dpv is null and 0 otherwise, so non-null readings sort first
is_dpv_missing = col("dpv").isNull()
df_join_b1 = df_join_b1.withColumn("null_flag", when(is_dpv_missing, 1).otherwise(0))

# Within each device-month: non-null dpv first, then the most recent date_ym
window_spec = (
    Window
    .partitionBy("mac_addr", "dim_date_ym")
    .orderBy("null_flag", df_join_b1['date_ym'].desc())
)

# Rank the rows inside each device-month
df_join_b1 = df_join_b1.withColumn("rn", row_number().over(window_spec))

# Keep only the top-ranked row
df_join_b2 = df_join_b1.where(df_join_b1["rn"] == 1)

#### 검증

In [0]:
# Verification: preview the first 100 de-duplicated rows
df_join_b2.sort('mac_addr', 'dim_date_ym', 'date_ym').limit(100).display()

In [0]:
# NOTE: this import rebinds the names count/max/min, hiding Python's built-in max and min.
from pyspark.sql.functions import count, max, min

# Number of dimension months kept for each mac_addr
df_counts = df_join_b2.groupBy("mac_addr").agg(count("dim_date_ym").alias("cnt"))

# Largest per-device month count
df_counts.agg(max("cnt")).show()

# Smallest per-device month count
df_counts.agg(min("cnt")).show()

## 6. 중간 날짜에 로그가 없어서 null인 경우, 이전 날짜의 값으로 채우기

In [0]:
from pyspark.sql.functions import to_date, last

# Per-device window ordered by dimension month, covering the first row up to the current row
w = Window.partitionBy("mac_addr").orderBy("dim_date_ym").rowsBetween(Window.unboundedPreceding, 0)

# Forward fill: filled_dpv is the most recent non-null dpv seen so far for the device
df_join_bf = df_join_b2.withColumn("filled_dpv", last("dpv", ignorenulls=True).over(w))

# Spot-check rows that were actually filled.
# "dpv != filled_dpv" can never match: when dpv is non-null the window's last
# non-null value is dpv itself, and when dpv is null the comparison evaluates
# to NULL, which the filter drops.
df_join_bf.where("dpv is null and filled_dpv is not null").limit(10).display()


In [0]:
# Verification: preview the first 100 forward-filled rows
df_join_bf.sort('mac_addr', 'dim_date_ym', 'date_ym').limit(100).display()

## 7. 컬럼 추가 조인

      - 테이블1) use_mac_user_master : mac_addr, country_code, platform_code, sales_model_code 
      - 테이블2) 인치 정보
      - 하드코딩) platform_version

In [0]:
# Device attributes (country, platform, sales model) for country p_cn.
# NOTE(review): if a mac_addr has more than one distinct attribute combination in
# the master table, the left join below repeats that device's rows -- verify.
dim_mac_query = f''' 
    select distinct mac_addr
           , country_code 
           , platform_code 
           , sales_model_code 
    from   eic_data_mart.master_tables.use_mac_user_master
    where  country_code = '{p_cn}'
'''
df_dim_mac = spark.sql(dim_mac_query)

# joinC: add the device attributes to the forward-filled rows
df_join_c = df_join_bf.join(df_dim_mac, how='left_outer', on='mac_addr')
df_join_c.limit(10).display()

In [0]:
# Inch value per sales_model_code, taken from tv_model rows with use_yn = 'Y'.
# max() is used instead of last(): Spark's last() aggregate depends on row order,
# which is not guaranteed after a shuffle, so a model with several inch values
# could get a different value on each run. max() always picks the same value and
# skips nulls; when a model has a single inch value the result is unchanged.
df_tv_model = spark.sql(f'''
    -- 인치 정보
    select sales_model_code, max(inch) as inch
    from   eic_data_dimension.common_tv.tv_model
    where  use_yn = 'Y'
    group by sales_model_code
''')

# Add inch to every row via its sales_model_code; rows without a match keep a null inch
df_join_c2 = df_join_c.join(df_tv_model, on='sales_model_code', how='left_outer')
df_join_c2.limit(10).display()


In [0]:
from pyspark.sql.functions import lit

# Every row gets the run's platform version as a constant column
platform_version_col = lit(p_pv)
df_join_cf = df_join_c2.withColumn("platform_version", platform_version_col)

## 8. 저장 to delta

In [0]:
# Save the final DataFrame as a table named after the country and source log table,
# overwriting any existing contents.
target_table = f"sandbox.z_eunmi1_ko.temp_{p_cn}_{p_normal_table}"
(
    df_join_cf
    .write
    .mode('overwrite')
    .saveAsTable(target_table)
)