In [1]:
# Uncomment to install awswrangler (imported below as `wr`) if it is not already installed.
#pip install awswrangler

In [2]:
import awswrangler as wr

## Create Governed Table (metadata only)

In [3]:
# Before executing the cell below, complete the data lake location and permission settings in the Lake Formation console.

In [4]:
# Create the Governed Table with awswrangler's create_parquet_table instead of the Glue console.
# For a Governed Table, table_type must be set explicitly to 'GOVERNED'.
# The last column is declared as 'comments' so that the schema matches the
# column name carried by the data that is written to this table later on.
wr.catalog.create_parquet_table(
    database='governed_demo',
    table='governed_demo_table',
    path='s3://governedtable-example/data/',
    columns_types={'registration_dttm':'Timestamp',
                  'id':'int',
                  'first_name':'String',
                  'last_name':'String',
                  'email':'String',
                  'gender':'String',
                  'ip_address':'String',
                  'cc':'String',
                  'country':'String',
                  'birthdate':'String',
                  'salary':'double',
                  'title':'string',
                  'comments':'string'},
    compression='snappy',
    description='governed_table example',
    table_type='GOVERNED'
)

## Read existing Parquet data from S3 into a DataFrame

In [5]:
# Read the Parquet data stored under the raw/ prefix into a DataFrame.
df_raw = wr.s3.read_parquet(path='s3://governedtable-example/raw/')
# Preview the first rows.
df_raw.head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


## Filter selected records to insert into the governed table

In [6]:
# Keep only the rows whose gender is 'Female'; this subset is written to the governed table first.
df_female = df_raw.loc[df_raw['gender'].isin(['Female'])]
df_female.head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625,China,4/8/1997,90263.05,Senior Cost Accountant,
5,2016-02-03 07:22:34,6,Kathryn,White,kwhite5@google.com,Female,195.131.81.179,3583136326049310,Indonesia,2/25/1983,69227.11,Account Executive,
9,2016-02-03 18:29:47,10,Emily,Stewart,estewart9@opensource.org,Female,143.28.251.245,3574254110301671,Nigeria,1/28/1997,27234.28,Health Coach IV,


## Create a new transaction

In [7]:
# Start a Lake Formation transaction that is not read-only; its ID is passed to the write that follows.
transaction_1 = wr.lakeformation.start_transaction(read_only=False)
# Display the transaction ID.
transaction_1

'bb9adef5e53945d187463b272ddf8752'

## Write data to S3 linked to governed table

In [8]:
# With the Governed Table metadata in place, store the data in the S3 path
# that was registered as the lake location.
wr.s3.to_parquet(
    df=df_female,
    path='s3://governedtable-example/data/',
    dataset=True,
    mode='overwrite',
    compression='snappy',
    database='governed_demo',
    table='governed_demo_table',
    table_type='GOVERNED',
    transaction_id=transaction_1,
    # Record the frame's dimensions as string-valued table parameters.
    parameters={
        "num_cols": str(df_female.shape[1]),
        "num_rows": str(df_female.shape[0]),
    },
)

{'paths': ['s3://governedtable-example/data/f30879104ca94d42a259a3f0f4dfc059.snappy.parquet'],
 'partitions_values': {}}

## Commit Lake Formation transaction

In [9]:
# Commit the first write, then look up the transaction's status to confirm the outcome.
wr.lakeformation.commit_transaction(transaction_id=transaction_1)
transaction = wr.lakeformation.describe_transaction(transaction_id=transaction_1)
transaction


'committed'

In [10]:
# Read the governed table back and report its row count to verify the committed write.
df_val = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_val.shape[0]

482

## Insert additional data into governed table (append)

In [11]:
# Select the rows to append to the existing data set (intended as the male data).
# NOTE: the mask keeps every row whose gender is not 'Female', so rows with a
# missing gender are included alongside the male rows.
df_male = df_raw[~df_raw.gender.isin(['Female'])]

In [12]:
# Start the transaction that will be used to append the male data.
transaction_2 = wr.lakeformation.start_transaction(read_only=False)

