### Glue를 활용한 데이터 활용하기

Jupyter notebook을 통해 AWS Glue job을 생성하기 전에 먼저 데이터를 확인해보겠습니다.
저희는 앞선 실습에서 RDS MySQL의 데이터를 크롤링하여 metadata를 만들었고, Amazon S3에 데이터가 들어왔다는 전제하에 업로드 시켜두었습니다. 

### 1. Crawl our sample dataset

예제에서는 아래의 경로에 데이터들이 적재되어 있습니다.

    s3://awskrug-data-닉네임/
    jdbc:mysql://RDS_endpoint.ap-northeast-2.rds.amazonaws.com:3306/datalab

### 2. GlueContext 설정

필요한 AWS Glue의 라이브러리를 가져와서 단일 GlueContext를 설정합니다. 

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1569309920903_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3. AWS Glue Crawler가 식별한 스키마를 확인

앞서 크롤러가 만든 database 및 table을 정의하여 스키마를 확인하겠습니다.
실습에서는 아래 2개의 테이블을 조회해 보겠습니다. 
 - order_info
 - user_event_logs
 - user_info

In [26]:
order_info = glueContext.create_dynamic_frame.from_catalog(database="datalab-s3-order-info", table_name="order_info_csv")
order_info.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- timestamp: string
|-- user_id: string
|-- goods_id: long
|-- shop_id: long
|-- price: long

In [35]:
user_event_log = glueContext.create_dynamic_frame.from_catalog(database="database-s3-test", table_name="user_event_logs_2019")
user_event_log.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- timestamp: string
|-- user_id: string
|-- event_origin: string
|-- event_name: string
|-- partition_0: string
|-- partition_1: string
|-- event_goods_id: long
|-- event_shop_id: long

In [34]:
user_info = glueContext.create_dynamic_frame.from_catalog(database="datalab-rds", table_name="datalab_user_info")
user_info.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- user_id: string
|-- os: string
|-- age: int


위와 같은 방식으로 각각의 데이터베이스에 존재하는 테이블의 스키마 정보를 확인할 수 있습니다.
이를 보고 각각 JOIN하거나 불필요 컬럼을 제거하거나,이름을 변경한 뒤 조인하는 등의 행위를 할 수 있습니다. 


### 4. 필터링

원하는 필드만 유지하고 특정 필드는 제거하여 필터링 작업을 하겠습니다. 
이때 toDF() 변환은 DynamicFrame을 Spark DataFrame으로 변환하므로 SparkSQL에 이미 존재하는 변환을 적용할 수 있습니다.



In [17]:
user_event_log = user_event_log.drop_fields(['timestamp']).rename_field('user_id', 'id')
user_event_log.toDF().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------+-----------+-----------+--------------------+--------------+-------------+
|        event_origin|   event_name|partition_0|partition_1|                  id|event_goods_id|event_shop_id|
+--------------------+-------------+-----------+-----------+--------------------+--------------+-------------+
|       shops_ranking|app_page_view|         09|         20|K1d8_t3-QIskaSkrx...|          null|         null|
|      shops_bookmark|app_page_view|         09|         20|lwFZ77v_ygk0uU40t...|          null|         null|
|goods_search_resu...|app_page_view|         09|         20|mR-bO6hC9g-m8ERXM...|          null|         null|
|      shops_bookmark|app_page_view|         09|         20|K1d8_t3-QIskaSkrx...|          null|         null|
|      shops_bookmark|app_page_view|         09|         20|Yjny5AchUWLiuv4kd...|          null|         null|
|            my_goods|app_page_view|         09|         20|LZZ0ktGq6hW685TFA...|          null|         null|
|

이러한 방식으로 특정 컬럼을 제외할 수도 있고, 리네임할 수 도 있습니다. 

이제는 datalab_user_info와 Order_info를 user_id로 조인해보겠습니다.

In [42]:
result = Join.apply(order_info, user_info,'user_id','user_id')
result = result.drop_fields(['.user_id'])

print ("Count: ",result.count())
result.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  867
root
|-- price: long
|-- shop_id: long
|-- age: int
|-- timestamp: string
|-- goods_id: long
|-- os: string
|-- .user_id: string
|-- user_id: string

In [39]:
result.toDF().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------+---+--------------------+--------+---+--------------------+--------------------+
|price|shop_id|age|           timestamp|goods_id| os|            .user_id|             user_id|
+-----+-------+---+--------------------+--------+---+--------------------+--------------------+
|23500|    102| 20|2018-06-11 02:04:...|    5436|iOS|W-CvDQrB4uXbiGGZ8...|W-CvDQrB4uXbiGGZ8...|
|15000|     41| 25|2018-06-11 14:17:...|    1884|And|fYHpMIbtwo3JQ8nPn...|fYHpMIbtwo3JQ8nPn...|
|19800|     32| 32|2018-06-11 12:21:...|    5884|iOS|RWEDTLLkIUVuQQI7Z...|RWEDTLLkIUVuQQI7Z...|
|11500|     12| 15|2018-06-11 21:36:...|    1922|And|xp2UXd5LsDAq7ib1H...|xp2UXd5LsDAq7ib1H...|
|11000|     22| 21|2018-06-11 18:16:...|    3355|iOS|ZqC_h-amqTdhYvv0V...|ZqC_h-amqTdhYvv0V...|
|44900|    139| 23|2018-06-11 02:30:...|    1172|iOS|MRNFeS0sVveDbr_PG...|MRNFeS0sVveDbr_PG...|
| 9900|    126| 23|2018-06-11 02:49:...|     151|iOS|MRNFeS0sVveDbr_PG...|MRNFeS0sVveDbr_PG...|
|52000|    145| 23|2018-06-11 03:54:...|


### 4. 데이터 쓰기

만들어진 데이터를 특정 저장장치(Amazon S3)에 쓰도록하겠습니다.
실습에서는 Amazon S3에만 데이터를 쓰지만, RDBMS, Redhift 등에도 쓸 수 있습니다.


Amazon S3경로에 조인된 데이터를 parquet으로 변환하여 upload하겠습니다.

In [None]:
import boto3, os

s3 = boto3.resource('s3')
s3.create_bucket(Bucket='demo-bucket-cdaaaal2', CreateBucketConfiguration={'LocationConstranint':'ap-northeast-2'})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
output_AmazonS3 = "s3://awskrug-data-luke/output"


print("Writing to awskrug-data-luke/output ...")
glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": output_AmazonS3}, format = "parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Writing to awskrug-data-luke/output ...
<awsglue.dynamicframe.DynamicFrame object at 0x7f8ee5acc6d8>

### 결론

ETL 작업을 하게 되면 한번에 코딩하기도 어렵고, 시간도 많이 걸릴 것 입니다.
이럴 때 AWS Glue endpoint를 사용하여 jupyter notebook 환경을 손Now let's join these relational tables to create one full history table of legislator
memberships and their correponding organizations, using AWS Glue.

 - First, we join `persons` and `memberships` on `id` and `person_id`.
 - Next, join the result with orgs on `org_id` and `organization_id`.
 - Then, drop the redundant fields, `person_id` and `org_id`.

We can do all these operations in one (extended) line of code: