# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [2]:
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg",
  "--JOB_NAME": "cdc_kinesis_glue",
  "--TEMP_DIR" : "s3://glue-cdc-temp/temp/",
  "--TempDir" : "s3://glue-cdc-temp/temp/",
  "--enable-continuous-cloudwatch-log" : "true"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
The following configurations have been updated: {'--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions', '--datalake-formats': 'iceberg', '--JOB_NAME': 'cdc_kinesis_glue', '--TEMP_DIR': 's3://glue-cdc-temp/temp/', '--TempDir': 's3://glue-cdc-temp/temp/', '--enable-continuous-cloudwatch-log': 'true'}


In [6]:
%glue_version 3.0
%number_of_workers 2
%worker_type G.1X


Setting Glue version to: 3.0
Previous number of workers: None
Setting new number of workers to: 2
Previous worker type: None
Setting new worker type to: G.1X


In [1]:
from pyspark.conf import SparkConf
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
from pyspark.sql.functions import col, desc, to_timestamp
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import current_timestamp

import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
    
conf = SparkConf()
ICEBERG_S3_PATH = "s3://aiscanner-iceberg/MST_ITEM/"
CATALOG = "glue_catalog"
DATABASE = "orapp_iceberg"
TABLE_NAME = "mst_item_iceberg"
WINDOW_SIZE = "60 seconds"
PRIMARY_KEY = "seq"

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Session ID: 82c8cae1-d1fa-44b1-af80-01f0d67d2d31
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--datalake-formats iceberg
--JOB_NAME cdc_kinesis_glue
--TEMP_DIR s3://glue-cdc-temp/temp/
--TempDir s3://glue-cdc-temp/temp/
--enable-continuous-cloudwatch-log true
Waiting for session 82c8cae1-d1fa-44b1-af80-01f0d67d2d31 to get into ready status...
Session 82c8cae1-d1fa-44b1-af80-01f0d67d2d31 has been created.



