# S3 폴더 -> 카탈로그 테이블 생성

In [1]:
import sys
print(sys.path)
print(sys.executable)

['/Users/mac/insight_/aws_etl', '/opt/anaconda3/lib/python312.zip', '/opt/anaconda3/lib/python3.12', '/opt/anaconda3/lib/python3.12/lib-dynload', '', '/opt/anaconda3/lib/python3.12/site-packages', '/opt/anaconda3/lib/python3.12/site-packages/aeosa', '/opt/anaconda3/lib/python3.12/site-packages/setuptools/_vendor']
/opt/anaconda3/bin/python


In [None]:
import boto3
import re

# S3 클라이언트 생성
s3 = boto3.client("s3")

bucket_name = "schoolfriends-bym"

# 버킷 내 모든 객체 가져오기
response = s3.list_objects_v2(Bucket=bucket_name, Delimiter="/")
print(response['CommonPrefixes'])
for folder in response.get('CommonPrefixes', []):
    print(folder['Prefix'])

[{'Prefix': 'at_basic_inquiry_val/'}, {'Prefix': 'at_formula_code/'}, {'Prefix': 'at_low_scale/'}, {'Prefix': 'at_question/'}, {'Prefix': 'at_question_choice/'}, {'Prefix': 'at_question_extra/'}, {'Prefix': 'at_scale_code/'}, {'Prefix': 'at_sub_psy_item/'}, {'Prefix': 'at_user_formula_score/'}, {'Prefix': 'at_user_scale_score/'}, {'Prefix': 'at_user_testing_paper/'}, {'Prefix': 'at_user_testing_paper_pn/'}, {'Prefix': 'at_user_testing_paper_sct/'}, {'Prefix': 'at_user_testing_paper_tr/'}, {'Prefix': 'athena/'}, {'Prefix': 'jar/'}, {'Prefix': 'member/'}, {'Prefix': 'psy_class/'}, {'Prefix': 'psy_estimate/'}, {'Prefix': 'psy_ord/'}, {'Prefix': 'psy_ord_class/'}, {'Prefix': 'psy_target/'}, {'Prefix': 'result/'}, {'Prefix': 'school_class/'}, {'Prefix': 'school_info/'}, {'Prefix': 'school_student/'}, {'Prefix': 'school_teacher/'}]
at_basic_inquiry_val/
at_formula_code/
at_low_scale/
at_question/
at_question_choice/
at_question_extra/
at_scale_code/
at_sub_psy_item/
at_user_formula_score/
at

In [None]:
# CSV 파일만 폴더 만들어서 이동 및 삭제 처리
for obj in response.get("Contents", []): # 'Contents' 키값 가지고 있는 값 (무슨 형태인지를 모르겠네)
    key = obj["Key"]
    if "/" not in key and key.lower().endswith(".csv"):
        print(f"Processing file: {key}")
        match = re.search(r"([A-Z]+)_([A-Z]+)", key) # 객체 중 알파벳_알파벳 패턴 찾기
        print(match)
        folder_name = match.group(0).lower() # 패턴을 폴더 이름으로 정의하고
        print(folder_name)
        new_key = f"{folder_name}/{key}" # 새로 정의한 폴더 이름/기존 파일 이름
        print(f"Moving {key} to {new_key}")

        # S3 내에서 파일 복사
        s3.copy_object(
            Bucket=bucket_name,
            CopySource={"Bucket": bucket_name, "Key": key},
            Key=new_key
        )

        # 원본 삭제
        s3.delete_object(Bucket=bucket_name, Key=key)
    else:
        pass

