# Import 문

In [38]:
import os
import pandas as pd
import re  # Python 정규 표현식 모듈 추가(Regular Expression)

from pyspark import SparkConf, SparkContext
from pyspark import StorageLevel  # 스토리지 레벨을 지정해 데이터의 저장 방식을 조정
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
from pyspark.sql.functions import (
    element_at, regexp_extract, explode, countDistinct, monotonically_increasing_id, broadcast, udf, col, isnull, count, when, substring, coalesce, 
    from_json, size, avg, expr, concat_ws, lpad, sum as Fsum
)
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType
from functools import reduce
from operator import or_
from datetime import datetime

In [2]:
# 재시도 로직 관련 전역 변수
MAX_RETRIES = 3
DELAY_BETWEEN_RETRIES = 3  # in seconds

# 데이터 가져오기

## Python 경로 지정 / Spark 세션 설정 및 생성

In [3]:
# 가상 환경의 Python 경로를 지정
python_path = "C:/Users/admin/anaconda3/envs/my_conda_01/python.exe"

# # JVM 힙 메모리 재설정
# conf = (SparkConf()
#          .set("spark.driver.extraJavaOptions", "-Xmx9g -Xms2g")
#          .set("spark.executor.extraJavaOptions", "-Xmx9g -Xms2g"))

# Spark 세션 생성
spark = SparkSession.builder \
    .appName("voice_metadata integrating operation") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.maxResultSize", "10g") \
    .config("spark.pyspark.python", python_path) \
    .config("spark.local.dir", "D:/spark_tmp") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .getOrCreate()

# SparkContext 가져오기
sc = spark.sparkContext

In [4]:
# Spark 중간 파일이 저장되는 경로 확인
current_spark_local_dir = spark.conf.get("spark.local.dir", "Not Set")
print("Spark local directory:", current_spark_local_dir)

Spark local directory: D:/spark_tmp


In [5]:
# spark.sparkContext.getConf().getAll()

## 함수 선언: csv 파일 -> Spark df로 로드

In [5]:
def load_csv_to_df_spark(directory, file_names):
    """해당 디렉토리로부터 지정된 형식의 CSV 파일들을 DataFrame으로 로드"""
    
    all_dfs = []
    for idx, file_name in enumerate(file_names):
        file_path = os.path.join(directory, file_name)
        current_df = spark.read.option("multiline", "true").csv(file_path, header=True, inferSchema=True)
        all_dfs.append(current_df)

    # reduce는 순차적인 데이터에 누적 연산, union은 병합 함수, 여기선 모든 df에 대해 순차적으로 병합하는 연산
    final_df = reduce(lambda x, y: x.union(y), all_dfs)
    return final_df

def load_csv_to_df_spark_skip_first_row(directory, file_names):
    """해당 디렉토리로부터 지정된 형식의 CSV 파일들을 DataFrame으로 로드하되 첫 번째 행을 제외함."""
    
    all_dfs = []
    for idx, file_name in enumerate(file_names):
        file_path = os.path.join(directory, file_name)

        # SparkContext를 사용하여 텍스트 파일 로드
        data = sc.textFile(file_path)

        # 첫 번째 행 제거
        first_row = data.first()
        data = data.filter(lambda row: row != first_row)

        # RDD의 텍스트 데이터를 DataFrame으로 변환
        temp_df = spark.read.csv(data, header=True, inferSchema=True)

        all_dfs.append(temp_df)

    # 모든 DataFrame을 병합
    final_df = reduce(lambda x, y: x.union(y), all_dfs)
    return final_df

## 함수 선언: parquet 파일 -> Spark df로 로드

In [6]:
def load_parquet_to_df_spark(directory, file_names):
    """해당 디렉토리로부터 지정된 형식의 Parquet 파일들을 DataFrame으로 로드"""

    all_dfs = []
    for idx, file_name in enumerate(file_names):
        file_path = os.path.join(directory, file_name)
        current_df = spark.read.parquet(file_path)
        all_dfs.append(current_df)

    final_df = reduce(lambda x, y: x.union(y), all_dfs)
    return final_df

## df_attend 로드

In [7]:
attend_directory = "D:/DATA_PREPROCESS/FIRESTORE_DATAS/Users_Attends/Users_Attend_231205/parquet_attend"
# attend_file_names = [f for f in os.listdir(attend_directory) if f.startswith('output-') and f.endswith('.csv')]
attend_file_names = [f for f in os.listdir(attend_directory) if f.startswith('output-') and f.endswith('.parquet')]

# # JSON 버전
# df_attend = load_csv_to_df_spark(attend_directory, attend_file_names)
# parquet 버전
df_attend = load_parquet_to_df_spark(attend_directory, attend_file_names)

In [9]:
df_attend.printSchema()

# df_attend.count()

root
 |-- user_id: string (nullable = true)
 |-- ArrayVoice: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- attend: double (nullable = true)
 |-- resultPass: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- addText: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ArrayPercent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ArrayDate: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- type: boolean (nullable = true)
 |-- review_wish: string (nullable = true)
 |-- ArrayAddResult: array (nullable = true)
 |    |-- element: string (containsNull = true)



23-10-19-이전  
root  
 |-- user_id: string (nullable = true)  
 |-- ArrayVoice: string (nullable = true)  
 |-- attend: string (nullable = true)  
 |-- resultPass: string (nullable = true)  
 |-- imageUrl: string (nullable = true)  
 |-- addText: string (nullable = true)  
 |-- name: string (nullable = true)  
 |-- ArrayPercent: string (nullable = true)  
 |-- ArrayDate: string (nullable = true)  
 |-- title: string (nullable = true)   
 |-- type: string (nullable = true)  
 |-- review_wish: string (nullable = true)  
 |-- ArrayAddResult: string (nullable = true)  
   
23-10-19-이후( 이 형식이 더 원본을 잘 보존 )  
root  
|-- user_id: string (nullable = true)  
|-- ArrayVoice: array (nullable = true)  
|    |-- element: string (containsNull = true)  
|-- attend: double (nullable = true)  
|-- resultPass: string (nullable = true)  
|-- imageUrl: string (nullable = true)  
|-- addText: string (nullable = true)  
|-- name: string (nullable = true)  
|-- ArrayPercent: array (nullable = true)  
|    |-- element: string (containsNull = true)  
|-- ArrayDate: array (nullable = true)  
|    |-- element: string (containsNull = true)  
|-- title: string (nullable = true)  
|-- type: boolean (nullable = true)  
|-- review_wish: string (nullable = true)  
|-- ArrayAddResult: array (nullable = true)  
|    |-- element: string (containsNull = true)  
   
차이점  
- ArrayVoice: string -> ArrayVoice: array |-- element: string (containsNull = true)  
- attend: string -> attend: double  
- ArrayPercent: string -> ArrayPercent: array |-- element: string (containsNull = true)  
- ArrayDate: string -> ArrayDate: array |-- element: string (containsNull = true)  
- type: string -> type: boolean  
- ArrayAddResult: string -> ArrayAddResult: array |-- element: string (containsNull = true)  

In [8]:
df_attend.count()

15456421

In [11]:
df_attend.select('ArrayDate').show()