In [13]:
# Append the data to the S3 path defined for governed_demo_table.
wr.s3.to_parquet(
    df=df_male,
    path='s3://governedtable-example/data/',
    dataset=True,
    mode='append',
    compression='snappy',
    database='governed_demo',
    table='governed_demo_table',
    table_type='GOVERNED',
    transaction_id=transaction_2,
    # Record the frame's dimensions as string-valued table parameters.
    parameters={
        "num_cols": str(df_male.shape[1]),
        "num_rows": str(df_male.shape[0]),
    },
)

{'paths': ['s3://governedtable-example/data/212a4b27071441b092385a987fbd754e.snappy.parquet'],
 'partitions_values': {}}

In [14]:
# Counting the rows before the transaction is committed shows that the male data
# has not been added yet.
df_val = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_val.shape[0]

482

In [15]:
# Commit the append transaction, then look up its status to confirm the outcome.
wr.lakeformation.commit_transaction(transaction_id=transaction_2)
transaction = wr.lakeformation.describe_transaction(transaction_id=transaction_2)
transaction

'committed'

In [16]:
# After the successful commit, the row count now includes the male data as well.
df_val = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_val.shape[0]

1000

In [17]:
# Preview the first five rows in id order to see both writes side by side.
df_val.sort_values('id').head(5)

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
518,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
0,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
519,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
520,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
1,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


## Drop unnecessary customer information column from governed table (ip_address)

In [18]:
# Start the transaction under which the ip_address column is removed from the table metadata.
transaction_3 = wr.lakeformation.start_transaction(read_only=False)

In [19]:
# Dropping a column from the table metadata is done with awswrangler's delete_column.
wr.catalog.delete_column(
    table='governed_demo_table',
    database='governed_demo',
    column_name='ip_address',
    transaction_id=transaction_3,
)

In [20]:
# Commit the column removal, then look up the transaction's status to confirm the outcome.
wr.lakeformation.commit_transaction(transaction_id=transaction_3)
transaction = wr.lakeformation.describe_transaction(transaction_id=transaction_3)
transaction

'committed'

In [21]:
# Read the table back and preview it in id order to check the result of the column removal.
df_drop = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_drop.sort_values(by='id').head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,cc,country,birthdate,salary,title,comments
518,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
0,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,,Canada,1/16/1968,150280.17,Accountant IV,
519,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
520,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
1,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,5602256255204850.0,South Africa,,,,


## Add a column and insert data into the added column (segment)

In [22]:
# Start the transaction under which the segment column is added to the table metadata.
transaction_4 = wr.lakeformation.start_transaction(read_only=False)

In [23]:
# Adding a column to the table metadata is done with awswrangler's add_column.
wr.catalog.add_column(
    table='governed_demo_table',
    database='governed_demo',
    column_name='segment',
    column_type='string',
    column_comment='segment',
    transaction_id=transaction_4,
)

In [24]:
# Commit the column addition, then look up the transaction's status to confirm the outcome.
wr.lakeformation.commit_transaction(transaction_id=transaction_4)
transaction = wr.lakeformation.describe_transaction(transaction_id=transaction_4)
transaction

'committed'

In [25]:
# Read the table back and preview it in id order to check the newly added column.
df_add = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_add.sort_values(by='id').head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,cc,country,birthdate,salary,title,comments,segment
518,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0,
0,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,,Canada,1/16/1968,150280.17,Accountant IV,,
519,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,,
520,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,,
1,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,5602256255204850.0,South Africa,,,,,


In [26]:
# Label customers by salary: above 100000 is 'Gold_Class', 100000 or below is 'Silver_Class'.
# Rows matching neither condition keep whatever value segment already had.
df_add.loc[df_add['salary'].gt(100000), 'segment'] = 'Gold_Class'
df_add.loc[df_add['salary'].le(100000), 'segment'] = 'Silver_Class'

In [27]:
# Order the rows by id (ascending) before they are written back.
df_add = df_add.sort_values('id', ascending=True)

In [28]:
# Start the transaction used to write the updated data back to the governed table.
transaction_5 = wr.lakeformation.start_transaction(read_only=False)