Processing file: PSY_CLASS_202508291509.csv
<re.Match object; span=(0, 9), match='PSY_CLASS'>
psy_class
Moving PSY_CLASS_202508291509.csv to psy_class/PSY_CLASS_202508291509.csv
Processing file: PSY_ESTIMATE_202508291511.csv
<re.Match object; span=(0, 12), match='PSY_ESTIMATE'>
psy_estimate
Moving PSY_ESTIMATE_202508291511.csv to psy_estimate/PSY_ESTIMATE_202508291511.csv
Processing file: PSY_ORD_202508291511.csv
<re.Match object; span=(0, 7), match='PSY_ORD'>
psy_ord
Moving PSY_ORD_202508291511.csv to psy_ord/PSY_ORD_202508291511.csv
Processing file: PSY_ORD_CLASS_202508291511.csv
<re.Match object; span=(0, 7), match='PSY_ORD'>
psy_ord
Moving PSY_ORD_CLASS_202508291511.csv to psy_ord/PSY_ORD_CLASS_202508291511.csv
Processing file: PSY_TARGET_DETAIL_202508291511.csv
<re.Match object; span=(0, 10), match='PSY_TARGET'>
psy_target
Moving PSY_TARGET_DETAIL_202508291511.csv to psy_target/PSY_TARGET_DETAIL_202508291511.csv
Processing file: SCHOOL_INFO_202508291512.csv
<re.Match object; span=

# S3 폴더별 크롤러 만들때

In [None]:
# 폴더 이름 리스트화
s3 = boto3.client("s3")
bucket_name = "schoolfriends-bym"
response = s3.list_objects_v2(Bucket=bucket_name, Delimiter="/")
folder_names = []
for folder in response.get("CommonPrefixes", []):
    folder_name = folder["Prefix"].split("/")[0]
    folder_names.append(folder_name) # 버킷안 폴더이름 추출

print('s3에 있는 폴더명 :', folder_names)

# s3 폴더 별로 크롤러 만들어서 아테네 테이블 만들때
glue = boto3.client("glue", region_name="ap-northeast-2")

# 공통 파라미터
role_name = 'arn:aws:iam::932744610695:role/service-role/AWSGlueServiceRole-Schoolfriends' # Glue 서비스 역할 ARN
database_name = 'schoolfriends' # 크롤링해 만들 데이터베이스 이름
bucket_name = 'schoolfriends-bym' # 크롤러할 폴더가 있는 버킷 이름

for folder in folder_names:
    crawler_name = f"crawler_{folder}"
    s3_target_path = f"s3://{bucket_name}/{folder}/"
    
    try:
        response = glue.create_crawler(
            Name=crawler_name,
            Role=role_name,
            DatabaseName=database_name,
            Targets={'S3Targets': [{'Path': s3_target_path}]},
            SchemaChangePolicy={
                'UpdateBehavior': 'LOG', # 스키마가 바뀐경우 log=로그만 남기고, 카탈로그 정의는 그대로 둠/'UPDATE_IN_DATABASE'=카탈로그 정의를 바꿔줌
                'DeleteBehavior': 'LOG' # 삭제된 데이터에 대해 log=로그만 남기고, 카탈로그 정의는 그대로 둠/'DEPRECATE_IN_DATABASE'=카탈로그 정의에서 삭제된 파티션을 제거
            } # 크롤러는 전체 데이터를 스캔할 수도, 새롭게 추가된 데이터만 스캔할 수도 있음 / 그러나 기본적으로 S3의 전체 경로를 스캔함 -> 그렇기 때문에 파티션 삭제만 인지하고, 파일삭제는 인지하지 못함.
        )
        print(f"Crawler '{crawler_name}' created successfully.")
    except glue.exceptions.AlreadyExistsException:
        print(f"Crawler '{crawler_name}' already exists.")
    except Exception as e:
        print(f"Error creating crawler '{crawler_name}': {e}")

s3에 있는 폴더명 : ['at_basic_inquiry_val', 'at_formula_code', 'at_low_scale', 'at_question', 'at_question_choice', 'at_question_extra', 'at_scale_code', 'at_sub_psy_item', 'at_user_formula_score', 'at_user_scale_score', 'at_user_testing_paper', 'at_user_testing_paper_pn', 'at_user_testing_paper_sct', 'at_user_testing_paper_tr', 'athena', 'member', 'psy_class', 'psy_estimate', 'psy_ord', 'psy_ord_class', 'psy_target', 'school_class', 'school_info', 'school_student', 'school_teacher']
Crawler 'crawler_at_basic_inquiry_val' already exists.
Crawler 'crawler_at_formula_code' already exists.
Crawler 'crawler_at_low_scale' already exists.
Crawler 'crawler_at_question' already exists.
Crawler 'crawler_at_question_choice' already exists.
Crawler 'crawler_at_question_extra' already exists.
Crawler 'crawler_at_scale_code' already exists.
Crawler 'crawler_at_sub_psy_item' already exists.
Crawler 'crawler_at_user_formula_score' already exists.
Crawler 'crawler_at_user_scale_score' already exists.
Crawle