In [2]:
def setSparkIcebergConf() -> SparkConf:
    conf_list = [
        ("spark.sql.defaultCatalog", "glue_catalog"),
        (f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog"),
        (f"spark.sql.catalog.{CATALOG}.warehouse", ICEBERG_S3_PATH),
        (f"spark.sql.catalog.{CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"),
        (f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
        (f"spark.sql.catalog.{CATALOG}.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager"),
        (f"spark.sql.catalog.{CATALOG}.lock.table", f"{CATALOG}_lock"),
        ("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
        ("spark.sql.iceberg.handle-timestamp-without-timezone","true")
    ]
    spark_conf = SparkConf().setAll(conf_list)
    
    return spark_conf
    




In [3]:

conf = setSparkIcebergConf()
sc = SparkContext.getOrCreate(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job.init(args['JOB_NAME'], args)




In [4]:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, lit, row_number, current_timestamp, desc
from pyspark.sql import Window

def processBatch(data_frame, batch_id):
    print("start")
    logger.info("start")
    try:
        # Step 1: Convert DynamicFrame to DataFrame and initialize Iceberg DataFrame
        try:
            stream_data_dynf = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")  # Kinesis
            _df = spark.sql(f"select * from {DATABASE}.{TABLE_NAME} LIMIT 0")  # Iceberg 
            print("step 1: Initialized DynamicFrame and Iceberg DataFrame")
            logger.info("step 1: Initialized DynamicFrame and Iceberg DataFrame")
        except Exception as e:
            print(f"Error in step 1: {str(e)}")
            logger.error(f"Error in step 1: {str(e)}")
            return

        # Log schema and columns of _df (Iceberg DataFrame)
        try:
            print(f"Columns in _df: {_df.columns}")
            # logger.info(f"Columns in _df: {_df.columns}")
            _df_schema_str = _df._jdf.schema().treeString()
            # print(f"Schema of _df:\n{_df_schema_str}")
            # logger.info(f"Schema of _df:\n{_df_schema_str}")
        except Exception as e:
            print(f"Error while logging Iceberg DataFrame schema: {str(e)}")
            logger.error(f"Error while logging Iceberg DataFrame schema: {str(e)}")
        
        # Step 2: Convert to DataFrame and select columns
        try:
            stream_data_df = stream_data_dynf.toDF()
            if "data" not in stream_data_df.columns or stream_data_df.filter(col("data").isNull()).count() > 0:
                print("Detected records with only metadata and no data. Skipping these records.")
                logger.warning("Detected records with only metadata and no data. Skipping these records.")
                return  # Skip processing this batch
            
            
            # seq 컬럼이 null인 레코드는 제외
            stream_data_df = stream_data_df.filter(col("data.seq").isNotNull())

            print(f"Sample data in stream_data_df:\n{stream_data_df.show(5, truncate=False)}")
            logger.info(f"Sample data in stream_data_df:\n{stream_data_df.show(5, truncate=False)}")
            print(f"Columns in stream_data_df: {stream_data_df.columns}")
            # logger.info(f"Columns in stream_data_df: {stream_data_df.columns}")
            stream_data_df_schema_str = stream_data_df._jdf.schema().treeString()
            print(f"Schema of stream_data_df:\n{stream_data_df_schema_str}")
            logger.info(f"Schema of stream_data_df:\n{stream_data_df_schema_str}")
        except Exception as e:
            print(f"Error in step 2: {str(e)}")
            logger.error(f"Error in step 2: {str(e)}")
            return

        try:
            cdc_df = stream_data_df.select(
                col('data.*'), 
                col('metadata.operation').alias('_op'),
                col('metadata.timestamp').alias('_op_timestamp')
            )
            # print(f"step 2: Columns in cdc_df after selection: {cdc_df.columns}")
            # logger.info(f"step 2: Columns in cdc_df after selection: {cdc_df.columns}")
            print(f"Sample data in cdc_df:\n{cdc_df.show(5, truncate=False)}")
            logger.info(f"Sample data in cdc_df:\n{cdc_df.show(5, truncate=False)}")
        except Exception as e:
            print(f"Error in selecting columns in step 2: {str(e)}")
            logger.error(f"Error in selecting columns in step 2: {str(e)}")
            return

        # Step 3: Ensure the cdc_df schema matches the Iceberg table schema
        try:
    # Iceberg 테이블 컬럼과 CDC DataFrame 컬럼을 모두 소문자로 변환하여 비교
            iceberg_columns = set([col.lower() for col in _df.schema.names])
            cdc_columns = set([col.lower() for col in cdc_df.columns])

            missing_columns = iceberg_columns - cdc_columns
            if missing_columns:
                print(f"Schema mismatch: missing columns in stream data - {missing_columns}")
                logger.warning(f"Schema mismatch: missing columns in stream data - {missing_columns}")

                # 새 컬럼에 대해서만 null 값 추가
                for col_name in missing_columns:
                    if col_name not in cdc_columns:
                        # 소문자로 변환된 컬럼을 기반으로 데이터 타입 추가
                        original_col_name = next((name for name in _df.schema.names if name.lower() == col_name), None)
                        if original_col_name:
                            cdc_df = cdc_df.withColumn(col_name, lit(None).cast(_df.schema[original_col_name].dataType))

                print(f"step 3: Added missing columns: {missing_columns}")
                logger.info(f"step 3: Added missing columns: {missing_columns}")

            print(f"Sample data in cdc_df after adding missing columns:\n{cdc_df.show(5, truncate=False)}")
            logger.info(f"Sample data in cdc_df after adding missing columns:\n{cdc_df.show(5, truncate=False)}")
        except Exception as e:
            print(f"Error in step 3: {str(e)}")
            logger.error(f"Error in step 3: {str(e)}")
            return


        # Step 4: Apply window function and deduplication
        try:
            print("step 4: Applying window function and deduplication")
            logger.info("step 4: Applying window function and deduplication")
            window = Window.partitionBy(PRIMARY_KEY).orderBy(desc("_op_timestamp"))
            deduped_cdc_df = cdc_df.withColumn("_row", row_number().over(window))\
                                   .filter(col("_row") == 1).drop("_row")\
                                   .select([col(c) for c in _df.schema.names])
            print(f"step 4: Columns in deduped_cdc_df after deduplication: {deduped_cdc_df.columns}")
            logger.info(f"step 4: Columns in deduped_cdc_df after deduplication: {deduped_cdc_df.columns}")
            print(f"Sample data in deduped_cdc_df:\n{deduped_cdc_df.show(5, truncate=False)}")
            logger.info(f"Sample data in deduped_cdc_df:\n{deduped_cdc_df.show(5, truncate=False)}")
        except Exception as e:
            print(f"Error in step 4: {str(e)}")
            logger.error(f"Error in step 4: {str(e)}")
            return

        # Step 5: Add 'trans_time' column
        try:
            deduped_cdc_df = deduped_cdc_df.withColumn('trans_time', current_timestamp())
            print("step 5: Added 'trans_time' column to deduped_cdc_df")
            logger.info("step 5: Added 'trans_time' column to deduped_cdc_df")
            print(f"Sample data in deduped_cdc_df with 'trans_time':\n{deduped_cdc_df.show(5, truncate=False)}")
            logger.info(f"Sample data in deduped_cdc_df with 'trans_time':\n{deduped_cdc_df.show(5, truncate=False)}")
        except Exception as e:
            print(f"Error in step 5: {str(e)}")
            logger.error(f"Error in step 5: {str(e)}")
            return

        # Step 6: Upsert operation
        try:
            upserted_df = deduped_cdc_df.filter(col('_op') != 'delete')
            upserted_count = upserted_df.count()
            print(f"step 6: Upserted rows count: {upserted_count}")
            logger.info(f"step 6: Upserted rows count: {upserted_count}")
            if upserted_count > 0:
                print("Sample data in upserted_df:")
                upserted_df.show(5, truncate=False)

                print("Schema of upserted_df:")
                upserted_df.printSchema()
                upserted_df.createOrReplaceTempView(f"{TABLE_NAME}_upsert")
                print(f"step 6: Created temporary view for upsert: {TABLE_NAME}_upsert")
                logger.info(f"step 6: Created temporary view for upsert: {TABLE_NAME}_upsert")
                spark.sql(f"""
                    MERGE INTO {DATABASE}.{TABLE_NAME} t 
                    USING {TABLE_NAME}_upsert s 
                    ON t.{PRIMARY_KEY} = s.{PRIMARY_KEY} 
                    WHEN MATCHED THEN UPDATE SET * 
                    WHEN NOT MATCHED THEN INSERT *
                """)
                print("step 6: Completed MERGE operation")
                logger.info("step 6: Completed MERGE operation")
        except Exception as e:
            print(f"Error in step 6 (Upsert operation): {str(e)}")
            logger.error(f"Error in step 6 (Upsert operation): {str(e)}")
            return

        # Step 7: Delete operation
        try:
            deleted_df = deduped_cdc_df.filter(col('_op') == 'delete')
            deleted_count = deleted_df.count()
            print(f"step 7: Deleted rows count: {deleted_count}")
            logger.info(f"step 7: Deleted rows count: {deleted_count}")
            if deleted_count > 0:
                deleted_df.createOrReplaceTempView(f"{TABLE_NAME}_delete")
                print(f"step 7: Created temporary view for delete: {TABLE_NAME}_delete")
                logger.info(f"step 7: Created temporary view for delete: {TABLE_NAME}_delete")
                spark.sql(f"DELETE FROM {DATABASE}.{TABLE_NAME} t USING {TABLE_NAME}_delete s WHERE t.{PRIMARY_KEY} = s.{PRIMARY_KEY}")
                print("step 7: Completed DELETE operation")
                logger.info("step 7: Completed DELETE operation")
        except Exception as e:
            print(f"Error in step 7 (Delete operation): {str(e)}")
            logger.error(f"Error in step 7 (Delete operation): {str(e)}")
            return

    except Exception as e:
        print(f"Error in processBatch: {str(e)}")
        logger.error(f"Error in processBatch: {str(e)}")





In [4]:
from pyspark.sql import functions as F
kds_df = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "typeOfData": "kinesis",
        "streamARN": "arn:aws:kinesis:ap-northeast-2:646298132551:stream/cdc-test",
        "classification": "json",
        "startingPosition": "LATEST",
        "inferSchema": "true",
    },
    transformation_ctx="kds_df",
    additional_options={"samplingRatio": 0.15}
)

# 변환된 스키마 확인
checkpointPath = f"{args['TempDir']}/{args['JOB_NAME']}/checkpoint/"


NameError: name 'glueContext' is not defined


In [None]:
glueContext.forEachBatch(frame=kds_df, batch_function = processBatch,
                         options={"windowSize":WINDOW_SIZE, "checkpointLocation":checkpointPath})

In [None]:
# spark.sql(f"""
#                     MERGE INTO {DATABASE}.{TABLE_NAME} t 
#                     USING {TABLE_NAME}_upsert s 
#                     ON t.{PRIMARY_KEY} = s.{PRIMARY_KEY} 
#                     WHEN MATCHED THEN UPDATE SET * 
#                     WHEN NOT MATCHED THEN INSERT *
#                 """)

In [6]:
# from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType
# schema = StructType([
#     StructField("data", StructType([
#         StructField("SEQ", IntegerType(), True),
#         StructField("CMP_CD", StringType(), True),
#         StructField("BRAND_CD", StringType(), True),
#         StructField("STORE_TP", IntegerType(), True),
#         StructField("ITEM_CD", StringType(), True),
#         StructField("ITEM_SEQ", IntegerType(), True),
#         StructField("AIRPORT_DIV", IntegerType(), True),
#         StructField("ITEM_NM", StringType(), True),
#         StructField("IMAGE_PATH", StringType(), True),
#         StructField("DETAIL_COUNT", IntegerType(), True),
#         StructField("ITEM_BARCODE", StringType(), True),
#         StructField("ETC", StringType(), True),
#         StructField("USE_YN", StringType(), True),
#         StructField("INST_USER", StringType(), True),
#         StructField("UPD_USER", StringType(), True),
#         StructField("UPD_TIME", StringType(), True),
#         StructField("TRAIN_CD", StringType(), True)
#     ]), True),
#     StructField("metadata", StructType([
#         StructField("timestamp", StringType(), True),
#         StructField("record-type", StringType(), True),
#         StructField("operation", StringType(), True),
#         StructField("partition-key-type", StringType(), True),
#         StructField("schema-name", StringType(), True),
#         StructField("table-name", StringType(), True)
#     ]), True)
# ])

# # 2. Kinesis에서 스트리밍 데이터 읽기
# kds_df = glueContext.create_data_frame.from_options(
#     connection_type="kinesis",
#     connection_options={
#         "typeOfData": "kinesis",
#         "streamARN": "arn:aws:kinesis:ap-northeast-2:646298132551:stream/cdc-test",
#         "classification": "json",
#         "startingPosition": "TRIM_HORIZON",
#         "inferSchema": "false",
#     }
# )





In [5]:
# # inferSchema를 true로 설정하고 데이터를 자동으로 추론
# kds_df = glueContext.create_data_frame.from_options(
#     connection_type="kinesis",
#     connection_options={
#         "typeOfData": "kinesis",
#         "streamARN": "arn:aws:kinesis:ap-northeast-2:646298132551:stream/cdc-test",
#         "classification": "json",
#         "startingPosition": "TRIM_HORIZON",
#         "inferSchema": "true",  # 스키마 추론을 활성화
#     },
# )

# # 샘플 데이터를 확인하여 스키마가 어떻게 되어있는지 확인




In [9]:
# from pyspark.sql import DataFrame

# # 스트리밍을 통해 10건만 수집하는 함수
# def collect_stream_data(stream_df, max_records=10):
#     collected_data = []
    
#     query = stream_df.writeStream \
#         .outputMode("append") \
#         .format("memory") \
#         .queryName("stream_data") \
#         .trigger(processingTime="1 second") \
#         .start()

#     # 데이터가 10건 이상 수집되면 중지
#     while len(collected_data) < max_records:
#         # 메모리 테이블에서 데이터 가져오기
#         stream_data = spark.sql("SELECT * FROM stream_data")
#         current_count = stream_data.count()

#         # 새로운 데이터를 수집
#         if current_count > 0:
#             collected_data.extend(stream_data.collect())
        
#         # 수집된 데이터가 max_records에 도달하면 종료
#         if len(collected_data) >= max_records:
#             query.stop()
#             break
    
#     # 수집된 데이터를 DataFrame으로 변환하여 반환
#     collected_df = spark.createDataFrame(collected_data[:max_records], schema=stream_df.schema)
#     return collected_df

# # 수집된 데이터를 DataFrame으로 저장
# collected_df = collect_stream_data(kds_df, max_records=10)




In [13]:
# import json
# from pyspark.sql.functions import lit
# from pyspark.sql.types import StringType

# # Iceberg 테이블의 컬럼 목록
# iceberg_columns = set({
#     "SEQ", "CMP_CD", "BRAND_CD", "STORE_TP", "ITEM_CD", "ITEM_SEQ", "SPCN_ITEM_CD", 
#     "SPCN_ITEM_SEQ", "AIRPORT_DIV", "ITEM_NM", "IMAGE_THUM_PATH", "IMAGE_PATH", 
#     "DETAIL_COUNT", "ITEM_BARCODE", "ETC", "USE_YN", "INST_USER", "INST_TIME", 
#     "UPD_USER", "UPD_TIME", "ITEM_TRANS", "ITEM_SHOOT", "LABEL_FIN", "STUDY", 
#     "TRAIN_CD", "ITEM_REFL_FIN", "SIMILAR", "LABEL_CNT_NEW", "LABEL_CNT_ALL", 
#     "BUNDLE_CODE"
# })

# # 매 행(row)을 처리하면서 컬럼 비교 (최대 5개 행만 처리)
# for idx, row in enumerate(collected_df.collect()):
#     if idx >= 5:  # 5개까지만 처리
#         break

#     temp_df = row["$json$data_infer_schema$.temporary$"]  # 열 안에 있는 JSON 문자열 가져오기
#     # print(f"Processing row {idx + 1}:\n{temp_df}")

#     if temp_df is not None:
#         # JSON 문자열을 파싱하여 딕셔너리로 변환
#         json_data = json.loads(temp_df)
#         if 'data' not in json_data.keys():
#             print("skip: ",json_data.keys())
#             continue
#         cdc_columns = set(json_data['data'].keys())
#         print(f"Columns in JSON data: {cdc_columns}")

#         # Iceberg 컬럼과 비교하여 누락된 컬럼 찾기
#         missing_columns = iceberg_columns - cdc_columns
#         if missing_columns:
#             print(f"Schema mismatch: missing columns in stream data - {missing_columns}")

#             # 누락된 컬럼을 null 값으로 추가
#             for col_name in missing_columns:
#                 if col_name not in collected_df["$json$data_infer_schema$.temporary$"]['data'].columns:
#                     # 스키마에 없는 컬럼을 StringType으로 추가
#                     _collected_df = collected_df["$json$data_infer_schema$.temporary$"]['data'].withColumn(col_name, lit(None).cast(StringType()))

#             print(f"step 3: Added missing columns: {missing_columns}")
            
#         # 결과 출력
#         _collected_df.show(5, truncate=False)


AnalysisException: Cannot resolve column name "$json$data_infer_schema$.temporary$" among ($json$data_infer_schema$.temporary$); did you mean to quote the `$json$data_infer_schema$.temporary$` column?


In [16]:
# collected_df["`$json$data_infer_schema$.temporary$`"]

Column<'$json$data_infer_schema$.temporary$'>


In [8]:
# from pyspark.sql import SparkSession
# import time
# # Spark 세션 가져오기
# # spark = SparkSession.builder.getOrCreate()

# # 메모리 테이블에서 데이터를 읽어오는 함수
# def fetch_stream_data():
#     return spark.sql("SELECT * FROM stream_data limit 10")

# # 테스트를 위한 배치 처리 함수 호출
# def test_process_batch():
#     print("a")
#     for _ in range(10):  # 무한 루프 대신 제한된 반복으로 테스트
#         # 메모리에서 데이터 가져오기
#         print("b")
#         stream_data_df = fetch_stream_data()
#         print(stream_data_df)

#         if stream_data_df.count() > 0:
#             # 데이터가 있을 때만 처리
#             processBatch(stream_data_df, batch_id="test_batch")
#         else:
#             print("No new data in the stream_data table.")

#         # 적절한 대기 시간을 두어 불필요한 로드를 방지 (예: 10초마다 체크)
        
#         print("c")
#         time.sleep(10)




In [19]:
processBatch(collected_df,1)

start
Error in step 1: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table mst_item_iceberg. StorageDescriptor#InputFormat cannot be null for table: mst_item_iceberg (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)


In [9]:
# from pyspark.sql import functions as F
# kds_df = glueContext.create_data_frame.from_options(
#     connection_type="kinesis",
#     connection_options={
#         "typeOfData": "kinesis",
#         "streamARN": "arn:aws:kinesis:ap-northeast-2:646298132551:stream/cdc-test",
#         "classification": "json",
#         "startingPosition": "LATEST",
#         "inferSchema": "true",
#     },
#     transformation_ctx="kds_df",
#     additional_options={"samplingRatio": 0.15}
# )

# # 변환된 스키마 확인
# checkpointPath = f"{args['TempDir']}/{args['JOB_NAME']}/checkpoint/"
# # kds_df.printSchema()

Py4JJavaError: An error occurred while calling o177.getDynamicFrame.
: java.lang.UnsupportedOperationException: Streaming data source doesn't support dynamic frame
	at com.amazonaws.services.glue.StreamingDataSource.getDynamicFrame(DataSource.scala:1171)
	at com.amazonaws.services.glue.StreamingDataSource.getDynamicFrame$(DataSource.scala:1171)
	at com.amazonaws.services.glue.KinesisDataSource.getDynamicFrame(DataSource.scala:1474)
	at com.amazonaws.services.glue.StreamingDataSource.getDynamicFrame(DataSource.scala:1172)
	at com.amazonaws.services.glue.StreamingDataSource.getDynamicFrame$(DataSource.scala:1172)
	at com.amazonaws.services.glue.KinesisDataSource.getDynamicFrame(DataSource.scala:1474)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at

In [6]:
job.commit()

GlueArgumentError: the following arguments are required: --JOB_NAME


In [5]:
# # 스키마 추론 버전
# # kds_df = glueContext.create_data_frame.from_options(
# #     connection_type="kinesis",
# #     connection_options={
# #         "typeOfData": "kinesis",
# #         "streamARN" : "arn:aws:kinesis:ap-northeast-2:646298132551:stream/cdc-test",
# #         "classification": "json",
# #         "startingPosition": "TRIM_HORIZON",
# #         "inferSchema": "true",
# #     },
# #     transformation_ctx="kds_df",
# # )

# # 스키마 명시 버전 
# from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType

# schema = StructType([
#     StructField("data", StructType([
#         StructField("SEQ", IntegerType(), True),
#         StructField("CMP_CD", StringType(), True),
#         StructField("BRAND_CD", StringType(), True),
#         StructField("STORE_TP", IntegerType(), True),
#         StructField("ITEM_CD", StringType(), True),
#         StructField("ITEM_SEQ", IntegerType(), True),
#         StructField("AIRPORT_DIV", IntegerType(), True),
#         StructField("ITEM_NM", StringType(), True),
#         StructField("IMAGE_PATH", StringType(), True),
#         StructField("DETAIL_COUNT", IntegerType(), True),
#         StructField("ITEM_BARCODE", StringType(), True),
#         StructField("ETC", StringType(), True),
#         StructField("USE_YN", StringType(), True),
#         StructField("INST_USER", StringType(), True),
#         StructField("UPD_USER", StringType(), True),
#         StructField("UPD_TIME", StringType(), True),
#         StructField("TRAIN_CD", StringType(), True)
#     ]), True),
#     StructField("metadata", StructType([
#         StructField("timestamp", StringType(), True),
#         StructField("record-type", StringType(), True),
#         StructField("operation", StringType(), True),
#         StructField("partition-key-type", StringType(), True),
#         StructField("schema-name", StringType(), True),
#         StructField("table-name", StringType(), True)
#     ]), True)
# ])

# kds_df = glueContext.create_data_frame.from_options(
#     connection_type="kinesis",
#     connection_options={
#         "typeOfData": "kinesis",
#         "streamARN": "arn:aws:kinesis:ap-northeast-2:646298132551:stream/cdc-test",
#         "classification": "json",
#         "startingPosition": "TRIM_HORIZON",
#         "inferSchema": "false",
#     },
#     format_options={"schema": schema},
#     transformation_ctx="kds_df",
# )

# # LATEST