In [29]:
# Now that the column removal and addition are complete in the metadata,
# apply the data to the S3 path designated as the data lake location.
wr.s3.to_parquet(
    df=df_add,
    path='s3://governedtable-example/data/',
    dataset=True,
    mode='overwrite',
    compression='snappy',
    database='governed_demo',
    table='governed_demo_table',
    table_type='GOVERNED',
    transaction_id=transaction_5,
    # Record the frame's dimensions as string-valued table parameters.
    parameters={
        "num_cols": str(df_add.shape[1]),
        "num_rows": str(df_add.shape[0]),
    },
)

{'paths': ['s3://governedtable-example/data/e315fa30679848398556988e1ec711a9.snappy.parquet'],
 'partitions_values': {}}

In [30]:
# Commit the overwrite, then look up the transaction's status to confirm the outcome.
wr.lakeformation.commit_transaction(transaction_id=transaction_5)
transaction = wr.lakeformation.describe_transaction(transaction_id=transaction_5)
transaction

'committed'

In [31]:
# Read the table back to check that the segment values were stored.
df_add = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_add.head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,cc,country,birthdate,salary,title,comments,segment
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0,Silver_Class
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,,Canada,1/16/1968,150280.17,Accountant IV,,Gold_Class
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,,Gold_Class
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,,Silver_Class
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,5602256255204850.0,South Africa,,,,,


## Delete inactive customers from governed table

In [32]:
# For testing: build a sample set of customer IDs to exercise the deletion flow.
# There is no direct deletion function, so the deletion is carried out by
# rewriting the table in overwrite mode.
from pandas import Series, DataFrame

df_inactive = dict(id=[2, 3, 8, 9, 13])
inactive_customer = DataFrame(data=df_inactive)

In [33]:
# Load the current table contents; the inactive customers are filtered out of this frame next.
df_del = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_del.head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,cc,country,birthdate,salary,title,comments,segment
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0,Silver_Class
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,,Canada,1/16/1968,150280.17,Accountant IV,,Gold_Class
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,,Gold_Class
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,,Silver_Class
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,5602256255204850.0,South Africa,,,,,


In [34]:
# Keep only the active customers: drop every row whose id appears in inactive_customer.
df_del = df_del[~df_del.id.isin(inactive_customer.id)]

In [35]:
# Start the transaction used to overwrite the table with the active customers only.
transaction_6 = wr.lakeformation.start_transaction(read_only=False)

In [36]:
# Files from earlier transactions still contain the inactive customers, so delete them if required.
#wr.s3.delete_objects('s3://governedtable-example/data/')

In [37]:
# Overwrite the whole data set with the selected active customers.
wr.s3.to_parquet(
    df=df_del,
    path='s3://governedtable-example/data/',
    dataset=True,
    mode='overwrite',
    compression='snappy',
    database='governed_demo',
    table='governed_demo_table',
    table_type='GOVERNED',
    transaction_id=transaction_6,
    # Record the frame's dimensions as string-valued table parameters.
    parameters={
        "num_cols": str(df_del.shape[1]),
        "num_rows": str(df_del.shape[0]),
    },
)

{'paths': ['s3://governedtable-example/data/4356f1f957534c2d802b1a4f7a5edae2.snappy.parquet'],
 'partitions_values': {}}

In [38]:
# Commit the overwrite, then look up the transaction's status to confirm the outcome.
wr.lakeformation.commit_transaction(transaction_id=transaction_6)
transaction = wr.lakeformation.describe_transaction(transaction_id=transaction_6)
transaction

'committed'

In [39]:
# Read the table back to check that the inactive customers are no longer present.
df_del = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM governed_demo_table;",
    database='governed_demo')
df_del.head()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,cc,country,birthdate,salary,title,comments,segment
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,6759521864920116,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0,Silver_Class
1,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,3576031598965625,China,4/8/1997,90263.05,Senior Cost Accountant,,Silver_Class
2,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,5602256255204850,South Africa,,,,,
3,2016-02-03 07:22:34,6,Kathryn,White,kwhite5@google.com,Female,3583136326049310,Indonesia,2/25/1983,69227.11,Account Executive,,Silver_Class
4,2016-02-03 08:33:08,7,Samuel,Holmes,sholmes6@foxnews.com,Male,3582641366974690,Portugal,12/18/1987,14247.62,Senior Financial Analyst,,Silver_Class