# 크롤러 속성 변경하고 싶을 때

In [25]:
# 각 크롤러의 속성을 변경하고 싶은 경우
# - 이미 생성된 Glue의 크롤러의 속성을 변경하고 싶은 경우
# - 예를 들어 스키마 구조 변경 정책(SchemaChangePolicy) 등 일괄 변경
import boto3

glue = boto3.client("glue", region_name="ap-northeast-2")

# 크롤러 이름 리스트 가져오기
response = glue.list_crawlers()
crawler_names = response.get("CrawlerNames", [])
print(crawler_names)

['amazon_reviews_partitioned_crawler', 'crawler_at_basic_inquiry_val', 'crawler_at_formula_code', 'crawler_at_low_scale', 'crawler_at_question', 'crawler_at_question_choice', 'crawler_at_question_extra', 'crawler_at_scale_code', 'crawler_at_sub_psy_item', 'crawler_at_user_formula_score', 'crawler_at_user_scale_score', 'crawler_at_user_testing_paper', 'crawler_at_user_testing_paper_pn', 'crawler_at_user_testing_paper_sct', 'crawler_at_user_testing_paper_tr', 'crawler_member', 'crawler_psy_class', 'crawler_psy_estimate', 'crawler_psy_ord', 'crawler_psy_target', 'crawler_school_class', 'crawler_school_info', 'crawler_school_student', 'crawler_school_teacher', 'schoolfriends_member_crawler']


In [35]:
import time
for crawler in crawler_names:
    if crawler.startswith("crawler_"):
        try:
            glue.update_crawler(
                Name=crawler,
                SchemaChangePolicy={
                    "UpdateBehavior": "LOG",
                    "DeleteBehavior": "LOG"
                }
            )
            print(f"✅ {crawler} 의 SchemaChangePolicy 업데이트 완료")
            time.sleep(1)
        except Exception as e:
            print(f"❌ {crawler} 업데이트 실패: {repr(e)}")

✅ crawler_at_basic_inquiry_val 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_formula_code 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_low_scale 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_question 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_question_choice 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_question_extra 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_scale_code 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_sub_psy_item 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_user_formula_score 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_user_scale_score 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_user_testing_paper 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_user_testing_paper_pn 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_user_testing_paper_sct 의 SchemaChangePolicy 업데이트 완료
✅ crawler_at_user_testing_paper_tr 의 SchemaChangePolicy 업데이트 완료
✅ crawler_member 의 SchemaChangePolicy 업데이트 완료
✅ crawler_school_class 의 SchemaChangePolicy 업데이트 완료


# 데이터 카탈로그 스키마를 변경하고 싶을 때

In [3]:
# 크롤링 후 테이블 데이터 타입 재정의
# 강제 string
import boto3

region_name = "ap-northeast-2"
database_name = 'schoolfriends'

# 클라이언트 생성
glue_client = boto3.client("glue", region_name=region_name)
table_list = []
table_paginator = glue_client.get_paginator('get_tables') # get_paginator은 api은 최대 100개 반환하기 때문에 이를 방지하기 위해 페이지네이션 처리
for page in table_paginator.paginate(DatabaseName=database_name): # 데이터베이스 이름이 database_name인 페이지의 순환 설정
    print(page['TableList']) # 각 페이지의 테이블 목록 출력
    for table in page['TableList']: # 각 테이블에 대한 순환 설정
        print(table['Name'])
        table_list.append(table['Name']) # 테이블 이름을 리스트에 추가

