# Glue Job Basic Template

#### 이번 Lab에서는 Glue Job의 기본 Template을 살펴보고 SageMaker notebook을 사용하여 간단하게 데이터를 읽고, ETL하고, Write하는 과정을 살펴본다.

#### S3에 업로드한 데이터를 읽어오기 위해 각자 S3 bucket에 지정한 account-id를 account_id 변수에 할당한다.

In [1]:
account_id = '0123456789'
s3_bucket = 's3://aws-glue-hol-' + account_id

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
76,application_1544019436848_0077,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


#### 실제 Glue Job의 Template은 다음과 같다. (아래코드는 실행하지 말 것)

In [None]:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import regexp_extract, col

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# SparkContext 변수 생성
sc = SparkContext()
# GlueContext 변수 생성
glueContext = GlueContext(sc)
# SparkSession 변수 생성
spark = glueContext.spark_session
# Job 변수 생성
job = Job(glueContext)
# Job 초기화
job.init(args['JOB_NAME'], args)

# S3에서 csv 데이터를 읽어 DynamicFrame 생성
titanic_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type = 's3',
    connection_options = {'paths': [s3_bucket + '/train']},
    format='csv',
    format_options={
        "withHeader": True,
        "delimiter": ','
    })

# Spark 활용: DynamicFrame을 DataFrame으로 변환 및 initail column을 추가
titanic_csv_df = titanic_dyf.toDF()
titanic_csv_df = titanic_csv_df.withColumn('initial', regexp_extract(col('Name'), "(\w+)\.", 1))

# Glue 활용: DataFrame을 DynamicFrame으로 변환하여 Name column 삭제
titanic_csv_dyf = DynamicFrame.fromDF(titanic_csv_df, glueContext, 'titanic_csv_dyf').drop_fields('Name')

# json format으로 output 디렉토리에 저장
titanic_csv_df.write \
    .format('json') \
    .mode('overwrite') \
    .save(s3_bucket + '/output')
# Job commit
job.commit()

#### Jupyter Notebook에서는 SparkContext(sc)와 SparkSession(spark)이 지원되므로 선언해서 사용하지 않아도 된다.

In [2]:
sc

<SparkContext master=yarn appName=livy-session-76>

In [3]:
spark

<pyspark.sql.session.SparkSession object at 0x7f7e53e15390>

#### Jupyter Notebook에서는 아래와 같이 이미 선언된 SparkContext(sc)를 사용해서 GlueContext 생성이 가능하다.  

In [4]:
from awsglue.context import GlueContext
from pyspark.sql.functions import regexp_extract, col

# Jupyter Notebook에서 이미 생성한 SparkContext(sc)를 사용해서 GlueContext 변수 생성
glueContext = GlueContext(sc)

#### S3 데이터를 읽어 DynamicFrame 생성 

In [5]:
titanic_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type = 's3',
    connection_options = {'paths': [s3_bucket + '/train']},
    format='csv',
    format_options={
        'withHeader': True,
        'delimiter': ','
    })

#### DynamicFrame을 DataFrame으로 변환하고 데이터 확인

In [6]:
titanic_csv_df = titanic_dyf.toDF()
titanic_csv_df.show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|Mrs. John Bradley...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|
|          4|       1|     1|Mrs. Jacques Heat...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|   Mr. William Henry|  male| 35|    0|    0|          373450|   8.05|     |       S|
|          6|       0|     3|           Mr. James|  male|   |    0|    0|          33087

#### Name 컬럼에서 유의미한 String만 추출하여 initial 컬럼생성

In [7]:
titanic_csv_df = titanic_csv_df.withColumn('initial', regexp_extract(col('Name'), "(\w+)\.", 1))

#### json format으로 output 디렉토리에 저장

In [8]:
titanic_csv_df.write \
    .format('json') \
    .mode('overwrite') \
    .save(s3_bucket + '/output')

#### 전체 데이터 수 확인

In [9]:
titanic_csv_df.count()

891

#### 컬럼 및 샘플 데이터 확인

In [10]:
titanic_csv_df.show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|initial|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|     Mr|
|          2|       1|     1|Mrs. John Bradley...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|   Miss|
|          4|       1|     1|Mrs. Jacques Heat...|female| 35|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|   Mr. William Henry|  male| 35|    0|    0|          373450|   8.05|     |       S|     Mr|
|          6|       0|  

#### initial 컬럼 생성으로 Name 컬럼은 필요없으므로 삭제

In [11]:
from awsglue.dynamicframe import DynamicFrame

titanic_csv_dyf = DynamicFrame.fromDF(titanic_csv_df, glueContext, 'titanic_csv_dyf').drop_fields('Name')
titanic_csv_dyf.toDF().show()

+-----------+--------+------+------+---+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|initial|
+-----------+--------+------+------+---+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|     Mr|
|          2|       1|     1|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|   Miss|
|          4|       1|     1|female| 35|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|  male| 35|    0|    0|          373450|   8.05|     |       S|     Mr|
|          6|       0|     3|  male|   |    0|    0|          330877| 8.4583|     |       Q|     Mr|
|          7|       0|     1|  male| 54|    0|    0|           17463|51.8625|  E46|       S

#### Json으로 변환해서 저장된 데이터가 정상적으로 저장되었는지 확인

In [12]:
titanic_json_df = glueContext.create_dynamic_frame_from_options(
    connection_type = 's3',
    connection_options = {'paths': [s3_bucket + '/output']},
    format='json').toDF()

In [13]:
titanic_json_df.count()

891

In [14]:
titanic_json_df.show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|initial|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|     Mr|
|          2|       1|     1|Mrs. John Bradley...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|   Miss|
|          4|       1|     1|Mrs. Jacques Heat...|female| 35|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|   Mr. William Henry|  male| 35|    0|    0|          373450|   8.05|     |       S|     Mr|
|          6|       0|  