For loading data, we mainly use the Glue API. Glue's DynamicFrame provides APIs optimized for reading and writing large numbers of files.

Once the data is loaded, we use Spark DataFrames by default to transform it.

First, import the main AWS Glue libraries and create a GlueContext for running Spark jobs.

The code below comes from the default script template that Glue generates when you create a job.
Some of the Job and Bookmark related lines have been commented out.

In [1]:
#import sys
#from awsglue.transforms import *
#from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
#from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1558507935305_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Load the table data using the metadata in the Glue Data Catalog. For the following code to work, the database and tables must already have been created by a Glue Crawler in the previous step.

In [2]:
order = glueContext.create_dynamic_frame.from_catalog(database="analytics-source", table_name="order").toDF()
print "Count: ", order.count()
order.printSchema()
order.show(5)

Count:  11283758
root
 |-- member_id: string (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_status: string (nullable = true)
 |-- country: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- city: string (nullable = true)
 |-- order_time: long (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- order_id: string (nullable = true)

+---------+----------+------------+-------------+-------------+-----------+-------------+--------------+------------+-----------+------+--------------+
|member_id|order_date|order_status|      country|shipping_date|total_price|         city|    order_time|       state|postal_code|region|      order_id|
+---------+----------+------------+-------------+-------------+-----------+-------------+--------------+------------+-----------+------+--------------+
| ND-18370|  20161119|     shipped|Unit

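Next, we identify columns that the later analysis step will not need and clean up the data ahead of time.
We look at the value distribution of several location-related columns, keep only the City column that we actually need, and drop the others.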

In [3]:
# Equivalent with orderBy (needs: from pyspark.sql.functions import desc):
#order.groupBy("country").count().orderBy(desc("count")).show(10)
#order.groupBy("region").count().orderBy(desc("count")).show(10)
order.groupBy("country").count().sort("count", ascending=False).show(10)
order.groupBy("region").count().sort("count", ascending=False).show(10)
order.groupBy("city").count().sort("count", ascending=False).show(10)


+-------------+--------+
|      country|   count|
+-------------+--------+
|United States|11283758|
+-------------+--------+

+-------+-------+
| region|  count|
+-------+-------+
|  South|3335405|
|   East|2819257|
|Central|2797985|
|   West|2331111|
+-------+-------+

+-------------+------+
|         city| count|
+-------------+------+
|San Francisco|896024|
|     Elmhurst|895081|
|New York City|894701|
|    Henderson|515644|
|  Garden City|514531|
|       Denver|492449|
|   Chesapeake|491955|
|     Columbus|491824|
|       Toledo|491417|
|      Detroit|491338|
+-------------+------+
only showing top 10 rows
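Each `groupBy(column).count().sort("count", ascending=False).show(n)` call above counts how often every value occurs and lists the most frequent first. The plain-Python sketch below does the same over a list of row dicts, to make that explicit; the helper name `value_counts` and the three sample rows are made up for illustration. It also captures the reasoning behind dropping columns: `country` has a single value covering all 11,283,758 rows (the full order count), so it carries no information for the analysis. Unlike Spark, whose sort order among equal counts is not guaranteed, `Counter.most_common` keeps ties in first-seen order.

```python
from collections import Counter


def value_counts(rows, column, n=10):
    """Return the n most frequent values of `column` as (value, count) pairs,
    largest count first, like df.groupBy(column).count()
    .sort("count", ascending=False).show(n). n=None returns every value."""
    counts = Counter(row[column] for row in rows)
    return counts.most_common(n)


# Hypothetical sample rows; the real order table has about 11 million rows.
rows = [
    {"country": "United States", "region": "South", "city": "Henderson"},
    {"country": "United States", "region": "East", "city": "New York City"},
    {"country": "United States", "region": "South", "city": "Houston"},
]

print(value_counts(rows, "country"))  # [('United States', 3)]
print(value_counts(rows, "region"))   # [('South', 2), ('East', 1)]

# A column whose distribution has a single value carries no information,
# which makes it a natural candidate for dropping.
constant = [c for c in rows[0] if len(value_counts(rows, c, n=None)) == 1]
print(constant)  # ['country']
```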

In [4]:
order_item = glueContext.create_dynamic_frame.from_catalog(database="analytics-source", table_name="order_item").toDF()
print "Count: ", order_item.count()
order_item.printSchema()
order_item.show(5)

Count:  22573388
root
 |-- item_count: long (nullable = true)
 |-- order_date: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: long (nullable = true)
 |-- order_time: long (nullable = true)
 |-- order_id: string (nullable = true)

+----------+----------+---------------+----------+--------------+--------------+
|item_count|order_date|        item_id|item_price|    order_time|      order_id|
+----------+----------+---------------+----------+--------------+--------------+
|         5|  20171221|OFF-SU-10000898|        14|20171221000000|CA-2017-165841|
|         3|  20161103|OFF-PA-10004359|       105|20161103000000|CA-2016-109365|
|         5|  20151127|FUR-CH-10002647|        71|20151127000000|CA-2015-101910|
|         5|  20171102|OFF-AR-10003469|         2|20171102000000|US-2017-163790|
|         1|  20141206|FUR-TA-10003238|       551|20141206000000|US-2014-112872|
+----------+----------+---------------+----------+--------------+--------------+
only showing top 5 rows

In [5]:
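# order and order_item both contain order_date and order_time, so joining on
# order_id alone keeps two columns with each of those names (see the schema below).
# Dropping them from one side first, e.g. order_item.drop("order_date", "order_time"),
# avoids ambiguous column references later.
order_detail = order.join(order_item, "order_id")
order_detail.show(10)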

+--------------+---------+----------+------------+-------------+-------------+-----------+---------------+--------------+--------------+-----------+------+----------+----------+---------------+----------+--------------+
|      order_id|member_id|order_date|order_status|      country|shipping_date|total_price|           city|    order_time|         state|postal_code|region|item_count|order_date|        item_id|item_price|    order_time|
+--------------+---------+----------+------------+-------------+-------------+-----------+---------------+--------------+--------------+-----------+------+----------+----------+---------------+----------+--------------+
|CA-2014-114335| XP-21865|  20140928|     shipped|United States|   2014-10-03|        337|      Hollywood|20140928000000|       Florida|      33021| South|         4|  20140928|FUR-FU-10000277|       105|20140928000000|
|CA-2014-120670| JK-16120|  20141102|     shipped|United States|   2014-11-06|        800|Fort Lauderdale|20141102000000

In [6]:
order_detail.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_status: string (nullable = true)
 |-- country: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- city: string (nullable = true)
 |-- order_time: long (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- item_count: long (nullable = true)
 |-- order_date: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: long (nullable = true)
 |-- order_time: long (nullable = true)
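Why the schema above has two `order_date` and two `order_time` columns: when `join` is given column name(s), Spark keeps each join key once (for an inner join), followed by the remaining columns of the left DataFrame and then those of the right one, so any other column the two sides share appears twice. The plain-Python sketch below reproduces that column layout for the `order` and `order_item` schemas printed earlier; the helper names `using_join_columns` and `duplicated` are hypothetical. Joining on all three shared columns keeps a single copy of each, but that only makes sense if `order_date` and `order_time` always agree between the two tables; otherwise rows would be lost, and dropping the columns from `order_item` before the join is the safer choice.

```python
def using_join_columns(left_cols, right_cols, on):
    """Column layout of left.join(right, on) for an inner join on column
    name(s): join keys once, then the other left columns, then the other
    right columns, each in their original order."""
    keys = [on] if isinstance(on, str) else list(on)
    left_rest = [c for c in left_cols if c not in keys]
    right_rest = [c for c in right_cols if c not in keys]
    return keys + left_rest + right_rest


def duplicated(columns):
    """Names that appear more than once, in order of their second appearance."""
    seen, dups = set(), []
    for c in columns:
        if c in seen and c not in dups:
            dups.append(c)
        seen.add(c)
    return dups


# Column order taken from order.printSchema() and order_item.printSchema().
order_cols = ["member_id", "order_date", "order_status", "country",
              "shipping_date", "total_price", "city", "order_time", "state",
              "postal_code", "region", "order_id"]
order_item_cols = ["item_count", "order_date", "item_id", "item_price",
                   "order_time", "order_id"]

joined = using_join_columns(order_cols, order_item_cols, "order_id")
print(duplicated(joined))  # ['order_date', 'order_time']

# Joining on every shared column keeps one copy of each.
joined_all = using_join_columns(order_cols, order_item_cols,
                                ["order_id", "order_date", "order_time"])
print(duplicated(joined_all))  # []
```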

In [7]:
order.head()

Row(member_id=u'ND-18370', order_date=20161119, order_status=u'shipped', country=u'United States', shipping_date=u'2016-11-25', total_price=53, city=u'Long Beach', order_time=20161119000000, state=u'New York', postal_code=11561, region=u'East', order_id=u'CA-2016-130778')

Save the data that was cleaned up and joined with DataFrames above to a separate S3 bucket.
(Data that was converted to a DataFrame must first be converted back into a DynamicFrame.)
Ad-hoc queries and analysis are then run against the saved data.

In [8]:
from awsglue.dynamicframe import DynamicFrame

order_detail_dyf = DynamicFrame.fromDF(order_detail, glueContext, 'order_detail_dyf')

In [9]:
datasink1 = glueContext.write_dynamic_frame.from_options(frame = order_detail_dyf, connection_type = "s3", connection_options = {"path": "s3://analytics-data-seung/e-commerce-analytics/order_detail"}, format = "parquet", transformation_ctx = "datasink1")



Load and save all of the other data we need as well:
order (already loaded)
order_item (already loaded)
member
item
item_category

The tables loaded earlier are converted back into DynamicFrames and saved.
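The write cells that follow repeat the same `write_dynamic_frame.from_options` call once per table, changing only the frame, the S3 path and the `transformation_ctx`. A minimal sketch of the same thing as a loop, assuming a hypothetical helper `write_tables_as_parquet` and an assumed naming scheme `datasink_<table>` for `transformation_ctx` (the notebook itself uses `datasink2` to `datasink6`). The `transformation_ctx` string is what Glue job bookmarks use to track state, so it should stay stable once bookmarks are enabled.

```python
# Hypothetical helper: writes each DynamicFrame to <base_path>/<table name> as Parquet.
BASE_PATH = "s3://analytics-data-seung/e-commerce-analytics"


def write_tables_as_parquet(glue_context, frames, base_path=BASE_PATH):
    """Write every DynamicFrame in `frames` (a dict of table name -> DynamicFrame)
    through glue_context.write_dynamic_frame.from_options, one S3 prefix per table.
    Returns a dict of table name -> whatever from_options returned."""
    results = {}
    for name, frame in frames.items():
        results[name] = glue_context.write_dynamic_frame.from_options(
            frame=frame,
            connection_type="s3",
            connection_options={"path": "{}/{}".format(base_path, name)},
            format="parquet",
            # Assumed naming scheme; job bookmarks key their state on this value.
            transformation_ctx="datasink_{}".format(name),
        )
    return results
```

In this notebook it would be called as `write_tables_as_parquet(glueContext, {"order": order_dyf, "order_item": order_item_dyf})`, and likewise for `member`, `item` and `item_category` while they are still DynamicFrames, i.e. before they are converted with `toDF()`.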

In [None]:
order_dyf = DynamicFrame.fromDF(order, glueContext, 'order_dyf')
order_item_dyf = DynamicFrame.fromDF(order_item, glueContext, 'order_item_dyf')



In [None]:
datasink2 = glueContext.write_dynamic_frame.from_options(frame = order_dyf, connection_type = "s3", connection_options = {"path": "s3://analytics-data-seung/e-commerce-analytics/order"}, format = "parquet", transformation_ctx = "datasink2")
datasink3 = glueContext.write_dynamic_frame.from_options(frame = order_item_dyf, connection_type = "s3", connection_options = {"path": "s3://analytics-data-seung/e-commerce-analytics/order_item"}, format = "parquet", transformation_ctx = "datasink3")



In [10]:
member = glueContext.create_dynamic_frame.from_catalog(database="analytics-source", table_name="member")
item = glueContext.create_dynamic_frame.from_catalog(database="analytics-source", table_name="item")
item_category = glueContext.create_dynamic_frame.from_catalog(database="analytics-source", table_name="item_category")


In [None]:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = member, connection_type = "s3", connection_options = {"path": "s3://analytics-data-seung/e-commerce-analytics/member"}, format = "parquet", transformation_ctx = "datasink4")
datasink5 = glueContext.write_dynamic_frame.from_options(frame = item, connection_type = "s3", connection_options = {"path": "s3://analytics-data-seung/e-commerce-analytics/item"}, format = "parquet", transformation_ctx = "datasink5")
datasink6 = glueContext.write_dynamic_frame.from_options(frame = item_category, connection_type = "s3", connection_options = {"path": "s3://analytics-data-seung/e-commerce-analytics/item_category"}, format = "parquet", transformation_ctx = "datasink6")


In [11]:
member = member.toDF()
item = item.toDF()
item_category = item_category.toDF()

In [12]:
member.show(10)

+---------+-------------+--------+------+-------------+---------------+----------------+--------------------+-------------------+-----------+-----------+-------+---+--------------+
|member_id|      country|login_id|gender|         city|last_login_ymdt|membership_level|      login_password|               name|      state|postal_code| region|age|      reg_ymdt|
+---------+-------------+--------+------+-------------+---------------+----------------+--------------------+-------------------+-----------+-----------+-------+---+--------------+
| AG-10300|United States|AG-10300|     M|  Los Angeles| 20170903161703|                |\x42dd0eff411ab32...|Aleksandra Gannaway| California|      90049|   West| 37|20140313115051|
| AJ-10960|United States|AJ-10960|     M|    Rochester| 20171025171702|                |\x03821df3605318d...|       Astrea Jones|   New York|      14609|   East| 43|20140614084008|
| AT-10735|United States|AT-10735|     M|      Bristol| 20171226054706|                |\x2785a

In [47]:
member_gender = member.groupBy("city", "gender").count().sort("count", ascending=False)
member_gender.show(10)



+-------------+------+-----+
|         city|gender|count|
+-------------+------+-----+
|  Los Angeles|     M|   33|
|New York City|     M|   31|
|  Los Angeles|     F|   26|
|San Francisco|     M|   23|
|New York City|     F|   22|
| Philadelphia|     F|   20|
|      Seattle|     F|   19|
| Philadelphia|     M|   17|
|San Francisco|     F|   17|
|      Houston|     F|   16|
+-------------+------+-----+
only showing top 10 rows

In [68]:
member_pivot = member.groupBy("city").pivot("gender").count()
member_pivot.show(10)


+---------------+----+----+
|           city|   F|   M|
+---------------+----+----+
|          Tyler|null|   1|
|          Pasco|   1|null|
|    Springfield|   7|   6|
|  Bowling Green|   1|null|
|        Edmonds|null|   1|
|          Tempe|   1|   1|
|         Auburn|null|   2|
|North Las Vegas|null|   1|
|        Phoenix|   1|   2|
|      Bethlehem|   1|null|
+---------------+----+----+
only showing top 10 rows
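As a plain-Python illustration of what `groupBy("city").pivot("gender").count()` produces, here is a sketch on a few hand-made rows (the helper name `pivot_count` and the rows are hypothetical). It shows where the nulls come from: a city with no members of one gender simply has no count for that cell. The distinct pivot values are sorted to form the columns, as Spark does when no value list is passed to `pivot`; Spark does not guarantee the row order, so the sketch just sorts the groups.

```python
from collections import defaultdict


def pivot_count(rows, group_col, pivot_col):
    """Plain-Python analogue of df.groupBy(group_col).pivot(pivot_col).count():
    one output row per group value, one column per distinct pivot value, and
    None (shown as null by Spark) where a combination never occurs."""
    counts = defaultdict(int)
    for row in rows:
        counts[(row[group_col], row[pivot_col])] += 1
    pivot_values = sorted(set(row[pivot_col] for row in rows))
    groups = sorted(set(row[group_col] for row in rows))
    # dict.get does not trigger defaultdict's default, so missing cells stay None.
    table = dict((g, [counts.get((g, p)) for p in pivot_values]) for g in groups)
    return pivot_values, table


rows = [
    {"city": "Tyler", "gender": "M"},
    {"city": "Tempe", "gender": "F"},
    {"city": "Tempe", "gender": "M"},
    {"city": "Auburn", "gender": "M"},
    {"city": "Auburn", "gender": "M"},
]

columns, table = pivot_count(rows, "city", "gender")
print(columns)          # ['F', 'M']
print(table["Auburn"])  # [None, 2]
```

In Spark itself, appending `.fillna(0)` to the pivot result replaces those nulls with 0.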

In [38]:
# pivot() is defined on GroupedData (the result of groupBy), not on DataFrame,
# so member_gender.pivot("city") fails with
# AttributeError: 'DataFrame' object has no attribute 'pivot'.
# General form: df.groupBy(row_col).pivot(column_col).agg(...)
# Group first, then pivot: one row per gender, one column per city.
member.groupBy("gender").pivot("city").count().show()



In [56]:
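# registerDataFrameAsTable is the older SQLContext API; since Spark 2.0 the
# DataFrame method createOrReplaceTempView (commented out below) does the same job.
sqlContext.registerDataFrameAsTable(member, "member")
#member.createOrReplaceTempView("member")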

In [58]:
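# Temporary views are visible only within the SparkSession that registered them;
# the error below shows that "member" is not visible from spark.
# (show() returns None, so keep the DataFrame and call show() separately.)
df2 = spark.sql("select * from member")
df2.show()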

u'Table or view not found: member; line 1 pos 14'
Traceback (most recent call last):
  File "/mnt/yarn/usercache/livy/appcache/application_1558507935305_0003/container_1558507935305_0003_01_000001/pyspark.zip/pyspark/sql/session.py", line 603, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/mnt/yarn/usercache/livy/appcache/application_1558507935305_0003/container_1558507935305_0003_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/mnt/yarn/usercache/livy/appcache/application_1558507935305_0003/container_1558507935305_0003_01_000001/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Table or view not found: member; line 1 pos 14'



In [59]:
sqlContext.dropTempTable("member")

In [52]:
spark.catalog.dropTempView("member")

In [None]:
# SQLContext API: register a DataFrame as a temporary table, then drop it.
sqlContext.registerDataFrameAsTable(member, "table1")
sqlContext.dropTempTable("table1")

In [None]:
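# Pivot example on the TPC-DS tables store_sales, date_dim and item,
# which are not part of this dataset.
from pyspark.sql import functions as F

(spark.sql("""select *, concat('Q', d_qoy) as qoy
  from store_sales
  join date_dim on ss_sold_date_sk = d_date_sk
  join item on ss_item_sk = i_item_sk""")
  .groupBy("i_category")
  .pivot("qoy")
  .agg(F.round(F.sum("ss_sales_price") / 1000000, 2))
  .show())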

+-----------+----+----+----+----+
| i_category|  Q1|  Q2|  Q3|  Q4|
+-----------+----+----+----+----+
|      Books|1.58|1.50|2.84|4.66|
|      Women|1.41|1.36|2.54|4.16|
|      Music|1.50|1.44|2.66|4.36|
|   Children|1.54|1.46|2.74|4.51|
|     Sports|1.47|1.40|2.62|4.30|
|      Shoes|1.51|1.48|2.68|4.46|
|    Jewelry|1.45|1.39|2.59|4.25|
|       null|0.04|0.04|0.07|0.13|
|Electronics|1.56|1.49|2.77|4.57|
|       Home|1.57|1.51|2.79|4.60|
|        Men|1.60|1.54|2.86|4.71|
+-----------+----+----+----+----+

In [None]:
#order_fact = order.join(order_item, "order_id")

item_detail = item.join(item_category, item.item_category_id == item_category.category_id)



In [13]:
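# item_detail is defined in the previous cell, which has not been run (In [None]);
# that is why this cell raised the NameError below.
item_detail.show(10)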

name 'item_detail' is not defined
Traceback (most recent call last):
NameError: name 'item_detail' is not defined



In [None]:
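# Join all five tables into one fact table.
# member and order share country, city, state, postal_code and region, and order and
# order_item share order_date and order_time, so the result contains duplicate column
# names; drop or rename them first if the table is written out or queried by name.
item_detail = item.join(item_category, item.item_category_id == item_category.category_id)
order_fact = member.join(order.join(order_item.join(item_detail, "item_id"), "order_id"), "member_id")
order_fact.printSchema()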

In [None]:
member.printSchema()

In [None]:
item.printSchema()

In [None]:
item_category.printSchema()