for table_name in table_list: # 모든 테이블 이름 반복
    print("----------순환 중인 테이블----------")
    print('테이블 이름', table_name) # 순환 중인 테이블 이름
    table_info = glue_client.get_table(DatabaseName=database_name, Name=table_name)["Table"] # 순환중인 테이블 정보 가져오기
    table_col_info = table_info['StorageDescriptor'] # 테이블의 StorageDescriptor 정보 가져오기 ex {'Columns': [{'Name': 'user_testing_no', 'Type': 'string'}, {'Name': 'psy_item_id', 'Type': 'string'},
    print("----------테이블 정보----------")
    print(table_col_info)
    for col in table_col_info['Columns']:
        print("------------변경되는 컬럼의 타입------------")
        print(col['Name'], col['Type'])
        col['Type'] = 'string'  # 모든 컬럼의 타입을 string으로 변경
    print("-----------------------------------------------------")
    print('Name :', table_info["Name"])
    print('Description :', table_info.get("Description"))
    print('Owner :', table_info.get("Owner"))
    print('Parameters :', table_info.get("Parameters", {}))
    print('TableType :', table_info.get("TableType", "EXTERNAL_TABLE"))
    print('PartitionKeys :', table_info.get("PartitionKeys", []))
    print('StorageDescriptor :', table_info)
    print("-----------------------------------------------------")

    # 테이블 정보 업데이트
    glue_client.update_table(
        DatabaseName=database_name,
        TableInput={
            'Name': table_info["Name"], # 테이블 이름
            'Description': table_info.get("Description", ""), # 테이블 설명
            'Owner': table_info.get("Owner"), # 테이블 소유자
            'Parameters': table_info.get("Parameters", {}), # 테이블 파라미터
            'TableType': table_info.get("TableType", "EXTERNAL_TABLE"), # 테이블 타입
            'PartitionKeys': table_info.get("PartitionKeys", []), # 파티션 키
            'StorageDescriptor': table_col_info
        }
    )
    print(f"✅ {table_name} 의 모든 컬럼 타입이 string으로 변경 완료")

[{'Name': 'at_basic_inquiry_val', 'DatabaseName': 'schoolfriends', 'Description': '', 'Owner': 'owner', 'CreateTime': datetime.datetime(2025, 9, 1, 14, 49, 45, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2025, 9, 3, 19, 6, 42, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'user_testing_no', 'Type': 'string'}, {'Name': 'psy_item_id', 'Type': 'string'}, {'Name': 'psy_item_ver', 'Type': 'string'}, {'Name': 'sub_psy_item_id', 'Type': 'string'}, {'Name': 'age_ext_sub_psy_item_id', 'Type': 'string'}, {'Name': 'kabc_sub_psy_item_id', 'Type': 'string'}, {'Name': 'testing_org', 'Type': 'string'}, {'Name': 'birthday', 'Type': 'string'}, {'Name': 'sex', 'Type': 'string'}, {'Name': 'at_region_cd', 'Type': 'string'}, {'Name': 'testing_date', 'Type': 'string'}, {'Name': 'name', 'Type': 'string'}, {'Name': 'at_age_cd', 'Type': 'string'}, {'Name': 'at_extension_age_cd', 'Type': 'string'}, {'Name': 'at_school_type_cd', 'Type': 'string'}, {'Name': 'at_school_age_cd

# 데이터 카탈로그 컬럼명 변경하고 싶을 때

In [5]:
# 크롤링이 잘못되어 컬럼명을 다시 정의해야하는 경우
import boto3

region_name = "ap-northeast-2"
database_name = 'schoolfriends'

# 클라이언트 생성 후 테이블 확인
glue_client = boto3.client("glue", region_name=region_name)
table_list = []
table_paginator = glue_client.get_paginator('get_tables') # get_paginator은 api은 최대 100개 반환하기 때문에 이를 방지하기 위해 페이지네이션 처리
for page in table_paginator.paginate(DatabaseName=database_name): # 데이터베이스 이름이 database_name인 페이지의 순환 설정
    print(page['TableList']) # 각 페이지의 테이블 목록 출력
    for table in page['TableList']: # 각 테이블에 대한 순환 설정
        print(table['Name'])
        table_list.append(table['Name']) # 테이블 이름을 리스트에 추가
        print(table_list) # 무슨 테이블이 존재하는지 확인

# 특정 테이블 선택해서 컬럼명 수정
target_table_name = 'school_teacher'
table_info = glue_client.get_table(DatabaseName=database_name, Name=target_table_name)["Table"] # 선택한 테이블 정보 가져오기
table_col_info = table_info['StorageDescriptor'] # 테이블의 StorageDescriptor 정보 가져오기 ex {'Columns': [{'Name': 'user_testing_no', 'Type': 'string'}, {'Name': 'psy_item_id', 'Type': 'string'},
modify_column_name_mapping = ['school_code', 'member_no', 'use_yn', 'memo', 'reg_member_no', 'reg_date', 'mod_member_no', 'mod_date']
for column_name, col_info in zip(modify_column_name_mapping, table_col_info['Columns']):
    print("- 변경되는 컬럼의 정보 ------------")
    print(col_info['Name'], col_info['Type'])
    col_info['Name'] = column_name  # 모든 컬럼의 이름을 변경
    print(f'컬럼명: {col_info["Name"]}으로 변경 완료')
print(table_info['StorageDescriptor'])

[{'Name': 'at_basic_inquiry_val', 'DatabaseName': 'schoolfriends', 'Description': '', 'Owner': 'owner', 'CreateTime': datetime.datetime(2025, 9, 1, 14, 49, 45, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2025, 9, 4, 10, 2, 5, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'user_testing_no', 'Type': 'string'}, {'Name': 'psy_item_id', 'Type': 'string'}, {'Name': 'psy_item_ver', 'Type': 'string'}, {'Name': 'sub_psy_item_id', 'Type': 'string'}, {'Name': 'age_ext_sub_psy_item_id', 'Type': 'string'}, {'Name': 'kabc_sub_psy_item_id', 'Type': 'string'}, {'Name': 'testing_org', 'Type': 'string'}, {'Name': 'birthday', 'Type': 'string'}, {'Name': 'sex', 'Type': 'string'}, {'Name': 'at_region_cd', 'Type': 'string'}, {'Name': 'testing_date', 'Type': 'string'}, {'Name': 'name', 'Type': 'string'}, {'Name': 'at_age_cd', 'Type': 'string'}, {'Name': 'at_extension_age_cd', 'Type': 'string'}, {'Name': 'at_school_type_cd', 'Type': 'string'}, {'Name': 'at_school_age_cd'

In [6]:
table_col_info

{'Columns': [{'Name': 'school_code', 'Type': 'string'},
  {'Name': 'member_no', 'Type': 'string'},
  {'Name': 'use_yn', 'Type': 'string'},
  {'Name': 'memo', 'Type': 'string'},
  {'Name': 'reg_member_no', 'Type': 'string'},
  {'Name': 'reg_date', 'Type': 'string'},
  {'Name': 'mod_member_no', 'Type': 'string'},
  {'Name': 'mod_date', 'Type': 'string'}],
 'Location': 's3://schoolfriends-bym/school_teacher/',
 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
 'Compressed': False,
 'NumberOfBuckets': -1,
 'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
  'Parameters': {'field.delim': ','}},
 'BucketColumns': [],
 'SortColumns': [],
 'Parameters': {'sizeKey': '5321',
  'objectCount': '1',
  'UPDATED_BY_CRAWLER': 'crawler_school_teacher',
  'recordCount': '61',
  'CrawlerSchemaSerializerVersion': '1.0',
  'averageRecordSize': '87',
  'compressionType': 'none',

In [None]:
# 테이블 정보 업데이트
glue_client.update_table(
    DatabaseName=database_name,
    TableInput={
        'Name': target_table_name, # 수정해야 하는 테이블 이름
        'Description': table_info.get("Description", ""), # 테이블 설명
        'Owner': table_info.get("Owner"), # 테이블 소유자
        'Parameters': table_info.get("Parameters", {}), # 테이블 파라미터
        'TableType': table_info.get("TableType", "EXTERNAL_TABLE"), # 테이블 타입
        'PartitionKeys': table_info.get("PartitionKeys", []), # 파티션 키
        'StorageDescriptor': table_info['StorageDescriptor'] # 여기서도 참조 원리가 반영되어 table_col_info이 아닌 table_info['StorageDescriptor']로 해야함
    }
)
print(f"✅ {target_table_name} 의 모든 컬럼 이름 변경 완료")

✅ school_teacher 의 모든 컬럼 이름 변경 완료


# glue job 만들어야 할때

## 1. rds에 glue data catalog 테이블과 동일한 테이블 생성 과정

In [2]:
# data catalog 테이블명 읽어서 rds에 동일한 테이블명으로 테이블 생성
import boto3

# AWS 리전과 데이터베이스 이름 지정
region_name = "ap-northeast-2"
database_name = "schoolfriends"

# Glue 클라이언트 생성
glue_client = boto3.client("glue", region_name=region_name)

# Glue Data Catalog에서 테이블명 가져오기
table_list = []
paginator = glue_client.get_paginator("get_tables")

for page in paginator.paginate(DatabaseName=database_name):
    for table in page["TableList"]:
        table_name = table["Name"]
        table_list.append(table_name)

print("📌 Glue Data Catalog에 등록된 테이블 목록:")
for t in table_list:
    print("-", t)

📌 Glue Data Catalog에 등록된 테이블 목록:
- aggression_norms
- aggression_point
- at_basic_inquiry_val
- at_formula_code
- at_low_scale
- at_question
- at_question_choice
- at_question_extra
- at_scale_code
- at_sub_psy_item
- at_user_formula_score
- at_user_scale_score
- at_user_testing_paper
- at_user_testing_paper_pn
- at_user_testing_paper_sct
- at_user_testing_paper_tr
- member
- prosociality_norms
- prosociality_point
- psy_class
- psy_estimate
- psy_ord
- psy_ord_class
- psy_target
- school_class
- school_info
- school_student
- school_teacher


In [None]:
# RDS에 Glue Data Catalog 테이블과 동일한 테이블 생성
import pymysql

# RDS 연결 설정
conn = pymysql.connect(
    host="db-rnd.czaugqusch3a.ap-northeast-2.rds.amazonaws.com",
    user="db_rnd_admin",
    password="0dlstk12#",
    database="schoolfriends",
    charset="utf8mb4"
)

cursor = conn.cursor()

def glue_type_to_mysql(glue_type: str) -> str: # Glue Data Catalog의 타입은 Hive/Parquet 계열이라 MySQL 타입으로 바꿔줘야 합니다.
    mapping = {
        "string": "VARCHAR(255)",
        "bigint": "BIGINT",
        "int": "INT",
        "double": "DOUBLE",
        "float": "FLOAT",
        "boolean": "BOOLEAN",
        "timestamp": "DATETIME"
    }
    return mapping.get(glue_type.lower(), "VARCHAR(255)")

for table_name in table_list:
    # Glue 테이블 스키마 가져오기
    table_info = glue_client.get_table(DatabaseName=database_name, Name=table_name)
    columns = table_info["Table"]["StorageDescriptor"]["Columns"]
    partition_keys = table_info["Table"].get("PartitionKeys", [])
    
    # 컬럼 SQL 정의
    column_defs = []
    for col in columns + partition_keys:
        col_name = col["Name"]
        col_type = glue_type_to_mysql(col["Type"])
        column_defs.append(f"`{col_name}` {col_type}")
    
    # CREATE TABLE 쿼리
    create_sql = f"""
    CREATE TABLE IF NOT EXISTS `{table_name}` (
        {", ".join(column_defs)}
    );
    """
    print(create_sql)  # 👉 확인용 출력
    cursor.execute(create_sql)

conn.commit()
cursor.close()
conn.close()
