In [1]:
import tensorflow as tf
import s3fs
import time
import multiprocessing

# AWS S3 액세스 키 및 시크릿 액세스 키 설정
access_key = 'access_key'
secret_key = 'secret_key'

# S3 파일 시스템 초기화
fs = s3fs.S3FileSystem(key=access_key, secret=secret_key)

# S3 버킷 이름 및 prefix 설정
source_bucket_name = 'tfrecord-bucket01'
prefixes = ['DC0101', 'DC0102', 'DC0103', 'DC0104', 'DC0105', 'DC0106', 'DC0107', 'DC0108', 'DC0109', 'DC0110']

# 병합된 TFRecord 파일 경로
output_file = 'training.tfrecord'

# 현재까지 병합된 파일 개수 설정
merged_files = multiprocessing.Value('i', 0)

# 병합된 파일 개수를 업데이트하고 출력하는 함수
def update_merged_files(count):
    with merged_files.get_lock():
        merged_files.value += count
        print(f"병합된 파일 개수: {merged_files.value}")

# 전체 시간과 남은 시간을 출력하는 함수
def print_remaining_time(start_time, count, total_files):
    elapsed_time = time.time() - start_time
    remaining_files = total_files - merged_files.value
    if remaining_files > 0:
        remaining_time = elapsed_time / count * remaining_files
        minutes = int(remaining_time / 60)
        seconds = int(remaining_time % 60)
        print(f"예상 남은 시간: {minutes}분 {seconds}초")
    else:
        print("모든 파일이 병합되었습니다.")

# S3 파일 시스템 연결 상태 확인
print("S3 파일 시스템의 연결 상태를 확인하고 있습니다.")
try:
    fs.ls(source_bucket_name)  # 연결 확인용 작업
    print("S3 파일 시스템에 연결되었습니다.")
except Exception as e:
    print("S3 파일 시스템 연결에 실패했습니다.")
    print(e)

# 병합할 파일 목록 가져오기
print("파일을 가져오기 시작하였습니다.")
file_list = []
try:
    for prefix in prefixes:
        file_list += fs.glob(f"{source_bucket_name}/{prefix}*")
except Exception as e:
    print("파일 목록을 가져오는데 실패했습니다.")
    print(e)
    exit()

total_files = len(file_list)
print("파일을 가져오는 것이 마무리되었습니다.")
print(total_files)

# TFRecord 파일 병합 함수
def merge_tfrecord_files(file_list, output_file):
    # TFRecordDataset을 사용하여 파일 목록에서 데이터셋 생성
    dataset = tf.data.TFRecordDataset(file_list)

    # 데이터셋을 하나의 파일로 병합하여 저장
    writer = tf.data.experimental.TFRecordWriter(output_file)
    writer.write(dataset)

    # 병합된 파일 개수 업데이트
    update_merged_files(len(file_list))

# 전체 작업을 여러 작업으로 분할하여 병렬 처리
num_threads = multiprocessing.cpu_count()  # 사용 가능한 CPU 코어 수
file_chunks = [file_list[i:i+num_threads] for i in range(0, len(file_list), num_threads)]

# 병렬 작업 시작
start_time = time.time()
processes = []
for file_chunk in file_chunks:
    p = multiprocessing.Process(target=merge_tfrecord_files, args=(file_chunk, output_file))
    processes.append(p)
    p.start()

# 모든 작업 완료 대기
for p in processes:
    p.join()

# 전체 작업 시간 및 남은 시간 출력
print_remaining_time(start_time, total_files, total_files)

# 업로드할 버킷 경로
destination_bucket_name = 'tfrecord-bucket02'
destination_path = f"s3://{destination_bucket_name}/{output_file}"

# 병합된 파일을 다른 버킷으로 업로드
try:
    fs.put(output_file, destination_path)
    print(f"TFRecord 파일이 {destination_bucket_name} 버킷으로 업로드되었습니다.")
except Exception as e:
    print("파일 업로드에 실패했습니다.")
    print(e)


S3 파일 시스템의 연결 상태를 확인하고 있습니다.
S3 파일 시스템에 연결되었습니다.
파일을 가져오기 시작하였습니다.
파일을 가져오는 것이 마무리되었습니다.
211934
예상 남은 시간: 21분 26초
TFRecord 파일이 tfrecord-bucket02 버킷으로 업로드되었습니다.


In [7]:
import tensorflow as tf
import s3fs
import time
import multiprocessing

access_key = 'access_key'
secret_key = 'secret_key'

fs = s3fs.S3FileSystem(key=access_key, secret=secret_key)

source_bucket_name = 'tfrecord-bucket01'
prefixes = ['DC0201', 'DC0202', 'DC0203']

output_file = 'validation.tfrecord'

merged_files = multiprocessing.Value('i', 0)

def update_merged_files(count):
    with merged_files.get_lock():
        merged_files.value += count
        print(f"병합된 파일 개수: {merged_files.value}")

def print_remaining_time(start_time, count, total_files):
    elapsed_time = time.time() - start_time
    remaining_files = total_files - merged_files.value
    if remaining_files > 0:
        remaining_time = elapsed_time / count * remaining_files
        minutes = int(remaining_time / 60)
        seconds = int(remaining_time % 60)
        print(f"예상 남은 시간: {minutes}분 {seconds}초")
    else:
        print("모든 파일이 병합되었습니다.")

print("S3 파일 시스템의 연결 상태를 확인하고 있습니다.")
try:
    fs.ls(source_bucket_name)
    print("S3 파일 시스템에 연결되었습니다.")
except Exception as e:
    print("S3 파일 시스템 연결에 실패했습니다.")
    print(e)

print("파일을 가져오기 시작하였습니다.")
file_list = []
try:
    for prefix in prefixes:
        file_list += fs.glob(f"{source_bucket_name}/{prefix}*")
except Exception as e:
    print("파일 목록을 가져오는데 실패했습니다.")
    print(e)
    exit()

total_files = len(file_list)
print("파일을 가져오는 것이 마무리되었습니다.")
print(total_files)

def merge_tfrecord_files(file_list, output_file):
    dataset = tf.data.TFRecordDataset(file_list)
    writer = tf.io.TFRecordWriter(output_file)
    for data in dataset:
        writer.write(data.numpy())
    update_merged_files(len(file_list))

num_threads = multiprocessing.cpu_count()
file_chunks = [file_list[i:i+num_threads] for i in range(0, len(file_list), num_threads)]

start_time = time.time()
processes = []
for file_chunk in file_chunks:
    p = multiprocessing.Process(target=merge_tfrecord_files, args=(file_chunk, output_file))
    processes.append(p)
    p.start()

for p in processes:
    p.join()

print_remaining_time(start_time, total_files, total_files)

local_file_path = f"./{output_file}"

# 로컬 파일을 S3에 업로드
try:
    fs.put(local_file_path, destination_path)
    print(f"TFRecord 파일이 {destination_bucket_name} 버킷으로 업로드되었습니다.")
except Exception as e:
    print("파일 업로드에 실패했습니다.")
    print(e)


S3 파일 시스템의 연결 상태를 확인하고 있습니다.
S3 파일 시스템에 연결되었습니다.
파일을 가져오기 시작하였습니다.
파일을 가져오는 것이 마무리되었습니다.
58066
예상 남은 시간: 4분 15초
TFRecord 파일이 tfrecord-bucket02 버킷으로 업로드되었습니다.