+-----------------------+
|              ArrayDate|
+-----------------------+
|[23.08.30 오후 07시 ...|
|[23.08.30 오후 08시 ...|
|[23.08.30 오후 08시 ...|
|[22.07.04 오전 00시 ...|
|[23.07.24 오후 05시 ...|
|[23.07.24 오후 05시 ...|
|[23.07.24 오후 05시 ...|
|[23.07.24 오후 05시 ...|
|[23.10.17 오후 03시 ...|
|[23.09.26 오전 09시 ...|
|[23.09.26 오전 09시 ...|
|[23.08.26 오후 05시 ...|
|[23.09.26 오전 09시 ...|
|[23.08.26 오후 05시 ...|
|[23.08.29 오후 01시 ...|
|[23.08.30 오후 10시 ...|
|[23.09.21 오후 03시 ...|
|[23.09.26 오전 09시 ...|
|[23.08.25 오후 01시 ...|
|[23.08.29 오후 01시 ...|
+-----------------------+
only showing top 20 rows



In [17]:
# 'ArrayDate' 배열의 각 요소를 별도의 행으로 변환
exploded_df = df_attend.select(explode(col("ArrayDate")).alias("date_string"))

# 날짜 데이터 추출을 위한 정규 표현식
date_regex = r'(\d{2}\.\d{2}\.\d{2})'

# 각 행에서 날짜 데이터를 추출
dates_df = exploded_df.select(regexp_extract(col("date_string"), date_regex, 0).alias("date"))

# '23.07.'로 시작하는 날짜 데이터 필터링
july_2023_df = dates_df.filter(col("date").startswith("23.07."))

# 필터링된 데이터의 개수 계산
july_2023_count = july_2023_df.count()

# 결과 출력
print(july_2023_count)

2429270


### df_attend의 중복 user_id, name 행들을 삭제한다

- 23-10-19 이전 기준 -
+--------------------------------+-----+ | name|count| +--------------------------------+-----+ | 이럴 때 스마투스 튼튼캔디를 ...| 3| | 특수 기술로 영양소 파괴를 최...| 1824| |팩의 민 케이크 유 오케이 하이...| 19| | 내가 최근에 잘못한 것은 내가...| 850| | 이럴 때 스마투스 튼튼캔디를 ...| 83| | 이럴 때 스마투스 튼튼캔디를 ...| 2| | pass| 1| +--------------------------------+-----+
  
Total number of unique names in dup_df_attend: 7  

총 671만개의 행 중에 2782개의 행이 문제가 발생하고 고치는 데 지나친 쿼리가 요구되기에,  
오류의 확산을 막기 위해 상대적 소수의 행들을 삭제한다. 

In [10]:
# # 'user_id'와 'name'으로 그룹화하고, 각 그룹 내에서 행 번호를 부여합니다.
# windowSpec = Window.partitionBy("user_id", "name").orderBy(F.desc("ArrayDate"))

# # 행 번호를 부여합니다.
# df_attend_with_rownum = df_attend.withColumn("row_num", F.row_number().over(windowSpec))

# # 행 번호가 1인 행만 선택합니다. 이렇게 하면 각 'user_id'와 'name' 그룹에서 최신 행만 선택됩니다.
# df_attend = df_attend_with_rownum.filter(F.col("row_num") == 1).drop("row_num")

# # 결과를 확인합니다.
# print(f"original count: 13687826, after deleting: {df_attend.count()}")

-23-10-19-이전-
original count: 6709020, after deleting: 6706238
- 이후 -
original count: 13687826, after deleting: 13687781

attend 데이터 중복 오류 획기적 감소(JSON -> CSV는 ','관련 문제 때문에 세심히 다루지 않으면 오류 발생) 

In [11]:
# # 데이터프레임이 캐시에서 제거됨
# df_attend_with_rownum.unpersist()

In [12]:
# # dup_df_attend를 'name' 기준으로 그룹화
# grouped_by_name = dup_df_attend.groupBy('name').agg(F.count('*').alias('count'))

# # 카운트 및 결과 출력
# name_count = grouped_by_name.count()
# grouped_by_name.show(truncate=False)

# # name_count는 'name'별로 그룹화된 레코드 수를 나타냅니다.
# print(f"Total number of unique names in dup_df_attend: {name_count}")

+--------------------------------+-----+
|                            name|count|
+--------------------------------+-----+
| 이럴 때 스마투스 튼튼캔디를 ...|    3|
| 특수 기술로 영양소 파괴를 최...| 1824|
|팩의 민 케이크 유 오케이 하이...|   19|
| 내가 최근에 잘못한 것은 내가...|  850|
| 이럴 때 스마투스 튼튼캔디를 ...|   83|
| 이럴 때 스마투스 튼튼캔디를 ...|    2|
|                            pass|    1|
+--------------------------------+-----+

Total number of unique names in dup_df_attend: 7

### df_attend 확인 코드

In [13]:
# pd.set_option('display.max_columns', None)  # 모든 열을 표시
# pd.set_option('display.width', None)        # 화면 너비에 맞게 출력
# print(df_attend.limit(5).toPandas())
# # df_attend.count()
# df_attend.columns

## 포인트벌기, 암기플러스, +유저 백업 데이터를 추가
- 백업 후 삭제된 데이터들을 뒤의 df_ad에 추가시키기 위함

In [8]:
def add_dfs(df1, df2):
    # 먼저, df1의 모든 행에 대해서 df2에서 같은 'name'을 가진 행이 있다면 그 값을 덮어씁니다.
    df1 = df1.alias('a').join(df2.alias('b'), 'name', 'left_outer').select(
        'a.name',
        *[F.coalesce('b.' + c, 'a.' + c).alias(c) for c in df1.columns if c != 'name']
    )
    return df1

In [15]:
# # 원래 DataFrame의 스키마를 가져옵니다.
# point_folder_schema = df_point_folder.schema
# memo_folder_schema = df_memo_folder.schema
# memo_backup_schema = df_memo_backup.schema
# point_backup_schema = df_point_backup.schema
# ad_schema = df_ad.schema

# # 빈 DataFrame을 생성합니다.
# df_point_folder = spark.createDataFrame([], point_folder_schema)
# df_memo_folder = spark.createDataFrame([], memo_folder_schema)
# df_memo_backup = spark.createDataFrame([], memo_backup_schema)
# df_point_backup = spark.createDataFrame([], point_backup_schema)
# df_ad = spark.createDataFrame([], ad_schema)

In [10]:
firebase_ads_point_directory = "D:/DATA_PREPROCESS/FIRESTORE_DATAS/FIREBASE_ads/backup/VOWING_PointRecord"

# 파일 경로를 인수로 전달하고 첫 번째 행을 건너뛰도록 적용
df_point_backup_f1 = load_csv_to_df_spark_skip_first_row(firebase_ads_point_directory, ['2023-06-15_12-45-19.csv'])
df_point_backup_f2 = load_csv_to_df_spark_skip_first_row(firebase_ads_point_directory, ['2023-06-19_12-05-10.csv'])
df_point_backup_f3 = load_csv_to_df_spark_skip_first_row(firebase_ads_point_directory, ['2023-07-21_00-05-31.csv'])

# # df_point_backup_f1에 df_point_backup_f2의 값을 덮어쓰기
# df_point_1_2 = add_dfs(df_point_backup_f1, df_point_backup_f2)

# # 위에서 덮어쓴 DataFrame에 df_point_backup_f3의 값을 덮어쓰기
# df_point_folder = add_dfs(df_point_1_2, df_point_backup_f3)

# # df_point_folder 데이터 타입 변경
# df_point_folder = df_point_folder.withColumn("player", df_point_folder["player"].cast("int")) \
#                                 .withColumn("linkClick", df_point_folder["linkClick"].cast("int")) \
#                                 .withColumn("linkWing", df_point_folder["linkWing"].cast("boolean"))

# # df_point_folder에서 linkClickDesc, script 컬럼 제거
# df_point_folder = df_point_folder.drop('linkClickDesc', "script")
# # reward 컬럼명을 script로 바꿈
# df_point_folder = df_point_folder.withColumnRenamed("reward", "script")

#--------------------------------------------------------------------------------------------------------------------------#
# 실행 오류를 막기 위해 수정한 코드 시도
# 먼저 데이터 타입을 변경하고, 그 다음에 데이터 프레임을 합침
# withColumnRenamed 메서드를 사용하기 전에 해당 컬럼이 존재하는지 확인

# 데이터 타입 변경
df_point_backup_f1 = df_point_backup_f1.withColumn("player", df_point_backup_f1["player"].cast("int")) \
                                       .withColumn("linkClick", df_point_backup_f1["linkClick"].cast("int")) \
                                       .withColumn("linkWing", df_point_backup_f1["linkWing"].cast("boolean"))
df_point_backup_f2 = df_point_backup_f2.withColumn("player", df_point_backup_f2["player"].cast("int")) \
                                       .withColumn("linkClick", df_point_backup_f2["linkClick"].cast("int")) \
                                       .withColumn("linkWing", df_point_backup_f2["linkWing"].cast("boolean"))
df_point_backup_f3 = df_point_backup_f3.withColumn("player", df_point_backup_f3["player"].cast("int")) \
                                       .withColumn("linkClick", df_point_backup_f3["linkClick"].cast("int")) \
                                       .withColumn("linkWing", df_point_backup_f3["linkWing"].cast("boolean"))

# 데이터 프레임 합치기
df_point_1_2 = add_dfs(df_point_backup_f1, df_point_backup_f2)
df_point_folder = add_dfs(df_point_1_2, df_point_backup_f3)

# 컬럼 제거 및 이름 변경
df_point_folder = df_point_folder.drop('linkClickDesc', "script")
if 'reward' in df_point_folder.columns:
    df_point_folder = df_point_folder.withColumnRenamed("reward", "script")


위의 셀은 실행이 굉장히 불안정함,  
첫 실행에선 오류가 지속적으로 발생   
두번째 실행하면 오류가 사라지는 경우가 대다수,   
원인 파악 필요  
-> Spark 자체 혹은 운영체제, 하드웨어 문제이므로 Databricks에서는 발생하지 않을 확률이 높음, 일단 그냥 재실행하는 수 밖에..

In [11]:
firebase_ads_memo_directory = "D:/DATA_PREPROCESS/FIRESTORE_DATAS/FIREBASE_ads/backup/VOWING_Memorization"

# 파일 경로를 인수로 전달하고 첫 번째 행을 건너뛰도록 적용
df_memo_backup_f1 = load_csv_to_df_spark_skip_first_row(firebase_ads_memo_directory, ['2023-06-26_00-05-14.csv'])
df_memo_backup_f2 = load_csv_to_df_spark_skip_first_row(firebase_ads_memo_directory, ['2023-07-10_12-05-15.csv'])
df_memo_backup_f3 = load_csv_to_df_spark_skip_first_row(firebase_ads_memo_directory, ['2023-07-27_12-05-23.csv']).drop('linkClickDesc')

# df_memo_backup의 데이터 타입 변경
df_memo_backup_f1 = df_memo_backup_f1.withColumn("gender", df_memo_backup_f1["gender"].cast("string"))
df_memo_backup_f3 = df_memo_backup_f3.withColumn("cnt", df_memo_backup_f3["cnt"].cast("int"))  \
                                .withColumn("point", df_memo_backup_f3["point"].cast("int")) \
                                .withColumn("linkClick", df_memo_backup_f3["linkClick"].cast("int")) \
                                .withColumn("percent", df_memo_backup_f3["percent"].cast("int")) \
                                .withColumn("type", df_memo_backup_f3["type"].cast("boolean"))

# print(df_memo_backup_f1.dtypes)
# print(df_memo_backup_f2.dtypes)
# print(df_memo_backup_f3.dtypes)

# df_memo_backup_f1에 df_memo_backup_f2의 값을 덮어쓰기
df_memo_1_2 = add_dfs(df_memo_backup_f1, df_memo_backup_f2)

# 위에서 덮어쓴 DataFrame에 df_memo_backup_f3의 값을 덮어쓰기
df_memo_folder = add_dfs(df_memo_1_2, df_memo_backup_f3)

df_memo_folder = df_memo_folder.drop('reward')

11월 13일 이후로 광고 스키마가 대량으로 변경된 건, 변경되기 이전 스키마와 이후 스키마를 모두 기록해서  
변경 이후를 변경 이전에 맞추는 작업을 수행해야 할 듯

In [12]:
firebase_ads_directory = "D:/DATA_PREPROCESS/FIRESTORE_DATAS/FIREBASE_ads/backup"

df_memo_backup = load_csv_to_df_spark(firebase_ads_directory, ['backup_2023-12-03_12-05-12_Memorization.csv'])
df_point_backup = load_csv_to_df_spark(firebase_ads_directory, ['backup_2023-12-03_12-05-14_Point.csv'])
df_user_backup = load_csv_to_df_spark(firebase_ads_directory, ['backup_2023-12-03_12-05-41_Users.csv'])

df_memo_backup = df_memo_backup.drop("doc_id", "linkClickDesc", "quiz")
df_point_backup = df_point_backup.drop("doc_id", "linkClickDesc", "quiz")

df_memo_backup = df_memo_backup.withColumn("linkClick", df_memo_backup["linkClick"].cast("int"))
df_point_backup = df_point_backup.withColumn("linkClick", df_point_backup["linkClick"].cast("int")) \
                                .withColumn("sell", df_point_backup["sell"].cast("int")) \
                                .withColumn("player", df_point_backup["player"].cast("int")) \
                                .withColumn("linkWing", df_point_backup["linkWing"].cast("boolean"))

# 기존의 script를 drop하고, reward 컬럼명을 script로 바꿈
df_point_backup = df_point_backup.drop("script")
df_point_backup = df_point_backup.withColumnRenamed("reward", "script")

df_memo_backup = df_memo_backup.drop('reward')

print(df_memo_backup.dtypes)
print(df_point_backup.dtypes)

[('imageUrl', 'string'), ('type', 'boolean'), ('collection', 'string'), ('videoUrl', 'string'), ('percent', 'int'), ('maintitle', 'string'), ('sell', 'int'), ('sold', 'int'), ('name', 'string'), ('ageArray', 'string'), ('imageArray', 'string'), ('player', 'int'), ('level', 'string'), ('descArray', 'string'), ('linkClick', 'int'), ('point', 'int'), ('cnt', 'int'), ('script', 'string'), ('dateEnd', 'string'), ('pay', 'int'), ('date', 'string'), ('engineArray', 'string'), ('linkWing', 'boolean'), ('link', 'string'), ('category', 'string'), ('title', 'string'), ('gender', 'string')]
[('cnt', 'string'), ('collection', 'string'), ('type', 'string'), ('point', 'string'), ('videoUrl', 'string'), ('imageUrl', 'string'), ('maintitle', 'string'), ('percent', 'string'), ('pay', 'string'), ('dateEnd', 'string'), ('sold', 'string'), ('sell', 'int'), ('name', 'string'), ('script', 'string'), ('date', 'string'), ('imageArray', 'string'), ('engineArray', 'string'), ('player', 'int'), ('ageArray', 'stri

11월 13일 백업파일 기준 dtypes  <- 이 데이터 타입에 12월 4일 데이터타입을 맞춰야 함  
[('imageUrl', 'string'), ('type', 'boolean'), ('collection', 'string'), ('videoUrl', 'string'), ('percent', 'int'), ('maintitle', 'string'), ('sell', 'int'), ('sold', 'int'), ('name', 'string'), ('ageArray', 'string'), ('imageArray', 'string'), ('player', 'int'), ('level', 'string'), ('descArray', 'string'), ('linkClick', 'int'), ('point', 'int'), ('cnt', 'int'), ('script', 'string'), ('dateEnd', 'string'), ('pay', 'int'), ('date', 'string'), ('engineArray', 'string'), ('linkWing', 'boolean'), ('link', 'string'), ('category', 'string'), ('title', 'string'), ('gender', 'string')]  
[('cnt', 'int'), ('collection', 'string'), ('type', 'boolean'), ('point', 'int'), ('videoUrl', 'string'), ('imageUrl', 'string'), ('maintitle', 'string'), ('percent', 'int'), ('pay', 'int'), ('dateEnd', 'string'), ('sold', 'int'), ('sell', 'int'), ('name', 'string'), ('script', 'string'), ('date', 'string'), ('imageArray', 'string'), ('engineArray', 'string'), ('player', 'int'), ('ageArray', 'string'), ('link', 'string'), ('category', 'string'), ('level', 'string'), ('descArray', 'string'), ('title', 'string'), ('linkClick', 'int'), ('linkWing', 'boolean'), ('gender', 'string')  ]

In [13]:
# 11월 13일 이후로 변경된 컬럼들의 데이터 타입을 원래의 이전 데이터 타입대로 변경
df_point_backup = df_point_backup \
    .withColumn("cnt", col("cnt").cast("int")) \
    .withColumn("type", col("type").cast("boolean")) \
    .withColumn("point", col("point").cast("int")) \
    .withColumn("percent", col("percent").cast("int")) \
    .withColumn("pay", col("pay").cast("int")) \
    .withColumn("sold", col("sold").cast("int"))

In [None]:
# # 데이터 중복, 개수 체크
# print(df_point_folder.dtypes)
# print(df_point_folder.count())
# print(df_memo_folder.dtypes)
# print(df_memo_folder.count())

# # name 컬럼에 대해 그룹화하고 카운트
# name_count_df = df_point_folder.groupBy("name").agg(F.count("name").alias("count"))

# # 카운트가 1보다 큰 행만 필터링하여 중복된 'name' 확인
# duplicates = name_count_df.filter(F.col("count") > 1)

# # 중복된 'name'이 있는지 확인하고 출력
# duplicates.show()

## df_ad 통합 및 로드

In [14]:
firestore_directory = "D:/DATA_PREPROCESS/FIRESTORE_DATAS"

df_memo = load_csv_to_df_spark(firestore_directory, ['fs_memo_20230803.csv'])
df_point = load_csv_to_df_spark(firestore_directory, ['fs_point_20230801.csv'])

# df_memo의 데이터 타입 변경
df_memo = df_memo.withColumn("sold", df_memo["sold"].cast("int"))  \
                .withColumn("sell", df_memo["sell"].cast("int"))

# df_point의 데이터 타입 변경
df_point = df_point.withColumn("linkClick", df_point["linkClick"].cast("int")) \
                   .withColumn("linkClickDesc", df_point["linkClickDesc"].cast("int")) \
                   .withColumn("linkWing", df_point["linkWing"].cast("boolean")) \
                   .withColumn("pay", df_point["pay"].cast("int")) \
                   .withColumn("percent", df_point["percent"].cast("int")) \
                   .withColumn("player", df_point["player"].cast("int")) \
                   .withColumn("sold", df_point["sold"].cast("int")) \
                   .withColumn("sell", df_point["sell"].cast("int")) \
                   .withColumn("point", df_point["point"].cast("int")) \
                   .withColumn("type", df_point["type"].cast("boolean"))

# 기존의 script를 drop하고, reward 컬럼명을 script로 바꿈
df_point = df_point.drop("script")
df_point = df_point.withColumnRenamed("reward", "script")

# point -> 'quiz' 컬럼 제거 // memo -> 'reward' 컬럼 제거
df_point_dropped = df_point.drop('quiz')
df_memo = df_memo.drop('reward')

# df_memo, df_point의 데이터프레임 합치기
df_ad = df_memo.union(df_point_dropped)

# id, linkClickDesc, reward 컬럼 제거
df_ad = df_ad.drop('id', 'linkClickDesc', 'reward')


# 모든 DataFrame의 열 이름을 하나의 집합에 모음
all_columns = set()
for dtypes in [df_point_folder.dtypes, df_memo_folder.dtypes, df_memo_backup.dtypes, df_point_backup.dtypes, df_ad.dtypes]:
    all_columns.update([col_name for col_name, _ in dtypes])
print(f"df_point_folder.dtypes: {df_point_folder.dtypes}")
print(f"df_memo_folder.dtypes: {df_memo_folder.dtypes}")
print(f"df_memo_backup.dtypes: {df_memo_backup.dtypes}")
print(f"df_point_backup.dtypes: {df_point_backup.dtypes}")
print(f"df_ad.dtypes: {df_ad.dtypes}")

# 각 DataFrame의 열을 동일한 순서로 정렬
def reorder_columns(df, all_columns):
    existing_columns = {col_name for col_name, _ in df.dtypes}
    missing_columns = all_columns - existing_columns
    reordered_columns = [col for col in all_columns if col in existing_columns] + list(missing_columns)
    return df.select(*reordered_columns)

df_memo_folder = reorder_columns(df_memo_folder, all_columns)
df_point_folder = reorder_columns(df_point_folder, all_columns)
df_memo_backup = reorder_columns(df_memo_backup, all_columns)
df_point_backup = reorder_columns(df_point_backup, all_columns)
df_ad = reorder_columns(df_ad, all_columns)

# df_ad, df_point_folder, df_memo_folder, df_point_backup, df_memo_backup 합치기 & 중복 행 제거
df_ad = df_ad.union(df_point_folder).union(df_memo_folder).union(df_point_backup).union(df_memo_backup)
print(df_ad.count())
df_ad = df_ad.distinct()
print(df_ad.count())

# "name" 컬럼에서 결측값을 가진 행을 제거
df_ad = df_ad.na.drop(subset=["name"])

df_point_folder.dtypes: [('name', 'string'), ('cnt', 'int'), ('collection', 'string'), ('type', 'boolean'), ('point', 'int'), ('videoUrl', 'string'), ('imageUrl', 'string'), ('maintitle', 'string'), ('percent', 'int'), ('pay', 'int'), ('dateEnd', 'string'), ('sold', 'int'), ('sell', 'int'), ('script', 'string'), ('date', 'string'), ('imageArray', 'string'), ('engineArray', 'string'), ('player', 'int'), ('ageArray', 'string'), ('link', 'string'), ('category', 'string'), ('level', 'string'), ('descArray', 'string'), ('title', 'string'), ('linkClick', 'int'), ('gender', 'string'), ('linkWing', 'boolean')]
df_memo_folder.dtypes: [('name', 'string'), ('imageUrl', 'string'), ('type', 'boolean'), ('collection', 'string'), ('videoUrl', 'string'), ('percent', 'int'), ('maintitle', 'string'), ('sell', 'int'), ('sold', 'int'), ('ageArray', 'string'), ('imageArray', 'string'), ('player', 'int'), ('level', 'string'), ('descArray', 'string'), ('linkClick', 'int'), ('point', 'int'), ('cnt', 'int'), (

## df_ad 확인 코드

In [None]:
# # 결과 확인

# print(f"df_memo 행 개수: {df_memo.count()} | df_point 행 개수: {df_point.count()}")
# print(f"df_memo_folder 행 개수: {df_memo_folder.count()} | df_point_folder 행 개수: {df_point_folder.count()}")
# print(f"df_memo_backup 행 개수: {df_memo_backup.count()} | df_point_backup 행 개수: {df_point_backup.count()}")
# print(f"df_ad 행 개수: {df_ad.count()} | df_ad 열 개수: {len(df_ad.columns)}")

### collection의 값이 잘못 입력된 암기플러스 행들을 수정

In [15]:
# # df_ad에서 title이 "암기플러스"인 행의 개수를 출력합니다.
# count_before = df_ad.filter(F.col("collection") == "Memorization").count()

# title이 "암기플러스"인 행의 "collection" 값을 "Memorization"으로 변경합니다.
df_ad = df_ad.withColumn("collection", F.when(F.col("title") == "암기플러스", "Memorization")\
             .otherwise(F.col("collection")))

# # 변경 후, title이 "암기플러스"인 행의 개수를 다시 계산합니다.
# count_after = df_ad.filter(F.col("collection") == "Memorization").count()
# print(f"Before change: {count_before}, After change: {count_after}")

In [None]:
# df_ad.select('script').show(30)

### df_ad의 기본적인 필드값이 포함되지 않은 행의 개수 파악과 수정

In [16]:
# 기본적인 필드 선택
essential_fields_of_ad = ['collection', 'date', 'dateEnd', 'level', 'name', 'percent']

# 기본적인 필드에 값이 있는 행의 개수 파악
non_null_count = df_ad.select(*essential_fields_of_ad).dropna().count()

# 하나라도 없는 행의 개수 파악
null_count = df_ad.count() - non_null_count

# 하나라도 값이 없는 행 제거
df_ad_cleaned = df_ad.dropna(subset=essential_fields_of_ad)

# 결과 출력 (또는 다른 작업 수행)
print(f"Rows with all selected fields present: {non_null_count}")
print(f"Rows with at least one missing field: {null_count}")

Rows with all selected fields present: 30009
Rows with at least one missing field: 24


df_ad의 필수 컬럼별 유무 행의 개수(231019 기준)

collection - all  
date - all  
dateEnd - all  
name - all  

( gender(광고에서 gender는 현재 가비지값)    
Rows with all selected fields present: 6512    
Rows with at least one missing field: 20589 ) <- 필수 아님  

level  
Rows with all selected fields present: 27088  
Rows with at least one missing field: 13  
  
percent  
Rows with all selected fields present: 27099  
Rows with at least one missing field: 2  

## Firebase > backup 안의 백업 파일들에 대한 확인 및 검증

In [None]:
# firebase_ads_directory = "D:/DATA_PREPROCESS/FIREBASE_ads/backup/VOWING_PointRecord"

# # 파일 경로를 인수로 전달하고 첫 번째 행을 건너뛰도록 적용
# df_memo_backup_f1 = load_csv_to_df_spark_skip_first_row(firebase_ads_directory, ['2023-06-20_00-05-20.csv'])
# df_memo_backup_f2 = load_csv_to_df_spark_skip_first_row(firebase_ads_directory, ['2023-06-19_12-05-10.csv'])

# # 모든 'name'이 포함되는지 확인
# names_1 = df_memo_backup_f1.select("name").distinct()
# names_2 = df_memo_backup_f2.select("name").distinct()

# # 두 DataFrame의 'name' 컬럼을 비교
# names_not_in = names_1.join(names_2, ["name"], "left_anti").count()

In [None]:
# # 사이에 일치하지 않는 'name'의 수 출력
# print(f"먼저 csv의 name 수: {names_1.count()}, 나중 csv의 name 수: {names_2.count()}")
# print(f"사이에 일치하지 않는 'name'의 수: {names_not_in}")

In [None]:
# # 사이에 일치하지 않는 'name' 출력
# names_not_in_060912_df = names_1.join(names_2, ["name"], "left_anti")
# names_not_in_060912_list = [row.name for row in names_not_in_060912_df.collect()]
# print(f"The unique 'name' values in df_memo_backup_060900 that are not in df_memo_backup_060912: {names_not_in_060912_list}")

## df_externals 로드

In [17]:
# df_ssd = load_csv_to_df_spark(firestore_directory, ['230517_d_95_extracted_ssd_d95_data_1.csv'])
df_hdd = load_csv_to_df_spark(firestore_directory, ['230517_d_80_gcs_extracted_hdd_d80_data_1.csv', '230517_d_80_gcs_extracted_hdd_d80_data_2.csv'])
# df_externals = df_ssd.union(df_hdd)
df_externals = df_hdd

In [None]:
# df_externals.printSchema()
# df_externals.count()

E:\\0707\\voicedata\\230517\\d_80\\gcs\\여름이 오기 전 날씬락토페린 300mg!\202304172018560BRBc3jsxpcGlVs98BpO1VSkp2b2_F90Se  
E:\\0707\\voicedata\\230517\\d_80\\gcs\\여름이 오기 전 날씬락토페린 300mg!\202304172025530BRBc3jsxpcGlVs98BpO1VSkp2b2_F90Se  
  
위와 같이 동일 user_id, ad_name인데 여러 개의 음성 파일이 저장된 경우가 있다. 이 경우 마지막 음성 파일의 메타데이터만 저장하도록 코드를 수정한다

In [18]:
# Window 함수 설정
windowSpec = Window.partitionBy("user_id", "ad_name").orderBy(F.desc("record_time"))

# row_number 추가
df_externals = df_externals.withColumn("row_number", F.row_number().over(windowSpec))

# row_number가 1인 행만 필터링
df_externals = df_externals.filter(F.col("row_number") == 1).drop("row_number")
# df_externals.count()

### df_externals 데이터 변환

In [19]:
# 'birth_year' 컬럼의 데이터타입을 문자열로 변환하고, lpad를 사용하여 한 자리 숫자 앞에 0을 붙입니다.
df_externals = df_externals.withColumn("birth_year", lpad(col("birth_year").cast("string"), 2, '0'))

### df_externals 확인 코드

In [21]:
df_externals.printSchema()
# print(df_externals.columns)
# df_externals.show(5)
# df_externals.count()

root
 |-- accuracy: integer (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- record_time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- is_test: integer (nullable = true)



In [55]:
# # 'birth_year' 컬럼의 고유한 값과 그 빈도수를 계산
# externals_birth_year_counts = df_externals.groupBy("birth_year").count()

# # 결과를 'count' 컬럼에 따라 내림차순으로 정렬
# externals_birth_year_counts_ordered = externals_birth_year_counts.orderBy(F.desc("count"))

# # DataFrame의 전체 행 수를 얻습니다.
# total_rows_of_externals_birth = externals_birth_year_counts_ordered.count()

# # 결과를 출력
# externals_birth_year_counts_ordered.show(total_rows_of_externals_birth, truncate=False)

+----------+-----+
|birth_year|count|
+----------+-----+
|82        |42686|
|81        |40608|
|83        |39939|
|79        |38530|
|80        |36181|
|92        |36131|
|88        |34991|
|95        |34965|
|86        |34081|
|91        |34043|
|90        |33938|
|84        |33696|
|93        |33292|
|87        |32248|
|89        |31032|
|85        |30303|
|94        |28962|
|96        |25034|
|78        |24243|
|76        |23667|
|00        |23197|
|74        |21812|
|98        |21680|
|72        |21184|
|97        |20635|
|77        |20629|
|71        |20133|
|73        |20121|
|70        |19181|
|69        |18304|
|75        |17529|
|68        |17097|
|99        |17095|
|66        |15284|
|01        |15278|
|65        |13456|
|02        |11850|
|67        |11708|
|64        |11289|
|62        |11157|
|63        |10574|
|03        |9829 |
|60        |9819 |
|04        |8830 |
|59        |8591 |
|08        |7914 |
|05        |7584 |
|06        |7376 |
|61        |6809 |
|07        |

#### 'birth_year'가 0인 행들에 관련해

In [None]:
# # 'birth_year'가 0인 행들을 필터링합니다.
# externals_birth_year_zero = df_externals.filter(F.col("birth_year") == 0)

# # 필터링된 데이터 프레임을 CSV 파일로 저장합니다.
# externals_birth_year_zero.write.csv(firestore_directory + "/birth_year_zero.csv", header=True)

## df_user 로드 및 통합

In [20]:
df_user = load_csv_to_df_spark(firestore_directory, ['users_data.csv'])

In [22]:
print(df_user.count())

38488


In [23]:
df_user.printSchema()

root
 |-- date: date (nullable = true)
 |-- birthday: integer (nullable = true)
 |-- mail: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- auth: integer (nullable = true)
 |-- NOTICE_TOPIC: boolean (nullable = true)
 |-- isSoundAllowed: boolean (nullable = true)
 |-- language: string (nullable = true)
 |-- PRIZES_TOPIC: boolean (nullable = true)
 |-- monthlyPerfects: double (nullable = true)
 |-- engine: string (nullable = true)
 |-- win: integer (nullable = true)
 |-- searchKeyWord: string (nullable = true)
 |-- isChallengeRequestAllowed: boolean (nullable = true)
 |-- image: string (nullable = true)
 |-- address: string (nullable = true)
 |-- REVIEW_TOPIC: boolean (nullable = true)
 |-- phoneId: string (nullable = true)
 |-- recommendCode: string (nullable = true)
 |-- realName: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- name: string (nullable = true)
 |-- detailAddress: string (nullable = true)
 |-- ANY

In [26]:
# # 기존의 user 조인 코드, 
# # 중복이 있기에 Union은 적절치 않다고 판단

# # 기존의 script를 drop하고, reward 컬럼명을 script로 바꿈
# df_user_backup = df_user_backup.drop("userID")
# df_user_backup = df_user_backup.withColumnRenamed("doc_id", "user_id")

# # 컬럼 순서를 동일하게 맞춤
# df_users = df_user.select(*df_user_backup.columns)

# # # 두 DataFrame을 합침
# # df_users = df_user.union(df_user_backup).distinct()

In [21]:
# 기존의 script를 drop하고, reward 컬럼명을 script로 바꿈
df_user_backup = df_user_backup.drop("userID")
df_user_backup = df_user_backup.withColumnRenamed("doc_id", "user_id")
df_user = df_user.drop("userID")

# 두 DataFrame을 user_id를 기준으로 외부 조인
df_combined = df_user.join(df_user_backup, "user_id", "outer")

# 겹치는 user_id의 경우 df_user_backup의 값을 사용
# 동일한 컬럼을 가지고 있으므로, df_user_backup의 컬럼을 사용하고 df_user의 컬럼은 제거
for column in df_user.columns:
    if column != "user_id":
        df_combined = df_combined.drop(df_user[column])

# 결과 DataFrame
df_users = df_combined.select(*df_user.columns)

231201 기존의 df_users -> df_user 로 바꾸고  
합친 df를 df_users로 수정  

In [20]:
# # df_users DataFrame에서 user_id 컬럼의 고유한 값의 수를 계산
# unique_user_ids = df_users.agg(countDistinct("user_id").alias("distinct_user_ids")).collect()[0]["distinct_user_ids"]
# print(unique_user_ids)

49291


### df_users 확인

In [56]:
# # 'birthday' 컬럼의 고유한 값과 그 빈도수를 계산
# users_birthday_counts = df_users.groupBy("birthday").count()

# # 결과를 'count' 컬럼에 따라 내림차순으로 정렬
# users_birthday_counts_ordered = users_birthday_counts.orderBy(F.desc("count"))

# # DataFrame의 전체 행 수를 얻습니다.
# total_rows_of_birthday = users_birthday_counts_ordered.count()

# # 결과를 출력
# users_birthday_counts_ordered.show(total_rows_of_birthday, truncate=False)

+--------+-----+
|birthday|count|
+--------+-----+
|20000101|24   |
|19970316|17   |
|19941231|16   |
|19900101|15   |
|19820827|13   |
|19981201|12   |
|20080317|11   |
|19820105|11   |
|20080729|11   |
|19991122|11   |
|19960611|11   |
|19820122|11   |
|19960722|11   |
|19940215|11   |
|20080924|11   |
|19930218|11   |
|19950118|11   |
|19940225|11   |
|20040426|11   |
|19940930|11   |
|19970829|11   |
|19890501|10   |
|19920430|10   |
|19930519|10   |
|19920206|10   |
|19931020|10   |
|19830210|10   |
|19930927|10   |
|20000505|10   |
|19920220|10   |
|20020126|10   |
|19810228|10   |
|19950710|10   |
|19960911|10   |
|19930125|10   |
|20020917|10   |
|20000105|10   |
|20001222|10   |
|19870228|10   |
|19931218|9    |
|19951002|9    |
|20081121|9    |
|20090102|9    |
|19920601|9    |
|19940530|9    |
|20001111|9    |
|20070929|9    |
|19920618|9    |
|19860218|9    |
|19891212|9    |
|19881010|9    |
|19930313|9    |
|20030513|9    |
|19920323|9    |
|19930826|9    |
|19941128|9   

위의 실행을 통해 df_users / birthday 의 값에 null도 잘못된 값도 하나도 없는 게 확인됨.  
하지만 아래의 result_df_users의 조인 결과 해당 값에 null이 들어간 경우가 발견됨  
-> 이 원인을 파악해 해결해야 함!

In [29]:
print(df_users.dtypes)
print(df_user_backup.dtypes)

[('date', 'date'), ('birthday', 'int'), ('mail', 'string'), ('gender', 'string'), ('perfect', 'double'), ('auth', 'int'), ('NOTICE_TOPIC', 'boolean'), ('isSoundAllowed', 'boolean'), ('language', 'string'), ('PRIZES_TOPIC', 'boolean'), ('monthlyPerfects', 'double'), ('engine', 'string'), ('win', 'int'), ('searchKeyWord', 'string'), ('isChallengeRequestAllowed', 'boolean'), ('image', 'string'), ('address', 'string'), ('REVIEW_TOPIC', 'boolean'), ('phoneId', 'string'), ('recommendCode', 'string'), ('realName', 'string'), ('phone', 'bigint'), ('name', 'string'), ('detailAddress', 'string'), ('ANY_TOPIC', 'boolean'), ('job', 'string'), ('MARKETING_TOPIC', 'boolean'), ('wingTime', 'string'), ('wing', 'int'), ('monthlyPoint', 'int'), ('point', 'int'), ('isOnline', 'boolean'), ('user_id', 'string'), ('isProvider', 'string'), ('lastParticipation', 'string'), ('previousMonthPoint', 'double'), ('monthlyPerfect', 'double'), ('fcmToken', 'string'), ('city', 'string'), ('best', 'double'), ('challeng

In [None]:
# # df_users 불완전 데이터 체크

# # user_id를 제외한 모든 컬럼 리스트 생성
# columns_except_user_id = [c for c in df_users.columns if c != "user_id"]

# # 모든 해당 컬럼들이 null인 조건 생성
# condition = [isnull(column_name) for column_name in columns_except_user_id]
# combined_condition = condition[0]
# for c in condition[1:]:
#     combined_condition &= c

# # 조건에 맞는 행을 필터링하고 개수 확인
# missing_rows_count = df_users.filter(combined_condition).count()
# print(f"Number of rows where all columns (except user_id) are missing: {missing_rows_count}")

# df_attend & df_users 조인 -> result_df_users
df_attend가 기준 스키마

In [22]:
# df_users 변환
df_users_transformed = df_users.withColumn("gender", when(col("gender") == "남자", "M").otherwise("W")) \
                               .withColumn("birthday", substring(col("birthday"), 3, 2)) \
                               .withColumnRenamed("user_id", "user_id_in_users") \
                               .withColumn("engine", when(col("engine") == "수도권", "Se")
                                           .when(col("engine") == "경상권", "Gy")
                                           .when(col("engine") == "강원권", "Ga")
                                           .when(col("engine") == "충청권", "Ch")
                                           .when(col("engine") == "전라권", "Jd")
                                           .when(col("engine") == "제주권", "Je")
                                           .otherwise("EE"))  # Jd가 전라도인지 제주도인지 불확실

# df_users_transformed에서 필요한 컬럼만 선택
df_users_transformed = df_users_transformed.select("user_id_in_users", "gender", "birthday", "engine", "job", "language", "perfect")

# 조인 조건 설정
join_condition_to_users = (df_attend["user_id"] == df_users_transformed["user_id_in_users"])

# 조인 수행
result_df_users = df_attend.join(df_users_transformed, join_condition_to_users, "left_outer")

In [32]:
result_df_users.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- ArrayVoice: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- attend: double (nullable = true)
 |-- resultPass: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- addText: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ArrayPercent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ArrayDate: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- type: boolean (nullable = true)
 |-- review_wish: string (nullable = true)
 |-- ArrayAddResult: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_id_in_users: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)



In [33]:
# 전체 행의 수 계산
total_count = result_df_users.count()

# 'birth_year' 값이 있는 행의 수 계산
birth_year_present_count = result_df_users.filter(col("birthday").isNotNull()).count()

# 비율 계산
percentage_with_birth_year = (birth_year_present_count / total_count) * 100

percentage_with_birth_year

98.32487093875095

바뀐 12월 4일 user_df로 일치율 98.3%  
이전 일치율 93.0%

In [37]:
# # 'birthday' 컬럼의 고유한 값과 그 빈도수를 계산
# users_birthday_counts = result_df_users.groupBy("birthday").count()

# # 결과를 'count' 컬럼에 따라 내림차순으로 정렬
# users_birthday_counts_ordered = users_birthday_counts.orderBy(F.desc("count"))

# # DataFrame의 전체 행 수를 얻습니다.
# total_rows_of_birthday = users_birthday_counts_ordered.count()

# # 결과를 출력
# users_birthday_counts_ordered.show(total_rows_of_birthday, truncate=False)

+--------+-------+
|birthday|count  |
+--------+-------+
|null    |1055824|
|80      |561037 |
|82      |531518 |
|68      |505866 |
|69      |470754 |
|79      |468457 |
|70      |443029 |
|73      |434396 |
|76      |422014 |
|72      |421590 |
|74      |415710 |
|65      |414532 |
|83      |383579 |
|81      |378822 |
|85      |371589 |
|67      |367603 |
|78      |325048 |
|71      |322485 |
|77      |318686 |
|60      |304798 |
|88      |301697 |
|75      |296719 |
|87      |286744 |
|66      |279546 |
|95      |277416 |
|62      |257134 |
|84      |255417 |
|89      |253067 |
|86      |231566 |
|92      |223087 |
|64      |217561 |
|63      |214430 |
|93      |206017 |
|91      |190706 |
|90      |184228 |
|61      |183179 |
|97      |167665 |
|99      |166617 |
|00      |165828 |
|94      |164899 |
|59      |152335 |
|98      |151505 |
|96      |148355 |
|55      |119373 |
|58      |116347 |
|03      |112782 |
|56      |89091  |
|01      |88380  |
|06      |79118  |
|02      |78

11월 24일 데이터 기준 :  
attend데이터는 11월 24일,  
user, point, memo데이트는 11월 12일 데이터가 기준이기에  
전체 15218354개의 데이터 중 녹음데이터와 유저데이터가 믹스되지 않는  
birthyear값이 null인 행의 개수는 1055824개  

In [23]:
# # 필요한 컬럼만 선택하여 중복을 제거
# result_df_users = result_df_users.select(df_attend["*"],
#                                          df_users_transformed["gender"].alias("new_gender"),
#                                          df_users_transformed["birthday"].alias("new_birthday"),
#                                          df_users_transformed["engine"].alias("new_engine"),
#                                          df_users_transformed["job"],
#                                          df_users_transformed["language"],
#                                          df_users_transformed["perfect"])

# # coalesce 함수를 사용하여 컬럼 값을 갱신
# result_df_users = result_df_users.withColumn("gender", coalesce(col("new_gender"), col("gender"))) \
#                                  .withColumn("birthday", coalesce(col("new_birthday"), col("birth_year"))) \
#                                  .withColumn("engine", coalesce(col("new_engine"), col("local_code")))

# # 중복된 컬럼 및 임시 컬럼 제거
# result_df_users = result_df_users.drop("new_gender", "new_birthday", "new_engine", "user_id_in_users")
 
# # 중복된 컬럼 제거
# result_df_users = result_df_users.drop(df_users_transformed["user_id_in_users"])

#----------------------------------------------------------------------------------------------------------------#
# 11월 13일 이후로 변경된 코드

# 필요한 컬럼만 선택하여 중복을 제거
result_df_users = result_df_users.select(df_attend["*"],
                                         df_users_transformed["gender"].alias("new_gender"),
                                         df_users_transformed["birthday"].alias("new_birthday"),
                                         df_users_transformed["engine"].alias("new_engine"),
                                         df_users_transformed["job"],
                                         df_users_transformed["language"],
                                         df_users_transformed["perfect"])

# coalesce 함수를 사용하여 컬럼 값을 갱신
result_df_users = result_df_users.withColumn("gender", col("new_gender")) \
                                 .withColumn("birth_year", col("new_birthday")) \
                                 .withColumn("local_code", col("new_engine"))

# 중복된 컬럼 및 임시 컬럼 제거
result_df_users = result_df_users.drop("new_gender", "new_birthday", "new_engine", "user_id_in_users")

In [35]:
# 중간 결과 메모리 해제
df_users_transformed.unpersist()
df_attend.unpersist()

# result_df_users.show(5)

DataFrame[user_id: string, ArrayVoice: array<string>, attend: double, resultPass: string, imageUrl: string, addText: string, name: string, ArrayPercent: array<string>, ArrayDate: array<string>, title: string, type: boolean, review_wish: string, ArrayAddResult: array<string>]

- 아래 gender_count 출력을 통해 df_users는 대다수의 df_attend와 조인이 되었음을 확인할 수 있다
+------+-------+  
|gender|  count|  
+------+-------+  
|     F|  23445|  
|  null|  83857|  
|     M|1405604|  
|     W|5201125|  
+------+-------+  

## 분석
- before 23-10-19 -
language 값이 있는 행의 개수: 6592870  
language 값이 없는 행의 개수: 121161  
birth_year 값이 있는 행의 개수: 6630174  
birth_year 값이 없는 행의 개수: 83857  
name 값이 있는 행의 개수: 6626670  
name 값이 없는 행의 개수: 87361  
을 통해 대다수의 df_attend의 행과 df_users의 행이 조인되었다는 것을 확인할 수 있다

- 23-10-19 -
language 값이 있는 행의 개수: 13053761
language 값이 없는 행의 개수: 634020
birth_year 값이 있는 행의 개수: 13094265
birth_year 값이 없는 행의 개수: 593516
name 값이 있는 행의 개수: 13090674
name 값이 없는 행의 개수: 597107

In [54]:
# # 'birth_year' 컬럼의 고유한 값과 그 빈도수를 계산
# birth_year_counts = result_df_users.groupBy("birth_year").count()

# # 결과를 'count' 컬럼에 따라 내림차순으로 정렬
# birth_year_counts_ordered = birth_year_counts.orderBy(F.desc("count"))

# # DataFrame의 전체 행 수를 얻습니다.
# total_rows_of_birth = birth_year_counts_ordered.count()

# # 결과를 출력
# birth_year_counts_ordered.show(total_rows_of_birth, truncate=False)

+----------+-------+
|birth_year|count  |
+----------+-------+
|null      |1055824|
|80        |561037 |
|82        |531518 |
|68        |505866 |
|69        |470754 |
|79        |468457 |
|70        |443029 |
|73        |434396 |
|76        |422014 |
|72        |421590 |
|74        |415710 |
|65        |414532 |
|83        |383579 |
|81        |378822 |
|85        |371589 |
|67        |367603 |
|78        |325048 |
|71        |322485 |
|77        |318686 |
|60        |304798 |
|88        |301697 |
|75        |296719 |
|87        |286744 |
|66        |279546 |
|95        |277416 |
|62        |257134 |
|84        |255417 |
|89        |253067 |
|86        |231566 |
|92        |223087 |
|64        |217561 |
|63        |214430 |
|93        |206017 |
|91        |190706 |
|90        |184228 |
|61        |183179 |
|97        |167665 |
|99        |166617 |
|00        |165828 |
|94        |164899 |
|59        |152335 |
|98        |151505 |
|96        |148355 |
|55        |119373 |
|58        |1

순서 바꾸기 이전 출력
+----------+------+
|birth_year|count |
+----------+------+
|null      |593516|
|80        |523780|
|82        |499034|
|68        |458899|
|79        |434748|
|69        |423484|
|73        |401921|
|70        |399849|
|76        |393364|
|72        |388771|
|74        |381860|
|65        |372975|
|83        |358122|
|81        |357778|
|85        |341521|
|67        |332117|
|78        |299579|
|71        |297967|
|77        |296545|
|88        |287702|
|60        |274408|
|75        |271175|
|87        |266251|
|95        |257354|
|66        |251315|
|84        |239385|
|89        |238383|
|62        |232918|
|86        |220942|
|92        |209461|
|64        |196679|
|63        |195083|
|93        |194594|
|91        |178549|
|90        |172400|
|61        |166666|
|99        |160163|
|94        |159382|
|97        |158537|
|00        |156995|
|98        |142980|
|59        |140197|
|96        |139755|
|55        |108439|
|58        |105734|
|03        |102742|
|01        |84293 |
|56        |79712 |
|02        |76096 |
|06        |71832 |
|54        |65155 |
|57        |61586 |
|05        |60462 |
|04        |53866 |
|08        |47617 |
|53        |47043 |
|07        |46958 |
|52        |35343 |
|49        |27522 |
|51        |27231 |
|09        |26603 |
|48        |25130 |
|47        |22331 |
|50        |15646 |
|37        |10061 |
|42        |9053  |
|39        |3653  |
|43        |2936  |
|45        |1565  |
|46        |1325  |
|44        |395   |
|36        |212   |
|10        |86    |
|41        |82    |
|40        |10    |
|11        |3     |
+----------+------+

In [None]:
# # language 값이 있는 행의 개수
# language_present_count = result_df_users.filter(result_df_users.language.isNotNull()).count()
# # language 값이 없는 행의 개수
# language_absent_count = result_df_users.filter(result_df_users.language.isNull()).count()

# # birth_year 값이 있는 행의 개수
# birth_year_present_count = result_df_users.filter(result_df_users.birth_year.isNotNull()).count()
# # birth_year 값이 없는 행의 개수
# birth_year_absent_count = result_df_users.filter(result_df_users.birth_year.isNull()).count()

# # local_code 값이 있는 행의 개수
# local_code_present_count = result_df_users.filter(result_df_users.local_code.isNotNull()).count()
# # local_code 값이 없는 행의 개수
# local_code_absent_count = result_df_users.filter(result_df_users.local_code.isNull()).count()

# # # ArrayDate 값이 없으면서 record_time에 값이 있는 행의 개수
# # array_date_absent_record_time_present_count = result_df_users.filter(
# #     (result_df_users.ArrayDate.isNull()) & 
# #     (result_df_users.record_time.isNotNull())
# # ).count()

# print(f"language 값이 있는 행의 개수: {language_present_count}")
# print(f"language 값이 없는 행의 개수: {language_absent_count}")

# print(f"birth_year 값이 있는 행의 개수: {birth_year_present_count}")
# print(f"birth_year 값이 없는 행의 개수: {birth_year_absent_count}")

# print(f"name 값이 있는 행의 개수: {local_code_present_count}")
# print(f"name 값이 없는 행의 개수: {local_code_absent_count}")

# result_df_users & df_externals 조인 쿼리 수행
result_df_users 기준 스키마

## result_df_users와 df_externals의 user_id+ad_name 중복 행에 대한 분석

In [None]:
# df_attend.count()

6709019 (df_attend 변경 이전)

In [None]:
# # 각 DataFrame에서 user_id와 ad_name 조합이 유일한지 확인
# dup_df_attend = df_attend.groupBy('user_id', 'name').agg(F.count('*').alias('num_rows')).filter('num_rows > 1')
# print(dup_df_attend.count())
# dup_df_externals = df_externals.groupBy('user_id', 'ad_name').agg(F.count('*').alias('num_rows')).filter('num_rows > 1')
# print(dup_df_externals.count())

수정 전  
2782, 5182  
수정 후  
 , 0

In [None]:
# # 중복된 행을 생략 없이 출력 (처음 20행)
# dup_df_attend.show(n=40, truncate=False)
# dup_df_externals.show(n=40, truncate=False)

## result_df_users와 df_externals의 조인 수행 -> result_df_externals

In [24]:
result_df_users.printSchema()
df_externals.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- ArrayVoice: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- attend: double (nullable = true)
 |-- resultPass: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- addText: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ArrayPercent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ArrayDate: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- type: boolean (nullable = true)
 |-- review_wish: string (nullable = true)
 |-- ArrayAddResult: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)

root
 |-- accuracy: integer (nullable = true)


In [25]:
# result_df_users와 df_externals의 조인

# 조인 조건 설정
join_condition_externals = (result_df_users["user_id"] == df_externals["user_id"]) & (result_df_users["name"] == df_externals["ad_name"])

df_externals = df_externals \
    .withColumnRenamed("birth_year", "external_birth_year") \
    .withColumnRenamed("local_code", "external_local_code") \
    .withColumnRenamed("gender", "external_gender")
# df_externals에서 가져올 컬럼 선택
selected_columns_externals = ["external_birth_year", "external_local_code", "external_gender"] # is_test는 제외시킴

# 조인 수행
result_df_externals = result_df_users.join(
    df_externals.select("user_id", "ad_name", *selected_columns_externals),
    join_condition_externals, "left_outer"
)

# selected_columns_externals 컬럼들에 대한 coalesce 적용
for column in selected_columns_externals:
    result_df_externals = result_df_externals.withColumn(column, coalesce(result_df_externals[column], df_externals[column]))

# 중복된 컬럼 제거
result_df_externals = result_df_externals.drop(df_externals["user_id"]).drop(df_externals["ad_name"]) \
                                        .drop("external_birth_year", "external_local_code", "external_gender")

## result_df_externals 확인 코드

In [47]:
# # 'birth_year' 컬럼의 고유한 값과 그 빈도수를 계산
# att_ex_birth_year_counts = result_df_externals.groupBy("birth_year").count()

# # 결과를 'count' 컬럼에 따라 내림차순으로 정렬
# att_ex_birth_year_counts_ordered = att_ex_birth_year_counts.orderBy(F.desc("count"))

# # DataFrame의 전체 행 수를 얻습니다.
# total_rows_of_externals_att_ex = att_ex_birth_year_counts_ordered.count()

# # 결과를 출력
# att_ex_birth_year_counts_ordered.show(total_rows_of_externals_att_ex, truncate=False)

+----------+-------+
|birth_year|count  |
+----------+-------+
|null      |1055824|
|80        |561037 |
|82        |531518 |
|68        |505866 |
|69        |470754 |
|79        |468457 |
|70        |443029 |
|73        |434396 |
|76        |422014 |
|72        |421590 |
|74        |415710 |
|65        |414532 |
|83        |383579 |
|81        |378822 |
|85        |371589 |
|67        |367603 |
|78        |325048 |
|71        |322485 |
|77        |318686 |
|60        |304798 |
|88        |301697 |
|75        |296719 |
|87        |286744 |
|66        |279546 |
|95        |277416 |
|62        |257134 |
|84        |255417 |
|89        |253067 |
|86        |231566 |
|92        |223087 |
|64        |217561 |
|63        |214430 |
|93        |206017 |
|91        |190706 |
|90        |184228 |
|61        |183179 |
|97        |167665 |
|99        |166617 |
|00        |165828 |
|94        |164899 |
|59        |152335 |
|98        |151505 |
|96        |148355 |
|55        |119373 |
|58        |1

In [27]:
# # 'ArrayDate' 배열의 각 요소를 별도의 행으로 변환
# exploded_df = result_df_externals.select(explode(col("ArrayDate")).alias("date_string"))

# july_2023_df = exploded_df.filter(col("date_string").startswith("23.07."))
# july_2023_count = july_2023_df.count()

# print(july_2023_count)

2429270


In [None]:
# # 'null' 값을 제외하고 'count' 합계를 계산합니다.
# non_null_birth_year_sum = att_ex_birth_year_counts_ordered \
#     .na.drop(subset=["birth_year"]) \
#     .agg(Fsum("count")).collect()[0][0]

# # 결과를 출력합니다.
# print(non_null_birth_year_sum)

In [None]:
# # 결과 확인
# first_row = result_df_externals.head().asDict()
# for key, value in first_row.items():
#     print(f"{key}: {value}")
# print(result_df_externals.columns)

In [None]:
# result_df_externals.count()

In [None]:
# def find_duplicate_columns(df):
#     columns = df.columns
#     duplicates = [col for col in columns if columns.count(col) > 1]
#     return list(set(duplicates))

# duplicate_columns = find_duplicate_columns(result_df_externals)
# print(duplicate_columns)

In [None]:
# non_null_externals_rows = df_externals.filter(
#     (df_externals["birth_year"].isNotNull()) |
#     (df_externals["local_code"].isNotNull()) |
#     (df_externals["is_test"].isNotNull())
# ).count()
# print(f"df_externals에서 'birth_year', 'local_code', 'is_test' 중 하나라도 null이 아닌 행의 수: {non_null_externals_rows}")

# # 2단계: result_df_externals에서 'birth_year', 'local_code', 'is_test' 중 하나라도 null인 행의 수 확인
# non_null_result_rows = result_df_externals.filter(
#     (result_df_externals["birth_year"].isNotNull()) |
#     (result_df_externals["local_code"].isNotNull()) |
#     (result_df_externals["is_test"].isNotNull())
# ).count()

# print(f"result_df_externals에서 'birth_year', 'local_code', 'is_test' 중 하나라도 null이 아닌 행의 수: {non_null_result_rows}")

231019 기준 출력  
df_externals에서 'birth_year', 'local_code', 'is_test' 중 하나라도 null이 아닌 행의 수: 1122859  
result_df_externals에서 'birth_year', 'local_code', 'is_test' 중 하나라도 null이 아닌 행의 수: 1053656  

In [None]:
# # df_attend와 df_externals에서 조인 조건에 맞는 키 값이 얼마나 많이 일치하는지 확인
# matching_keys_count = df_attend.select(*selected_columns_attend).join(
#     df_externals.select("user_id", "ad_name", *selected_columns_externals),
#     join_condition_attend_externals, "left_outer"
# ).count()
# print(f"Matching keys between df_attend and df_externals: {matching_keys_count}")

In [None]:
# # df_externals 내에서 user_id와 ad_name의 조합이 중복으로 나타나는 경우의 수
# duplicate_keys_count = df_externals.groupBy("user_id", "ad_name").agg(count("*").alias("num_rows")).filter("num_rows > 1").count()
# print(f"df_externals 내에서 user_id와 ad_name의 조합이 중복으로 나타나는 수: {duplicate_keys_count}")

# # df_externals 내에서 user_id와 ad_name, accuracy의 조합이 중복으로 나타나는 경우의 수
# duplicate_keys_count = df_externals.groupBy("user_id", "ad_name", "accuracy").agg(count("*").alias("num_rows")).filter("num_rows > 1").count()
# print(f"df_externals 내에서 user_id와 ad_name, accuracy의 조합이 중복으로 나타나는 수: {duplicate_keys_count}")

# # df_externals 내에서 user_id와 ad_name, record_time의 조합이 중복으로 나타나는 경우의 수
# duplicate_keys_count = df_externals.groupBy("user_id", "ad_name", "record_time").agg(count("*").alias("num_rows")).filter("num_rows > 1").count()
# print(f"df_externals 내에서 user_id, ad_name, 그리고 record_time의 조합이 중복으로 나타나는 수: {duplicate_keys_count}")

# # df_externals 내에서 user_id와 ad_name, gender의 조합이 중복으로 나타나는 경우의 수
# duplicate_keys_count = df_externals.groupBy("user_id", "ad_name", "gender").agg(count("*").alias("num_rows")).filter("num_rows > 1").count()
# print(f"df_externals 내에서 user_id, ad_name, 그리고 gender의 조합이 중복으로 나타나는 수: {duplicate_keys_count}")

# # df_externals 내에서 user_id와 ad_name, birth_year의 조합이 중복으로 나타나는 경우의 수
# duplicate_keys_count = df_externals.groupBy("user_id", "ad_name", "birth_year").agg(count("*").alias("num_rows")).filter("num_rows > 1").count()
# print(f"df_externals 내에서 user_id, ad_name, 그리고 birth_year의 조합이 중복으로 나타나는 수: {duplicate_keys_count}")

# # df_externals 내에서 record_time을 제외한 파일명이 중복으로 나타나는 경우의 수
# duplicate_keys_count = df_externals.groupBy("user_id", "ad_name", "gender", "birth_year", "local_code").agg(count("*").alias("num_rows")).filter("num_rows > 1").count()
# print(f"df_externals 내에서 record_time을 제외한 파일명이 중복으로 나타나는 경우의 수: {duplicate_keys_count}")

일단 외부 저장소(SSD, HDD)에 데이터가 중복 저장되었다고 간주, 다음 작업으로 넘어간다

df_externals 내에서 user_id와 ad_name의 조합이 중복으로 나타나는 수: 373451  
df_externals 내에서 user_id, ad_name, 그리고 gender의 조합이 중복으로 나타나는 수: 373450  
df_externals 내에서 user_id, ad_name, 그리고 birth_year의 조합이 중복으로 나타나는 수: 373446  

이 결과를 통해 user_id, ad_name, gender, birth_year 즉 SSD에 저장된 record_time이 없는 거의 대부분의 데이터가 HDD 데이터와 중복이 발생한다는 걸 확인할 수 있다.  
추가 작업을 통해 이 중복이 확실한 것인지 체크하고 관련해서 활동들이 필요하다  

user 수 := ssd u95 파일 수 := df_externals의 중복수   
이 세가지의 수가 서로 근사치임을 참고  

! u80은 80퍼 이상을 모두 포함하니 u95도 모두 포함됨 -> df_externals에 ssd의 u95는 포함시키지 X  

# result_df_externals & df_ad 조인 쿼리 수행

### result_df_externals가 기준 스키마¶

In [28]:
# df_ad에서 컬럼 이름 변경
df_ad_renamed = df_ad.withColumnRenamed("ageArray", "excepted_age_array") \
                   .withColumnRenamed("date", "ad_duration") \
                   .withColumnRenamed("dateEnd", "ad_duration_end") \
                   .withColumnRenamed("descArray", "desc_images_array") \
                   .withColumnRenamed("imageArray", "images_array") \
                   .withColumnRenamed("imageUrl", "thumbnail_image") \
                   .withColumnRenamed("link", "ad_link") \
                   .withColumnRenamed("player", "participant_count") \
                   .withColumnRenamed("name", "ad_name")

# df_ad에서 필요한 컬럼만 선택
selected_df_ad = df_ad_renamed.select("excepted_age_array", "collection", "ad_duration", "ad_duration_end",
                                       "desc_images_array", "images_array", "thumbnail_image", "level",
                                       "ad_link", "participant_count", "videoUrl", "ad_name", "script")

# 조인 수행
join_condition_ad = (result_df_externals["name"] == selected_df_ad["ad_name"])
result_df = result_df_externals.join(selected_df_ad, join_condition_ad, "left_outer")

# 중복 컬럼 제거
result_df = result_df.drop(selected_df_ad["ad_name"])

# # 최종 결과 출력
# print(result_df.columns)
# print(result_df.select("ArrayVoice").show(10))  # 확인하고 싶은 컬럼 넣고 확인

In [29]:
result_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- ArrayVoice: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- attend: double (nullable = true)
 |-- resultPass: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- addText: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ArrayPercent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ArrayDate: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- type: boolean (nullable = true)
 |-- review_wish: string (nullable = true)
 |-- ArrayAddResult: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- excepted_age_array: string (nullable = tru

# result_df 데이터 정합성 및 검증 작업

In [None]:
# # 특정 user_id, ad_name으로 필터링
# filtered_result_df = result_df.filter(
#     (F.col("user_id") == "EyDwTDHLROSqn1aM6AhgvAWEvTs2") & 
#     (F.col("ad_name") == "5,000원 로봇청소기")
# )
# filtered_result_df.show()
# # 여기서도 중복이 발생 중

In [35]:
# 'ArrayDate' 배열의 각 요소를 별도의 행으로 변환
exploded_df = result_df.select(explode(col("ArrayDate")).alias("date_string"), col("collection"))

# '23.07.'로 시작하는 날짜 데이터 및 'collection'이 'point'인 조건으로 필터링
july_2023_df = exploded_df.filter((col("date_string").startswith("23.07.")) & (col("collection") == "Point"))

# 필터링된 데이터의 개수 계산
july_2023_count = july_2023_df.count()

# 결과 출력
print(july_2023_count)

408577


In [39]:
# 'ArrayDate' 배열의 각 요소와 'ArrayPercent' 배열의 마지막 요소를 별도의 행으로 변환
exploded_df = result_df.select(
    explode(col("ArrayDate")).alias("date_string"),
    element_at(col("ArrayPercent"), -1).alias("last_percent"),
    col("collection")
)

# '23.07.'로 시작하는 날짜 데이터, 'collection'이 'Point', 'ArrayPercent'의 마지막 요소가 80 이상인 조건으로 필터링
filtered_df = exploded_df.filter(
    (col("date_string").startswith("23.07.")) &
    (col("collection") == "Point") &
    (col("last_percent").cast("int") >= 80)
)

# 필터링된 데이터의 개수 계산
filtered_count = filtered_df.count()

# 결과 출력
print(filtered_count)

362147


## df_ad와 result_df_users의 name값의 고유 개수 및 일치 비율 체크

In [None]:
# # 'name' 컬럼의 unique한 값들을 추출
# unique_names_df_ad = df_ad.select("name").distinct()
# unique_names_result_df_users = result_df_users.select("name").distinct()

# # 두 데이터프레임에서 'name' 컬럼을 기반으로 inner join을 수행
# common_names = unique_names_df_ad.join(unique_names_result_df_users, "name")

# # 일치하는 'name' 값의 수를 계산
# count_common_names = common_names.count()

# # 각 데이터프레임에서 unique한 'name' 값의 수를 계산
# count_unique_names_df_ad = unique_names_df_ad.count()
# count_unique_names_result_df_users = unique_names_result_df_users.count()

# # 일치 비율 계산
# match_ratio = (count_common_names / (count_unique_names_df_ad + count_unique_names_result_df_users - count_common_names)) * 100

# print(f"일치하는 이름의 수: {count_common_names}")
# print(f"df_ad의 고유한 이름 수: {count_unique_names_df_ad}")
# print(f"result_df_users의 고유한 이름 수: {count_unique_names_result_df_users}")
# print(f"일치 비율: {match_ratio}%")

-backup 데이터 추가 이전-  
일치하는 이름의 수: 5914  
df_ad의 고유한 이름 수: 6471  
result_df_users의 고유한 이름 수: 18607  
일치 비율: 30.859945731580048%  

-backup 데이터 추가 이후-  
일치하는 이름의 수: 16494  
df_ad의 고유한 이름 수: 20248  
result_df_users의 고유한 이름 수: 18607  
일치 비율: 73.7623540986539%

-backup 231019 데이터 추가 이후-  
일치하는 이름의 수: 25767
df_ad의 고유한 이름 수: 26849
result_df_users의 고유한 이름 수: 27117
일치 비율: 91.37558069435087%

## df_ad에서 find_name을 포함하는 이름을 가진 행들을 찾아서 출력

In [None]:
# find_name = "위크나인 벌룬독 성인 대형 물놀이 튜브 해수욕장 계곡 감성튜브"

# # find_name 문자열을 포함하는 행만 필터링 -> 필요한 데이터만 필터링해 메모리 부담을 줄임
# filtered_df_ad = df_ad.filter(df_ad.name.contains(find_name))

# # 필터링된 데이터만 collect() 호출
# filtered_names = [row.name for row in filtered_df_ad.collect()]

# if filtered_names:
#     print(f'"{find_name}" 문자열을 포함한 이름을 가진 행이 {len(filtered_names)}개 있습니다.')
#     print("포함된 name들:")
#     for name in filtered_names:
#         print(name)
# else:
#     print(f'"{find_name}" 문자열을 포함한 이름을 가진 행이 없습니다.')

## result_df_users에만 있는 'name'의 값이 df_ad에도 있는 지 체크

In [None]:
# # result_df_users에만 있는 'name'을 찾습니다.
# unique_names_only_in_result_df_users = unique_names_result_df_users.join(
#     unique_names_df_ad, ["name"], "left_anti"
# )

# # 일치하지 않는 'name' 몇 개를 선택합니다. (예시에서는 최대 5개)
# sample_unmatched_names = unique_names_only_in_result_df_users.limit(5).collect()

# # 선택한 'name'이 df_ad에 있는지 확인합니다.
# for row in sample_unmatched_names:
#     name_to_check = row.name
#     count_in_df_ad = df_ad.filter(df_ad.name == name_to_check).count()
    
#     if count_in_df_ad > 0:
#         print(f"'{name_to_check}' is in df_ad but was listed as unique to result_df_users.")
#     else:
#         print(f"'{name_to_check}' is truly unique to result_df_users.")

## result_df의 특정 컬럼의 특정 값을 가진 행을 찾아 특정 컬럼의 값을 도출

In [None]:
# # Find rows where 'name' matches the specified string
# find_name = "위크나인 벌룬독 성인 대형 물놀이 튜브 해수욕장 계곡 감성튜브 06845"
# filtered_df_result = result_df.filter(result_df.name == find_name)

# # Collect the 'collection' value(s) for the matched row(s)
# collection_values = [row.collection for row in filtered_df_result.collect()]

# # Display the 'ad_duration' value(s)
# print(collection_values)

## 'result_df_users'에만 존재하는 'name' 컬럼 값들
### 에 대한 확인
해당 name값들이 df_ad에 있는지 없는지 출력

In [None]:
# # 'result_df_users'에만 존재하는 'name' 컬럼 값 찾기
# only_in_result_df_users = unique_names_result_df_users.join(unique_names_df_ad, "name", "left_anti")

# # 'only_in_result_df_users' 데이터프레임에서 고유한 이름들을 추출합니다.
# unique_names_to_check = [row.name for row in only_in_result_df_users.collect()]

# # 이 이름들이 df_ad에 실제로 존재하는지 확인합니다.
# found_names_in_df_ad = df_ad.filter(df_ad.name.isin(unique_names_to_check)).collect()

# if found_names_in_df_ad:
#     print("다음 이름들은 df_ad에 실제로 존재합니다:")
#     for row in found_names_in_df_ad:
#         print(row.name)
# else:
#     print("어떤 이름도 df_ad에 존재하지 않습니다.")
    
# # 두 DataFrame에 모두 존재하는 이름을 찾기 위해 inner join을 사용
# common_names = only_in_result_df_users.join(df_ad, ["name"], "inner")

# print(f"result_df_users'에만 있는 name의 수: {only_in_result_df_users.count()}")
# print(f"common_names의 수: {common_names.count()}")

## result_df에서 'result_df_users'에만 있고 'df_ad'에는 없는 이름을 가진 행의 개수

In [None]:
# # unique_names_only_in_result_df_users DataFrame에서 'name' 컬럼을 추출합니다.
# unique_names_list = [row.name for row in unique_names_only_in_result_df_users.collect()]

# # result_df에서 이 이름들을 가진 행을 필터링합니다.
# filtered_result_df = result_df.filter(result_df.name.isin(unique_names_list))

# # 필터링된 DataFrame의 행 개수를 센다.
# count_of_filtered_rows = filtered_result_df.count()

# # 행의 개수를 출력한다.
# print(f"result_df에서 'result_df_users'에만 있고 'df_ad'에는 없는 이름을 가진 행의 개수: {count_of_filtered_rows}")

230906일/   
result_df에서 'result_df_users'에만 있고 'df_ad'에는 없는 이름을 가진 행의 개수: 245088  
245088이 전체 행에서 차지하는 비율이 3.6%이기에 일단 패싱하기로 했다

## df_ad의 가장 이른 시기의 값 & 가장 늦은 시기의 값

In [None]:
# # 가장 이른 시기의 값
# earliest_duration_end = df_ad_renamed.agg(F.min("ad_duration_end")).collect()[0][0]

# # 가장 늦은 시기의 값
# latest_duration_end = df_ad_renamed.agg(F.max("ad_duration_end")).collect()[0][0]

# print(f"The earliest ad_duration_end is: {earliest_duration_end}")
# print(f"The latest ad_duration_end is: {latest_duration_end}")


In [None]:
# # collection 값이 'Memorization'이고 name의 뒤 5자리가 숫자가 아닌 행을 필터링
# memorization_with_non_numeric_suffix = df_ad.filter(
#     (df_ad['collection'] == 'Memorization') & 
#     (F.substring(df_ad['name'], -5, 5).cast("int").isNull())
# )

# # 해당 행의 'name' 값만 선택
# memorization_names = memorization_with_non_numeric_suffix.select('name').collect()

# # 생략 없이 전체 'name' 값을 출력
# for row in memorization_names:
#     print(row['name'])

## 특정 날짜 이후에 행의 개수 출력

In [None]:
# count_rows = df_ad_renamed.filter(F.col("ad_duration_end") >= "25.09.13").count()
# print(count_rows)

## result_df 특정 컬럼에 값이 있는 행의 개수와 없는 행의 개수 출력

In [41]:
# # collection 값이 있는 행의 개수
# collection_present_count = result_df.filter(result_df.collection.isNotNull()).count()

# # collection 값이 없는 행의 개수
# collection_absent_count = result_df.filter(result_df.collection.isNull()).count()

# print(f"collection 값이 있는 행의 개수: {collection_present_count}")
# print(f"collection 값이 없는 행의 개수: {collection_absent_count}")

collection 값이 있는 행의 개수: 15406762
collection 값이 없는 행의 개수: 343575


- 2023-09-18 -
collection 값이 있는 행의 개수: 6746283  
collection 값이 없는 행의 개수: 242210
- 2023-10-19 -
collection 값이 있는 행의 개수: 14045696
collection 값이 없는 행의 개수: 166746

In [None]:
# # collection 컬럼이 "Memorization"인 행의 개수를 계산합니다.
# memorization_count = result_df.filter(result_df.collection == "Memorization").count()

# # collection 컬럼이 "Point"인 행의 개수를 계산합니다.
# point_count = result_df.filter(result_df.collection == "Point").count()

# # collection 컬럼이 "Memorization" 또는 "Point"가 아닌 행들을 그룹화하고 개수를 계산합니다.
# # 결과는 개수가 많은 순으로 정렬됩니다.
# other_collections = result_df.filter((result_df.collection != "Memorization") & (result_df.collection != "Point")) \
#     .groupBy("collection").count().orderBy(F.desc("count"))

# print(f"'Memorization'의 개수: {memorization_count}")
# print(f"'Point'의 개수: {point_count}")

# # 다른 collection 값들과 그 개수를 출력합니다.
# other_collections.show()

'Memorization'의 개수: 4876147  
'Point'의 개수: 1878464  
+----------+-----+  
|collection|count|  
+----------+-----+  
+----------+-----+  

In [None]:
# # 콜렉션 값이 없는 행들이 어떤 특성을 가지고 있는지 대략적으로 파악
# result_df.filter(result_df.collection.isNull()).describe().show()

+-------+------------------------------+--------------------+--------------------------+-------------------------+-------------------------+------------------+--------------------------+------------------+----------+------------------+------+----------+--------+------------------+------------------+----------+-----------+---------------+-----------------+------------+---------------+-----+-------+-----------------+--------+------+-----------------+-----------------+--------------------------------+----------------------+------------------------+------------------+------------------+-----------+-------------------+------------------+-------------------+
|summary|                       ad_name|             user_id|                    attend|                is_passed|                 imageUrl|         ad_script|                  ad_title|        birth_year|local_code|           is_test|gender|       job|language|           perfect|excepted_age_array|collection|ad_duration|ad_duration_end|desc_images_array|images_array|thumbnail_image|level|ad_link|participant_count|videoUrl|script|   accuracy_array|   stt_text_array|         created_timestamp_array|average_accuracy_by_ad|average_accuracy_by_user| pass_rate_by_user|   pass_rate_by_ad|text_length|          try_count|    user_avg_tries|       ad_avg_tries|
+-------+------------------------------+--------------------+--------------------------+-------------------------+-------------------------+------------------+--------------------------+------------------+----------+------------------+------+----------+--------+------------------+------------------+----------+-----------+---------------+-----------------+------------+---------------+-----+-------+-----------------+--------+------+-----------------+-----------------+--------------------------------+----------------------+------------------------+------------------+------------------+-----------+-------------------+------------------+-------------------+
|  count|                        242209|              242210|                    242209|                   242209|                   242209|            241671|                    242209|            237045|    237032|              3664|237045|    233255|  236900|            215115|                 0|         0|          0|              0|                0|           0|              0|    0|      0|                0|       0|     0|           242210|           242210|                          242210|                160420|                  241978|            238215|            197426|          0|             242210|            242210|             242209|
|   mean|             5.453968253968254|                null|         1.142196889286341|        2.717853839037928|       3.9800995024875623|206.48684210526315|        261.48979591836735|  73.3341897108144|      null|0.0679585152838428|  null|      null|    null| 1794.871650047649|              null|      null|       null|           null|             null|        null|           null| null|   null|             null|    null|  null|98.70958986529398|3714.222222222222|               91.69694017877607|     91.98540831320045|       88.39153672906248| 97.91226707777273| 96.89807826729773|       null| 1.0638990958259362|1.1423105014017387| 1.0639076169754156|
| stddev|             2.848668556845405|                null|        1.3438310218056957|       1.3105502876253814|       2.4897668285433285| 2800.605587985752|        317.13683506561875|22.163760471198675|      null|0.2517090530036565|  null|      null|    null|1989.1856077822001|              null|      null|       null|           null|             null|        null|           null| null|   null|             null|    null|  null|8.089708582091447|11107.06110744172|               6.709011626670002|     7.007509856774907|       7.186478274386401|4.5908543636958195|12.099177539050835|       null|0.47101817509175314|0.1709641598865054|0.21101261436997873|
|    min|   ""100개인 케이크 유 오케...|00fntnOCsqhJdmwJN...| ""10 a 미니 케이크 유 ...| ""10 min 케이크 유 오...| ""100 a 민 케이크 유 ...|                  | ""100 a 미니 케이크 유...|                 0|        Ch|                 0|     F|개인사업가|대한민국|               0.0|              null|      null|       null|           null|             null|        null|           null| null|   null|             null|    null|  null|                 |                 |                                |                   0.0|                     0.0|10.126582278481013|0.5263157894736842|       null|                 -1|               1.0|                1.0|
|    max|힘가네 흑염소진액 1박스(100...|zzpUGZW1lwbYyP9jv...|                       9.0|                     pass|                     pass|    힌지보호케이스|                현대자동차|                99|        Se|                 1|     W|    회사원|대한민국|            6333.0|              null|      null|       null|           null|             null|        null|           null| null|   null|             null|    null|  null|            99,99|     힙합,입 호흡|회의 민 케이크 유 오케이 하이...|                 100.0|                   100.0| 99.96978851963746| 99.93084370677732|       null|                 69|19.705882352941178|               50.0|
+-------+------------------------------+--------------------+--------------------------+-------------------------+-------------------------+------------------+--------------------------+------------------+----------+------------------+------+----------+--------+------------------+------------------+----------+-----------+---------------+-----------------+------------+---------------+-----+-------+-----------------+--------+------+-----------------+-----------------+--------------------------------+----------------------+------------------------+------------------+------------------+-----------+-------------------+------------------+-------------------+

In [None]:
# # thumbnail_image 값이 있는 행의 개수
# collection_present_count = result_df.filter(result_df.thumbnail_image.isNotNull()).count()

# # thumbnail_image 값이 없는 행의 개수
# collection_absent_count = result_df.filter(result_df.thumbnail_image.isNull()).count()

# print(f"collection 값이 있는 행의 개수: {collection_present_count}")
# print(f"collection 값이 없는 행의 개수: {collection_absent_count}")

In [None]:
# # result_df_users 데이터프레임에서 "name" 컬럼에 결측값이 있는 행의 수를 센다
# null_count_result_df_users = result_df_users.filter(result_df_users.name.isNull()).count()

# # df_ad 데이터프레임에서 "name" 컬럼에 결측값이 있는 행의 수를 센다
# null_count_df_ad = df_ad.filter(df_ad.name.isNull()).count()

# print(f'"result_df_users"의 "name" 컬럼에서 결측값 개수: {null_count_result_df_users}')
# print(f'"df_ad"의 "name" 컬럼에서 결측값 개수: {null_count_df_ad}')

## result_df의 addText와 script값 비교

In [None]:
# 나중에 문자열 유사도나 통계적 방법론을 통해 비교

# result_df 변경 작업

## Column 명을 더 적절하게 변경하는 작업

In [41]:
# 칼럼명 변환
result_df = result_df.withColumnRenamed("ArrayVoice", "stt_text_array")\
                     .withColumnRenamed("ArrayPercent", "accuracy_array")\
                     .withColumnRenamed("ArrayDate", "created_timestamp_array")\
                     .withColumnRenamed("name", "ad_name")\
                     .withColumnRenamed("title", "ad_title")\
                     .withColumnRenamed("resultPass", "is_passed")\
                     .withColumnRenamed("addText", "ad_script")

## 쓸모없는 컬럼들 drop

In [42]:
result_df = result_df.drop(
    "attend",
    "imageUrl",
    "type",
    "review_wish",
    "ArrayAddResult",
    "ad_duration_end",
    "desc_images_array",
    "images_array",
    "thumbnail_image",
    "videoUrl",
    "script"
)

In [43]:
result_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_passed: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)



## 데이터 타입들을 더 적절한 타입으로 변경하는 작업

231019 이후로는 parquet으로 attend 데이터를 받기에 string -> array 작업이 필요없음,  
기존에 해당 작업이 필요했던 건 JSON -> CSV 과정에서 array들이 ""로 감싸지면서 string이 됐기 때문  

In [None]:
# # substring 함수를 사용하여 앞뒤의 대괄호를 제거
# # 예: "['100', '80']" -> "'100', '80'"
# # 그 다음 split 함수를 사용하여 문자열을 배열로 변환
# # Spark SQL에서는 문자열 인덱스가 1부터 시작, 그래서 substring의 2는 앞뒤의 대괄호를 제거하는 역할
# # 배열의 각 원소 앞뒤에 붙어 있는 따옴표(')를 제거하고, 원소들을 DoubleType으로 형변환
# # TRANSFORM 함수는 배열의 각 원소에 함수를 적용
# def transform_string_to_array(df, column_name, cast_type):
#     """
#     Array* 형태의 컬럼들의 데이터 타입을 string에서 array안 '특정한 데이터 타입'으로 변경하는 함수
    
#     Parameters:
#         df: 변경할 dataframe
#         column_name: 변경할 Array* 형태의 컬럼명(데이터 앞뒤가 "'로 작성된)
#         cast_type: 바꿀 특정한 데이터 타입
        
#     Returns:
#         해당 컬럼들의 데이터 타입이 변경된 df
#     """
#     new_column_name = f"{column_name}_new"
#     df = df.withColumn(
#         new_column_name, 
#         F.split(F.expr(f"substring({column_name}, 2, length({column_name})-2)"), "', '")
#     ).withColumn(
#         new_column_name,
#         F.expr(f"TRANSFORM({new_column_name}, x -> CAST(trim(BOTH '\\'' FROM x) AS {cast_type}))")
#     )
#     return df.drop(column_name).withColumnRenamed(new_column_name, column_name)

# # 변환할 컬럼과 데이터 타입을 지정
# columns_to_transform = {
#     'ArrayPercent': 'DOUBLE',
#     'ArrayVoice': 'STRING',
#     'ArrayDate': 'STRING'
# }

# # 각 컬럼을 순회하며 변환 수행
# for col, cast_type in columns_to_transform.items():
#     result_df = transform_string_to_array(result_df, col, cast_type)

In [44]:
# UDF(User Defined Function) 정의: 문자열 배열을 정수 배열로 변환
def string_array_to_int_array(str_arr):
    return [int(float(x)) for x in str_arr]  # 문자열을 먼저 실수로 변환한 후 정수로 변환

string_array_to_int_array_udf = udf(string_array_to_int_array, ArrayType(IntegerType()))

# 'accuracy_array' 컬럼의 데이터 타입을 string에서 int로 변경
result_df = result_df.withColumn("accuracy_array", string_array_to_int_array_udf("accuracy_array"))

## 바뀐 result_df의 column, schema 출력

In [45]:
print(result_df.columns)
result_df.printSchema()
# print(result_df.select("created_timestamp_array").show(20))

['user_id', 'stt_text_array', 'is_passed', 'ad_script', 'ad_name', 'accuracy_array', 'created_timestamp_array', 'ad_title', 'job', 'language', 'perfect', 'gender', 'birth_year', 'local_code', 'excepted_age_array', 'collection', 'ad_duration', 'level', 'ad_link', 'participant_count']
root
 |-- user_id: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_passed: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nul

- 231019 이전 -
['user_id', 'attend', 'is_passed', 'imageUrl', 'ad_script', 'ad_name', 'ad_title', 'birth_year', 'local_code', 'is_test', 'gender', 'job', 'language', 'perfect', 'excepted_age_array', 'collection', 'ad_duration', 'ad_duration_end', 'desc_images_array', 'images_array', 'thumbnail_image', 'level', 'ad_link', 'participant_count', 'videoUrl', 'script', 'accuracy_array', 'stt_text_array', 'created_timestamp_array']
root
 |-- user_id: string (nullable = true)
 |-- attend: string (nullable = true)
 |-- is_passed: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- ad_title: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- is_test: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- ad_duration_end: string (nullable = true)
 |-- desc_images_array: string (nullable = true)
 |-- images_array: string (nullable = true)
 |-- thumbnail_image: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)
 |-- videoUrl: string (nullable = true)
 |-- script: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 
 - 23-10-19 - 
 ['user_id', 'stt_text_array', 'attend', 'is_passed', 'imageUrl', 'ad_script', 'ad_name', 'accuracy_array', 'created_timestamp_array', 'ad_title', 'birth_year', 'local_code', 'is_test', 'gender', 'job', 'language', 'perfect', 'excepted_age_array', 'collection', 'ad_duration', 'ad_duration_end', 'desc_images_array', 'images_array', 'thumbnail_image', 'level', 'ad_link', 'participant_count', 'videoUrl', 'script']
root
 |-- user_id: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- attend: double (nullable = true)  <- string에서
 |-- is_passed: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- is_test: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- ad_duration_end: string (nullable = true)
 |-- desc_images_array: string (nullable = true)
 |-- images_array: string (nullable = true)
 |-- thumbnail_image: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)
 |-- videoUrl: string (nullable = true)
 |-- script: string (nullable = true)

## collection 컬럼에 값이 Point, Memorization이 아닌 행들 delete

In [46]:
# "Memorization" 또는 "Point"가 아닌 행들을 필터링하여 제거합니다.
result_df = result_df.filter((result_df.collection == "Memorization") | (result_df.collection == "Point"))

In [55]:
# # 필터링된 DataFrame의 행 개수를 확인합니다. (옵션)
# filtered_count = result_df.count()

# print(f"필터링 후 행의 개수: {filtered_count}")

필터링 후 행의 개수: 15406762


## 특성 엔지니어링 관련

### 광고/유저 별 평균 정확도

#### 광고/유저 별 평균 정확도 계산 및 관련 컬럼 추가

In [47]:
# 필요한 컬럼만 선택
selected_df = result_df.select("ad_name", "accuracy_array", "user_id")
# print(f"selected_df 수:{selected_df.count()}")

# accuracy_array Null 및 빈 배열 제거
filtered_df = selected_df.filter(
    (F.col("accuracy_array").isNotNull()) & (F.size(F.col("accuracy_array")) > 0)
)
# print(f"filtered_df 수:{filtered_df.count()}")

# 중복 확인
deduped_df = filtered_df.dropDuplicates(["ad_name", "user_id", "accuracy_array"])
# print(f"deduped_df 수:{deduped_df.count()}")

# accuracy_array를 explode하여 각 원소를 별도의 행으로 만듭니다.
exploded_df = deduped_df.withColumn("accuracy", F.explode(F.col("accuracy_array")))

# 이제 ad_name 별로 accuracy의 평균을 계산합니다.
df_average_by_ad = exploded_df.groupBy("ad_name").agg(F.avg("accuracy").alias("average_accuracy_by_ad"))

# 이제 user_id 별로 accuracy의 평균을 계산합니다.
df_average_by_user = exploded_df.groupBy("user_id").agg(F.avg("accuracy").alias("average_accuracy_by_user"))

# # 결과 확인
# df_average_by_ad.show()
# df_average_by_user.show()

#### 광고/유저 별 평균 정확도가 잘 계산되었는지 확인

In [None]:
# df_ad.printSchema()
# df_users.printSchema()

In [None]:
# # df_ad에서 고유한 ad_name의 개수를 확인
# unique_ad_names_count = df_ad.select("name").distinct().count()

# # df_average_by_ad 행 개수를 확인
# average_by_ad_count = df_average_by_ad.count()

# # df_users에서 고유한 user_id의 개수를 확인
# unique_user_ids_count = df_users.select("user_id").distinct().count()

# # df_average_by_user의 행 개수를 확인
# average_by_user_count = df_average_by_user.count()

# print(f"df_ad에서 고유한 ad_name의 개수: {unique_ad_names_count}")
# print(f"df_average_by_ad 행 개수: {average_by_ad_count}")
# print(f"df_users에서 고유한 user_id의 개수: {unique_user_ids_count}")
# print(f"df_average_by_user의 행 개수: {average_by_user_count}")

윗 셀 출력 230908:  
df_ad에서 고유한 ad_name의 개수: 20248  
df_average_by_ad 행 개수: 18606  
df_users에서 고유한 user_id의 개수: 34539  
df_average_by_user의 행 개수: 25587  

위 셀의 출력을 통해 230908에 추가한 유저백업데이터로 인한 정합성 강화 확인    
-230908에 추가한 기준-    
df_ad에서 고유한 ad_name의 개수: 20248  
df_average_by_ad 행 개수: 18606  
df_users에서 고유한 user_id의 개수: 34539  
df_average_by_user의 행 개수: 25587  

#### 광고/유저 별 평균 정확도 컬럼을 result_df에 추가

In [48]:
# # JOIN 으로
# # 광고 별 평균 정확도를 result_df에 추가
# result_df = result_df.join(df_average_by_ad, result_df.ad_name == df_average_by_ad.ad_name, "left").drop(df_average_by_ad.ad_name)
# # 유저 별 평균 정확도를 result_df에 추가
# result_df = result_df.join(df_average_by_user, result_df.user_id == df_average_by_user.user_id, "left").drop(df_average_by_user.user_id)


# broadcasting으로
# 광고 별 평균 정확도를 result_df에 브로드캐스팅을 사용하여 추가
result_df = result_df.join(broadcast(df_average_by_ad), ["ad_name"], "left")
# 유저 별 평균 정확도를 result_df에 브로드캐스팅을 사용하여 추가
result_df = result_df.join(broadcast(df_average_by_user), ["user_id"], "left")


# 결과 데이터프레임 확인 (스키마 출력)
result_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_passed: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)
 |-- aver

### 광고 별 / 유저 별 합격률 column 추가

#### is_passed 컬럼에 어떤 값들이 들어있는지, 각각의 비율은 얼만지 확인

In [None]:
# # is_passed 컬럼에 들어있는 고유한 값들을 확인
# unique_is_passed_values = result_df.select("is_passed").distinct()
# unique_is_passed_values.show()

In [49]:
# 'is_passed' 컬럼에서 'fail'과 'pass'에 해당하는 행만 필터링
filtered_is_passed_df = result_df.filter(F.col("is_passed").isin(["fail", "pass"]))

# # 'fail'과 'pass'에 해당하는 행의 수를 각각 계산
# fail_count = filtered_is_passed_df.filter(F.col("is_passed") == "fail").count()
# pass_count = filtered_is_passed_df.filter(F.col("is_passed") == "pass").count()

# # 전체 행의 수를 계산
# total_count_is_passed = pass_count + fail_count

# # 'fail'과 'pass'가 전체에서 차지하는 비율을 계산
# fail_percentage = (fail_count / total_count_is_passed) * 100
# pass_percentage = (pass_count / total_count_is_passed) * 100

# print(f"'fail' 비율: {fail_percentage}%")
# print(f"'pass' 비율: {pass_percentage}%")
# 230908 출력
# 'fail' 비율: 1.5755910504026127%
# 'pass' 비율: 98.42440894959739%

#### 유저 별 / 광고 별 합격률 column 생성 및 추가

In [50]:
# # GroupBy 사용
# # 유저별 합격률 계산
# user_pass_rate = filtered_is_passed_df.groupBy("user_id", "is_passed").count() \
#     .groupBy("user_id").pivot("is_passed").sum("count") \
#     .withColumn("pass_rate_by_user", (F.col("pass") / (F.col("pass") + F.col("fail"))) * 100)
# # 광고별 합격률 계산
# ad_pass_rate = filtered_is_passed_df.groupBy("ad_name", "is_passed").count() \
#     .groupBy("ad_name").pivot("is_passed").sum("count") \
#     .withColumn("pass_rate_by_ad", (F.col("pass") / (F.col("pass") + F.col("fail"))) * 100)


# Window 함수 사용
# 유저별 합격률 계산
window_by_user = Window.partitionBy("user_id")
user_pass_rate = filtered_is_passed_df.withColumn("total_by_user", F.count("is_passed").over(window_by_user)) \
    .withColumn("pass_count", F.sum(F.when(F.col("is_passed") == 'pass', 1).otherwise(0)).over(window_by_user)) \
    .withColumn("pass_rate_by_user", (F.col("pass_count") / F.col("total_by_user")) * 100) \
    .dropDuplicates(["user_id"])
# 광고별 합격률 계산
window_by_ad = Window.partitionBy("ad_name")
ad_pass_rate = filtered_is_passed_df.withColumn("total_by_ad", F.count("is_passed").over(window_by_ad)) \
    .withColumn("pass_count", F.sum(F.when(F.col("is_passed") == 'pass', 1).otherwise(0)).over(window_by_ad)) \
    .withColumn("pass_rate_by_ad", (F.col("pass_count") / F.col("total_by_ad")) * 100) \
    .dropDuplicates(["ad_name"])


# broadcasting 사용
# 유저별 합격률을 result_df에 브로드캐스팅으로 추가
result_df = result_df.join(broadcast(user_pass_rate.select("user_id", "pass_rate_by_user")), ["user_id"], "left")
# 광고별 합격률을 result_df에 브로드캐스팅으로 추가
result_df = result_df.join(broadcast(ad_pass_rate.select("ad_name", "pass_rate_by_ad")), ["ad_name"], "left")

# # Join 사용
# # 유저별 합격률을 result_df에 추가
# result_df = result_df.join(user_pass_rate, ["user_id"], "left")
# # 광고별 합격률을 result_df에 추가
# result_df = result_df.join(ad_pass_rate, ["ad_name"], "left")

In [51]:
user_pass_rate.unpersist()
ad_pass_rate.unpersist()

DataFrame[user_id: string, ad_name: string, stt_text_array: array<string>, is_passed: string, ad_script: string, accuracy_array: array<int>, created_timestamp_array: array<string>, ad_title: string, job: string, language: string, perfect: double, gender: string, birth_year: string, local_code: string, excepted_age_array: string, collection: string, ad_duration: string, level: string, ad_link: string, participant_count: int, average_accuracy_by_ad: double, average_accuracy_by_user: double, total_by_ad: bigint, pass_count: bigint, pass_rate_by_ad: double]

In [52]:
result_df = result_df.drop("fail", "pass")

In [None]:
# # 결과 확인
# user_pass_rate.show()
# ad_pass_rate.show()

In [53]:
# 결과 데이터프레임 확인 (스키마 출력)
result_df.printSchema()
# result_df.count()

root
 |-- ad_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_passed: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)
 |-- aver

### 광고별 텍스트 길이 column 추가

In [None]:
# # 이전 추가 코드, 메모리 사용량 및 코드 최적화 전


# # UDF은 메모리 사용량이 클 수 있다고 해서 Spark 내장 함수로 바꾸었다
# # PySpark에서 사용자 정의 함수(User Defined Function, UDF)를 정의할 때 사용하는 데코레이터
# @F.udf(IntegerType())   # 파라미터는 반환되는 데이터 타입
# def count_text_length(text):
#     if text:
#         # 특수 문자를 제거합니다. text에서 영문자, 숫자, 공백을 제외한 모든 문자를 제거
#         return len(re.sub(r'[^\w\s]', '', text))
#     else:
#         return None

# # 'ad_name'과 'ad_script'를 그룹화하고, 각 그룹의 개수를 카운트
# grouped_ad_texts = result_df.groupBy('ad_name', 'ad_script').count()
# # print(f"grouped_ad_texts: {grouped_ad_texts.count()}")

# # 'ad_name' 별로 가장 빈번한 'ad_script'를 선택
# windowSpec = Window.partitionBy('ad_name').orderBy(F.desc('count'))
# most_frequent_ad_texts = grouped_ad_texts.withColumn("rank", F.rank().over(windowSpec)).filter("rank = 1").drop('count', 'rank')
# # print(f"most_frequent_ad_texts: {most_frequent_ad_texts.count()}")

# # count가 2 이상인 ad_name만 뽑아서 리스트에 넣기
# ad_names_with_multiple_texts_list = [row['ad_name'] for row in most_frequent_ad_texts.filter(F.col('count') >= 2).select('ad_name').collect()]
# # print(f"script 개수가 2 이상인 ad_name 개수: {len(ad_names_with_multiple_texts_list)}")

# # 특수 문자를 제거하고, 그 결과의 길이를 계산
# most_frequent_ad_texts = most_frequent_ad_texts.withColumn("text_length", count_text_length(F.col("ad_script")))
# # # 특수 문자를 제거하고, 그 결과의 길이를 계산
# # most_frequent_ad_texts = most_frequent_ad_texts.withColumn("cleaned_script", F.regexp_replace(F.col("script"), "[^a-zA-Z0-9\\s]", ""))
# # most_frequent_ad_texts = most_frequent_ad_texts.withColumn("text_length", F.length(F.col("cleaned_script")))


# # 원래 데이터프레임에 조인
# result_df = result_df.join(most_frequent_ad_texts.select('ad_name', 'text_length'), ['ad_name'], 'left')

In [54]:
# 'ad_name'과 'ad_script'를 그룹화하고, 각 그룹의 개수를 카운트
grouped_ad_texts = result_df.groupBy('ad_name', 'ad_script').count()

# 'ad_name' 별로 가장 빈번한 'ad_script'를 선택
windowSpec = Window.partitionBy('ad_name').orderBy(F.desc('count'))
most_frequent_ad_texts = grouped_ad_texts.withColumn("rank", F.rank().over(windowSpec)).filter("rank = 1").drop('count', 'rank')

# 특수문자 제거 및 길이 계산
most_frequent_ad_texts = most_frequent_ad_texts.withColumn("cleaned_script", F.regexp_replace(F.col("ad_script"), "[^\\s\\n.,\uAC00-\uD7A3a-zA-Z0-9]", ""))
most_frequent_ad_texts = most_frequent_ad_texts.withColumn("text_length", F.length(F.col("cleaned_script")))

# 원래 데이터프레임에 조인
# most_frequent_ad_texts가 상대적으로 작다고 했으므로 브로드캐스팅을 사용
result_df = result_df.join(broadcast(most_frequent_ad_texts.select('ad_name', 'text_length')), ['ad_name'], 'left')

In [None]:
# # ad_name 별로 그룹화하고, 각 그룹에서 addText의 유니크한 개수를 센다.
# grouped_df = df_attend.groupBy("name").agg(F.countDistinct("addText").alias("unique_addText_count"))

# # unique_addText_count가 2 이상인 ad_name만 필터링한다.
# filtered_df = grouped_df.filter(F.col("unique_addText_count") >= 2)

# # 필터링된 ad_name의 개수를 센다.
# count_of_ad_names_with_multiple_texts = filtered_df.count()

# print(f"addText 개수가 2 이상인 ad_name의 개수는 {count_of_ad_names_with_multiple_texts}개입니다.")

# # addText 개수가 2 이상인 ad_name들을 리스트로 만듭니다.
# ad_names_with_multiple_texts = [row.name for row in filtered_df.select("name").collect()]

# # addText 개수가 2 이상인 ad_name들에 대한 addText 샘플 10개를 출력합니다.
# for name in ad_names_with_multiple_texts[:10]:  # 처음 10개의 ad_name만 사용
#     sample_texts = df_attend.filter(F.col("name") == name).select("addText").distinct().limit(10).collect()
#     print(f"For ad_name = {name}, sample addTexts are:")
#     for row in sample_texts:
#         print("  -", row.addText)

#### 확인

In [56]:
# # script에 값이 있는 행의 개수
# count_script = result_df.filter(F.col("script").isNotNull()).count()

# # text_length에 값이 있는 행의 개수
# count_text_length = result_df.filter(F.col("text_length").isNotNull()).count()

# print(f"script에 값이 있는 행의 개수: {count_script}")
# print(f"text_length에 값이 있는 행의 개수: {count_text_length}")

# 결과 데이터프레임 확인 (스키마 출력)
result_df.printSchema()

root
 |-- ad_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_passed: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)
 |-- aver

### 광고 / 유저 별 평균 트라이 횟수

In [None]:
# # 최적화 이전 코드


# # accuracy_array 컬럼의 배열 크기를 계산하여 try_count 컬럼을 생성
# result_df = result_df.withColumn("try_count", size(result_df.accuracy_array))

# # 유저별 평균 트라이 횟수
# user_avg_tries = result_df.groupBy("user_id").agg(avg("try_count").alias("user_avg_tries"))

# # 광고별 평균 트라이 횟수
# ad_avg_tries = result_df.groupBy("ad_name").agg(avg("try_count").alias("ad_avg_tries"))

# # # 결과 출력 (옵션)
# # user_avg_tries.show()
# # ad_avg_tries.show()

# # user_avg_tries를 result_df에 조인
# result_df = result_df.join(user_avg_tries, on="user_id", how="left")

# # ad_avg_tries를 result_df에 조인
# result_df = result_df.join(ad_avg_tries, on="ad_name", how="left")

In [55]:
# 최적화 이후 코드

# accuracy_array 컬럼의 배열 크기를 계산하여 try_count 컬럼을 생성
result_df = result_df.withColumn("try_count", F.size(F.col("accuracy_array")))

# 유저별 평균 트라이 횟수를 계산하는 Window 함수
window_by_user = Window.partitionBy("user_id")
result_df = result_df.withColumn("user_avg_tries", F.avg("try_count").over(window_by_user))

# 광고별 평균 트라이 횟수를 계산하는 Window 함수
window_by_ad = Window.partitionBy("ad_name")
result_df = result_df.withColumn("ad_avg_tries", F.avg("try_count").over(window_by_ad))

#### 확인

In [56]:
# # user_avg_tries 값이 있는 행의 개수
# count_user_avg_tries = result_df.filter(F.col("user_avg_tries").isNotNull()).count()

# # ad_avg_tries 값이 있는 행의 개수
# count_ad_avg_tries = result_df.filter(F.col("ad_avg_tries").isNotNull()).count()

# print(f"user_avg_tries 값이 있는 행의 개수: {count_user_avg_tries}")
# print(f"count_ad_avg_tries 값이 있는 행의 개수: {count_ad_avg_tries}")

result_df.printSchema()

root
 |-- ad_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stt_text_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_passed: string (nullable = true)
 |-- ad_script: string (nullable = true)
 |-- accuracy_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- created_timestamp_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_title: string (nullable = true)
 |-- job: string (nullable = true)
 |-- language: string (nullable = true)
 |-- perfect: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- excepted_age_array: string (nullable = true)
 |-- collection: string (nullable = true)
 |-- ad_duration: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ad_link: string (nullable = true)
 |-- participant_count: integer (nullable = true)
 |-- aver

# result_df를 로컬에 저장

In [57]:
# os.environ['HADOOP_HOME'] = "C:\\Hadoop\\hadoop-3.3.6"
print(os.environ["HADOOP_HOME"])

C:\Hadoop\hadoop-3.3.6


In [None]:
# num_partitions = result_df.rdd.getNumPartitions()
# print(f"Current number of partitions: {num_partitions}")

Current number of partitions: 104

## Dataframe 데이터 타입 변환
pyspark.df -> csv 변환을 하기 위해서,  
CSV 파일은 string값만 허용하기 때문  
그 전에 accuracy_array 컬럼 내의 double 요소들을 int 타입으로 변환

In [None]:
# # ad_name과 user_id에 따라 특정 행을 필터링하고 출력하는 코드
# specific_row_rd = result_df.filter(
#     (F.col("ad_name") == "살어리랏다 살어리랏다 청산에 살어리랏다") &
#     (F.col("user_id") == "4a7gilnpClZ81pLlW1A5SdK5ymr2")
# )

# # 해당 행의 ad_script 컬럼값을 출력
# specific_row_rd.select("accuracy_array").show(truncate=False)

위의 출력이 array인지 아닌지 확인 -> []에 둘러싸인 array값이면 옳게 된 데이터 
ex) 잘못된 데이터  
+-----------------------------------------------------+  
|accuracy_array                                       |  
+-----------------------------------------------------+  
|63,66,69,63,67,69,74,69,77,66,66,65,68,75,73,56,76,72|  
+-----------------------------------------------------+  

In [None]:
# # parquet은 굳이 이런 변환 필요가 없으니 주석 처리함
# # accuracy_array 컬럼의 배열을 대괄호와 쉼표로 구분된 문자열로 변환
# result_df = result_df.withColumn("accuracy_array", F.format_string("[%s]", F.expr("array_join(accuracy_array, ',')")))
# # stt_text_array 컬럼을 문자열로 변환
# result_df = result_df.withColumn("stt_text_array", concat_ws(",", result_df.stt_text_array))
# # created_timestamp_array 컬럼을 문자열로 변환
# result_df = result_df.withColumn("created_timestamp_array", concat_ws(",", result_df.created_timestamp_array))

## 로컬에 저장하는 작업: Parquet 포맷 or CSV 포맷

In [None]:
# # 오늘의 날짜를 얻어 'yymmdd' 포맷으로 변환
# today = datetime.today()
# formatted_date = today.strftime('%y%m%d')

# # 저장할 경로 설정
# file_path = f"D:/DATA_PREPROCESS/FIRESTORE_DATAS/voice_metadata_parquet_{formatted_date}"

# # 파티션 수를 결정합니다.
# num_partitions = 300

# #-------------------------------------------------------------------------------
# # # Row 번호를 생성하여 데이터를 나눕니다.
# # window_spec = Window().orderBy(F.monotonically_increasing_id())
# # result_df = result_df.withColumn("row_num", F.row_number().over(window_spec))

# # # 각 파티션에 대해 데이터를 저장합니다.
# # for i in range(num_partitions):
# #     partition_df = result_df.filter((F.col("row_num") % num_partitions) == i)
# #     partition_file_path = f"{file_path}_partition_{i}.parquet"
# #     partition_df.write.parquet(partition_file_path, mode='overwrite')
# #-------------------------------------------------------------------------------


# # 데이터를 균등하게 분할합니다.
# repartitioned_df = result_df.repartition(num_partitions)

# # 각 파티션에 대해 데이터를 저장합니다.
# for i in range(num_partitions):
#     partition_df = repartitioned_df.filter(F.spark_partition_id() == i)
#     partition_file_path = f"{file_path}_partition_{i}.parquet"
#     partition_df.write.parquet(partition_file_path, mode='overwrite')

테스트할 땐 바로 아래 셀들을 주석 처리(로컬에 저장하는 코드)

In [60]:
# # 로컬에 저장하는 작업 전 메모리에서 다른 df들을 해제
# # 메모리에서 불필요한 DataFrame 해제
# # 메모리에서 불필요한 DataFrame 해제
# for df_name in [
#     'df_ad', 'df_ad_renamed', 'df_attend', 'df_attend_with_rownum',
#     'df_average_by_ad', 'df_average_by_user', 'df_externals', 'df_hdd',
#     'df_memo', 'df_memo_1_2', 'df_memo_backup', 'df_memo_backup_060900',
#     'df_memo_backup_060912', 'df_memo_backup_f1', 'df_memo_backup_f2',
#     'df_memo_backup_f3', 'df_memo_folder', 'df_point', 'df_point_1_2',
#     'df_point_backup', 'df_point_backup_f1', 'df_point_backup_f2',
#     'df_point_backup_f3', 'df_point_dropped', 'df_point_folder', 'df_ssd',
#     'df_user_backup', 'df_users', 'df_users_transformed'
# ]:
#     if df_name in locals():
#         eval(df_name).unpersist(blocking=True)

In [None]:
# # 스파크에서는 persist() 또는 cache() 함수를 통해 데이터셋을 메모리에 보관할 수 있음
# # 이 때 스토리지 레벨을 지정해 데이터의 저장 방식을 조정할 수 있음
# # MEMORY_AND_DISK_SER 레벨은 메모리가 부족하면 디스크에 저장
# # DataFrame을 RDD로 변환
# result_rdd = result_df.rdd

# # 스토리지 레벨 설정
# result_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

# # RDD를 다시 DataFrame으로 변환
# result_df = result_rdd.toDF()

In [None]:
# # Window 함수 사용하여 row_id 생성
# window_spec = Window.orderBy(F.lit('A'))  # 'A'는 임의의 상수. 순차적으로 ID를 부여하기 위한 방법
# result_df = result_df.withColumn("row_id", F.row_number().over(window_spec))

# # 전체 데이터를 몇 개의 파 티션으로 나눌 것인지 지정
# num_parts = 1038

# # randomSplit() 함수를 사용하여 데이터프레임 나누기
# weights = [1.0 / num_parts] * num_parts
# dfs = result_df.randomSplit(weights)

# # 오늘의 날짜를 얻어 'yymmdd' 포맷으로 변환
# today = datetime.today()
# formatted_date = today.strftime('%y%m%d')

# # 저장할 경로 설정
# base_path = f"D:/DATA_PREPROCESS/FIRESTORE_DATAS/voice_metadata_parquet_{formatted_date}"


# # 각 파티션 저장 후 메모리에서 해당 데이터프레임 제거
# for df_part in dfs:
#     # 데이터를 Parquet 형식으로 저장
#     df_part.write.mode('append').parquet(base_path)
#     df_part.unpersist()  # 해당 데이터프레임 메모리에서 제거

In [None]:
# dfs_count = len(dfs)
# print(dfs_count)

# # 첫 번째 dfs의 행 개수 확인
# first_dfs_count = dfs[0].count()
# print(first_dfs_count)

In [None]:
# print(rows_per_executor)
# print(num_parts)

In [58]:
# 오늘의 날짜를 얻어 'yymmdd' 포맷으로 변환
today = datetime.today()
formatted_date = today.strftime('%y%m%d')

# 저장할 경로 설정
file_path = f"D:/DATA_PREPROCESS/FIRESTORE_DATAS/voice_metadata_parquet_{formatted_date}"

#------------------------------------------------------------------------------------------------

# # 기존의 분할 안한 df의 저장 코드
# # 데이터를 Parquet 형식으로 저장, parquet파일 형식은 스키마를 이미 포함하고 있기에 header필요 X
# result_df.repartition(250).write.parquet(file_path, mode='overwrite')

#------------------------------------------------------------------------------------------------

# # dfs리스트로 result_df를 분할했을 때 저장 코드
# for index, df_part in enumerate(dfs):
#     # 데이터를 Parquet 형식으로 저장
#     # 동일한 base_path에 저장되며, 각 파티션은 별도의 파일로 저장됩니다.
#     df_part.write.mode('append').parquet(base_path)

#------------------------------------------------------------------------------------------------

# # 고유한 'ad_duration_end' 값 가져오기
# unique_ad_durations = result_df.select('ad_duration_end').distinct().collect()
# unique_ad_durations = [row.ad_duration_end for row in unique_ad_durations]

# # ad_duration_end를 기준으로 데이터 분할
# dfs = [result_df.filter(F.col('ad_duration_end') == duration) for duration in unique_ad_durations]

# # 오늘의 날짜와 경로 설정
# today = datetime.today()
# formatted_date = today.strftime('%y%m%d')
# base_path = f"D:/DATA_PREPROCESS/FIRESTORE_DATAS/voice_metadata_parquet_{formatted_date}"

# # 각 파티션을 저장
# for df_part in dfs:
#     df_part.write.mode('append').parquet(base_path)
#     df_part.unpersist(blocking=True)  # 메모리 즉시 해제

In [59]:
# 파티션 개수를 300개로 재조정하여 데이터를 나눕니다.
# 이렇게 하면 Spark는 데이터를 150개의 파티션으로 나누고, 각 파티션을 별도의 파일로 저장합니다.
result_df.repartition(400).write.parquet(file_path, mode='overwrite')

 3일째 지긋지긋한 sparkcontext 에러........  

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[104], line 3
      1 # 파티션 개수를 150개로 재조정하여 데이터를 나눕니다.
      2 # 이렇게 하면 Spark는 데이터를 150개의 파티션으로 나누고, 각 파티션을 별도의 파일로 저장합니다.
----> 3 result_df.repartition(150).write.parquet(file_path, mode='overwrite')

File ~\anaconda3\envs\my_conda_01\lib\site-packages\pyspark\sql\readwriter.py:1656, in DataFrameWriter.parquet(self, path, mode, partitionBy, compression)
   1654     self.partitionBy(partitionBy)
   1655 self._set_opts(compression=compression)
-> 1656 self._jwrite.parquet(path)

File ~\anaconda3\envs\my_conda_01\lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~\anaconda3\envs\my_conda_01\lib\site-packages\pyspark\errors\exceptions\captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File ~\anaconda3\envs\my_conda_01\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o7163.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:833)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:833)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:120)
	at org.apache.spark.SparkContext.submitMapStage(SparkContext.scala:2426)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:181)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:181)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:183)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:266)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:264)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:264)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)

In [None]:
# # Spark가 사용하고 있는 Python 실행 파일 확인
# print(spark.sparkContext._conf.get('spark.pyspark.python'))

# # RDD 생성
# rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# transformed_rdd = rdd.map(lambda x: x * 2)

# # Action: 평균을 계산
# mean_value = transformed_rdd.reduce(lambda a, b: a + b) / transformed_rdd.count()
# print(f"The mean value is: {mean_value}")

# 샘플 추출

## 랜덤 샘플 1만개 추출
- 암기 플러스와 포인트 벌기 5천개씩

In [None]:
# # DataFrame을 RDD로 변환
# rdd_memorization = result_df.filter(F.col("collection") == "Memorization").rdd
# rdd_point = result_df.filter(F.col("collection") == "Point").rdd

# # None 값을 포함하지 않는 행만 필터링
# filtered_rdd_memorization = rdd_memorization.filter(lambda row: all([x is not None for x in row]))
# filtered_rdd_point = rdd_point.filter(lambda row: all([x is not None for x in row]))

# # 정확한 개수의 샘플을 얻음
# sample_memorization = filtered_rdd_memorization.takeSample(False, 5000, seed=42)
# sample_point = filtered_rdd_point.takeSample(False, 5000, seed=42)

# # 샘플을 DataFrame으로 변환
# sample_df_memorization = spark.createDataFrame(sample_memorization)
# sample_df_point = spark.createDataFrame(sample_point)

# # 두 샘플을 합침
# final_sample_df = sample_df_memorization.union(sample_df_point)

위 코드 23분이나 실행시간이 걸림. 시간 단축 필요  
그래서 아래 코드 작성

In [None]:
Hey you! Stop it!

In [None]:
# 'collection'에 따라 데이터를 필터링
filtered_df_memorization = result_df.filter(F.col("collection") == "Memorization").dropna()
filtered_df_point = result_df.filter(F.col("collection") == "Point").dropna()

# 랜덤 샘플 추출 (샘플 크기는 대략적으로 지정)
sample_memorization = filtered_df_memorization.sample(False, 0.001, seed=42).limit(5000)
sample_point = filtered_df_point.sample(False, 0.01, seed=42).limit(5000)

# 두 샘플 합치기
final_sample_df = sample_memorization.union(sample_point)

1. None 값 필터링: DataFrame API를 사용하면 dropna() 메서드로 쉽게 None 값을 제거할 수 있다.  
2. DataFrame API의 sample() 메서드를 사용하면 간단하게 샘플을 추출  

## parquet or CSV 파일로 샘플 df 저장

In [None]:
sample_path = f"D:/DATA_PREPROCESS/FIRESTORE_DATAS/sample_voice_metadata_{formatted_date}"
final_sample_df.coalesce(1).write.parquet(sample_path, mode='overwrite')  # 샘플의 크기가 작으므로 하나의 파티션에

In [None]:
# sample_path = f"D:/DATA_PREPROCESS/FIRESTORE_DATAS/sample_voice_metadata_{formatted_date}"
# final_sample_df.coalesce(1).write.csv(sample_path, mode='overwrite', header=True)  # 샘플의 크기가 작으므로 하나의 파티션에

# Test

In [None]:
# # 특정 user_id, ad_name으로 필터링
# filtered_result_df = result_df.filter(
#     (F.col("user_id") == "EyDwTDHLROSqn1aM6AhgvAWEvTs2") & 
#     (F.col("ad_name") == "5,000원 로봇청소기")
# )
# filtered_result_df.show()

2개 중복됨을 확인  
+------------------+--------------------+------+---------+--------------------+---------------------------------+--------+----------+----------+-------+------+----+--------+-------+------------------+----------+-------------------+---------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+--------------------+---------------------------------+--------------+---------------------------------+-----------------------+----------------------+------------------------+-----------------+-----------------+-----------+---------+------------------+------------------+
|           ad_name|             user_id|attend|is_passed|            imageUrl|                        ad_script|ad_title|birth_year|local_code|is_test|gender| job|language|perfect|excepted_age_array|collection|        ad_duration|ad_duration_end|   desc_images_array|        images_array|     thumbnail_image| level|             ad_link|participant_count|            videoUrl|                           script|accuracy_array|                   stt_text_array|created_timestamp_array|average_accuracy_by_ad|average_accuracy_by_user|pass_rate_by_user|  pass_rate_by_ad|text_length|try_count|    user_avg_tries|      ad_avg_tries|
+------------------+--------------------+------+---------+--------------------+---------------------------------+--------+----------+----------+-------+------+----+--------+-------+------------------+----------+-------------------+---------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+--------------------+---------------------------------+--------------+---------------------------------+-----------------------+----------------------+------------------------+-----------------+-----------------+-----------+---------+------------------+------------------+
|5,000원 로봇청소기|EyDwTDHLROSqn1aM6...|     1|     pass|https://firebases...|품절 대란의 다이소 로봇 청소기...|  다이소|        57|        Gy|      0|     W|기타|대한민국|  160.0|                []|     Point|22.09.29 - 22.10.13|       22.10.14|['https://firebas...|['https://firebas...|https://firebases...|Normal|https://shop.dais...|              526|https://firebases...|품절 대란의 다이소 로봇 청소기...|            95|품절대란의 다이소 청소기 로봇 ...|22.09.29 오후 14시 35분|     91.78747628083491|                    79.8|97.78142974527528|99.01185770750988|        384|        1|1.3311421528348397|1.0553359683794465|
|5,000원 로봇청소기|EyDwTDHLROSqn1aM6...|     1|     pass|https://firebases...|품절 대란의 다이소 로봇 청소기...|  다이소|        57|        Gy|      0|     W|기타|대한민국|  160.0|                []|     Point|22.09.29 - 22.10.13|       22.10.14|['https://firebas...|['https://firebas...|https://firebases...|Normal|https://shop.dais...|              526|https://firebases...|품절 대란의 다이소 로봇 청소기...|            95|품절대란의 다이소 청소기 로봇 ...|22.09.29 오후 14시 35분|     91.78747628083491|                    79.8|97.78142974527528|99.01185770750988|        384|        1|1.3311421528348397|1.0553359683794465|
+------------------+--------------------+------+---------+--------------------+---------------------------------+--------+----------+----------+-------+------+----+--------+-------+------------------+----------+-------------------+---------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+--------------------+---------------------------------+--------------+---------------------------------+-----------------------+----------------------+------------------------+-----------------+-----------------+-----------+---------+------------------+------------------+


In [None]:
# # user_id와 ad_name을 기준으로 그룹화하고, 각 그룹의 행 수를 카운트합니다.
# grouped_df = result_df.groupBy("user_id", "ad_name").agg(F.count("*").alias("count"))

# # 카운트가 1보다 큰 그룹을 필터링하여 중복된 행을 찾습니다.
# duplicated_df = grouped_df.filter(grouped_df["count"] > 1)

# # 중복된 행의 수를 계산합니다.
# num_duplicated_rows = duplicated_df.count()

# # 중복된 행이 있는 경우만 별도로 저장합니다.
# if num_duplicated_rows > 0:
#     duplicated_df.write.csv("path/to/save/duplicated_rows.csv")

# # 결과를 출력합니다.
# num_duplicated_rows, duplicated_df.show()

왜 Java heap error가 발생했는 지 파악

## 샘플 데이터 테스트 및 수정(ad-hoc작업)

In [None]:
# # Group by 'user_id' and 'ad_name', and count the number of occurrences of each group
# grouped_df = final_sample_df.groupBy('user_id', 'ad_name').agg(F.count('*').alias('count'))

# # Filter out the groups that appear more than once, these are the duplicates
# duplicated_rows_df = grouped_df.filter(F.col('count') > 1)

# # Show the duplicated rows
# duplicated_rows_df.show()
# duplicated_rows_df.unpersist(blocking=True)

In [None]:
# # 'user_id'와 'ad_name'이 중복되는 행의 'participant_count' 최솟값을 찾음
# min_participant_count_df = duplicated_rows_df.join(
#     final_sample_df, ['user_id', 'ad_name']
# ).groupBy('user_id', 'ad_name').agg(
#     F.min('participant_count').alias('min_participant_count')
# )

# # 최소 'participant_count' 값을 가진 행과 조인하여 필터링
# final_sample_df_with_min_count = final_sample_df.join(
#     min_participant_count_df, ['user_id', 'ad_name'], 'left_outer'
# ).filter(
#     (F.col('participant_count') == F.col('min_participant_count')) |
#     F.col('min_participant_count').isNull()
# )

# # 'script' 컬럼 삭제
# final_sample_df = final_sample_df_with_min_count.drop('script')

In [None]:
# final_sample_df = final_sample_df.drop("script")
# final_sample_df.printSchema()

# 추가 수정 및 DB에 제안 할 내용
## 추가 수정할 내용
## DB에 제안
- collection의 문자열 비교에 필요 이상의 자원과 시간이 낭비된다
    - Point는 1, Memorization은 2, 등으로 타입과 크기를 축소시켜 데이터 연산 부하를 줄여달라
    - 