# AWS Glue Studio Notebook
##### This is a Glue Studio notebook for performing second-round preprocessing


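### Install the kiwipiepy library (run this cell before the session starts; `%additional_python_modules` only applies to newly created sessions)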

In [2]:
%additional_python_modules kiwipiepy==0.15.2

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Additional python modules to be included:
kiwipiepy==0.15.2


###  Run this cell to set up and start your interactive session.


In [2]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 20

# library for setting glue studio notebook
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# library for conducting 2nd preprocessing
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import ClientError
from datetime import date
from kiwipiepy import Kiwi
import boto3
import json

# Reuse the active SparkContext if one exists; otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext (used below to read from the Data Catalog).
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Job handle; job.commit() is called in the final cell of this notebook.
job = Job(glueContext)

You are already connected to a glueetl session fd37e023-017b-4f51-9b74-856950035af2.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session fd37e023-017b-4f51-9b74-856950035af2.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 3.0


You are already connected to a glueetl session fd37e023-017b-4f51-9b74-856950035af2.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session fd37e023-017b-4f51-9b74-856950035af2.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5
ModuleNotFoundError: No module named 'kiwipiepy'


### Define the S3 uploader class

In [2]:
class S3Uploader:
    """Hold S3 connection settings and build the output location for 2nd-round processed data."""

    def __init__(self, bucket_name, access_key, secret_key, region_name):
        """Store the connection settings and create a boto3 S3 client from them.

        Args:
            bucket_name: Name of the bucket used when building upload paths.
            access_key: AWS access key ID handed to the boto3 client.
            secret_key: AWS secret access key handed to the boto3 client.
            region_name: AWS region handed to the boto3 client.
        """
        self.bucket_name = bucket_name
        self.access_key = access_key
        self.secret_key = secret_key
        self.s3 = boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name
        )

    def get_upload_file_path(self, target_date=None):
        """Return the S3 URI under which the 2nd-round processed parquet output is written.

        Args:
            target_date: Optional ``datetime.date`` selecting the year/month/day
                partition. Defaults to ``date.today()``, which is what callers
                that pass no argument have always received; passing a date
                allows backfills or re-runs for another day.

        Returns:
            A string of the form
            ``s3://<bucket>/2nd_processed_data/year=YYYY/month=MM/day=DD``
            with month and day zero-padded to two digits.
        """
        partition_date = date.today() if target_date is None else target_date

        return (
            f's3://{self.bucket_name}/2nd_processed_data/'
            f'year={partition_date.year}'
            f'/month={partition_date.month:02d}'
            f'/day={partition_date.day:02d}'
        )




In [3]:
def get_secret(secret_name="prod/de-1-1/back-end", region_name="ap-northeast-2"):
    """Load the notebook's runtime settings from AWS Secrets Manager.

    Args:
        secret_name: Id of the secret to read. Defaults to the value this
            notebook has always used.
        region_name: Region of the Secrets Manager endpoint. Defaults to the
            value this notebook has always used; it is also returned to the
            caller as the fourth element of the result.

    Returns:
        A 5-tuple ``(BUCKET_NAME, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
        region_name, KAKAO_API_TOKEN)`` where every element except
        ``region_name`` is read from the secret's JSON document.

    Raises:
        botocore.exceptions.ClientError: Propagated unchanged when the secret
            cannot be fetched.
        KeyError: If the secret's JSON document lacks one of the expected keys.
    """
    client = boto3.session.Session().client(
        service_name='secretsmanager',
        region_name=region_name
    )

    # No try/except here: the previous handler only re-raised the same
    # ClientError, so letting it propagate directly is equivalent.
    response = client.get_secret_value(SecretId=secret_name)

    # The secret payload is a JSON object stored as a string.
    secret_dict = json.loads(response['SecretString'])

    return (
        secret_dict['BUCKET_NAME'],
        secret_dict['AWS_ACCESS_KEY_ID'],
        secret_dict['AWS_SECRET_ACCESS_KEY'],
        region_name,
        secret_dict['KAKAO_API_TOKEN'],
    )




### Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [11]:
# Read the `de1_1_1st_cleaned_data` table of the `de1_1_database` catalog
# database into a DynamicFrame, then print its schema.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database='de1_1_database',
    table_name='de1_1_1st_cleaned_data',
)
dyf.printSchema()

root
|-- job_id: long
|-- platform: string
|-- category: string
|-- company: string
|-- title: string
|-- preferred: string
|-- required: string
|-- primary_responsibility: string
|-- url: string
|-- end_at: string
|-- skills: array
|    |-- element: string
|-- location: string
|-- welfare: string
|-- body: int
|-- company_description: string
|-- coordinate: int
|-- year: string
|-- month: string
|-- day: string


### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [1]:
# Convert the Glue DynamicFrame into a Spark DataFrame so the DataFrame API
# (drop, withColumn, repartition, write) can be used in the cells below.
df = dyf.toDF()
# Print a sample of rows for a quick visual check.
df.show()

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::862327261051:role/DE1_1_Glue_Role
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: fd37e023-017b-4f51-9b74-856950035af2
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session fd37e023-017b-4f51-9b74-856950035af2 to get into ready status...
Session fd37e023-017b-4f51-9b74-856950035af2 has been created.
NameError: name 'dyf' is not defined


### Drop Columns year, month, day

In [None]:
# year/month/day are listed as string columns in the catalog schema printed
# above (presumably the input's partition keys - confirm); they are removed
# here before the output is written under a new year=/month=/day= path.
drop_cols = ("year", "month", "day")
df = df.drop(*drop_cols)
# Show the resulting schema and a sample of rows to confirm the columns are gone.
df.printSchema()
df.show()

### Functions for extracting nouns

In [7]:
def _korean_noun_analyzer():
    """Return a Kiwi analyzer cached on this helper, building it on first use.

    Constructing ``Kiwi()`` for every row repeats the analyzer setup for each
    value. Caching it here lets every row handled by the same loaded copy of
    the UDF share one instance.

    NOTE(review): do not call this helper on the driver before the UDF is
    shipped; the cached instance would then be pickled together with the
    function, and Kiwi objects are presumably not picklable - confirm.
    """
    analyzer = getattr(_korean_noun_analyzer, "_analyzer", None)
    if analyzer is None:
        analyzer = Kiwi()
        _korean_noun_analyzer._analyzer = analyzer
    return analyzer


@udf(returnType=ArrayType(StringType()))
def extract_korean_noun(text):
    """Spark UDF returning the surface forms of tokens tagged ``NNG`` or ``NNP``.

    Args:
        text: Column value to analyse; may be ``None``.

    Returns:
        A list of token forms in tokenization order. The list is empty when
        ``text`` is ``None`` or contains only whitespace.
    """
    if text is None or text.strip() == "":
        return []
    tokens = _korean_noun_analyzer().tokenize(text)
    # NNG / NNP are Kiwi's general-noun and proper-noun tags.
    return [token.form for token in tokens if token.tag in {'NNG', 'NNP'}]

NameError: name 'udf' is not defined


In [9]:
def _english_token_analyzer():
    """Return a Kiwi analyzer cached on this helper, building it on first use.

    Constructing ``Kiwi()`` for every row repeats the analyzer setup for each
    value. Caching it here lets every row handled by the same loaded copy of
    the UDF share one instance.

    NOTE(review): do not call this helper on the driver before the UDF is
    shipped; the cached instance would then be pickled together with the
    function, and Kiwi objects are presumably not picklable - confirm.
    """
    analyzer = getattr(_english_token_analyzer, "_analyzer", None)
    if analyzer is None:
        analyzer = Kiwi()
        _english_token_analyzer._analyzer = analyzer
    return analyzer


@udf(returnType=ArrayType(StringType()))
def extract_english_noun(text):
    """Spark UDF returning the surface forms of tokens tagged ``SL``.

    Args:
        text: Column value to analyse; may be ``None``.

    Returns:
        A list of token forms in tokenization order. The list is empty when
        ``text`` is ``None`` or contains only whitespace.
    """
    if text is None or text.strip() == "":
        return []
    tokens = _english_token_analyzer().tokenize(text)
    # NOTE(review): 'SL' appears to be Kiwi's tag for Latin-alphabet tokens in
    # general, so the result is not restricted to nouns - confirm against the
    # Kiwi tag table.
    return [token.form for token in tokens if token.tag == 'SL']

NameError: name 'udf' is not defined


In [10]:
# (new column, extractor UDF, source column), listed in the order the columns
# are appended: all Korean-noun columns first, then the English ones.
noun_column_specs = [
    ("preferred_korean_nouns", extract_korean_noun, "preferred"),
    ("required_korean_nouns", extract_korean_noun, "required"),
    ("primary_responsibility_korean_nouns", extract_korean_noun, "primary_responsibility"),
    ("welfare_korean_nouns", extract_korean_noun, "welfare"),
    ("preferred_english_nouns", extract_english_noun, "preferred"),
    ("required_english_nouns", extract_english_noun, "required"),
    ("primary_responsibility_english_nouns", extract_english_noun, "primary_responsibility"),
    ("welfare_english_nouns", extract_english_noun, "welfare"),
]

for target_col, extractor, source_col in noun_column_specs:
    df = df.withColumn(target_col, extractor(df[source_col]))

NameError: name 'df' is not defined


In [None]:
# Preview a sample of rows, now including the extracted noun columns.
df.show()

In [None]:
# Collect all rows into a single partition; this DataFrame is what gets
# written to S3 as parquet in a later cell.
repartitioned_df = df.repartition(1)

In [15]:
# Fetch bucket name, credentials and region from Secrets Manager; the fifth
# value returned by get_secret() (the Kakao API token) is discarded here.
BUCKET_NAME, ACCESS_KEY, SECRET_KEY, REGION_NAME, _ = get_secret()

NameError: name 'boto3' is not defined


In [None]:
# Build the date-partitioned S3 destination and write the single-partition
# DataFrame there as parquet, replacing anything already at that path.
uploader = S3Uploader(BUCKET_NAME, ACCESS_KEY, SECRET_KEY, REGION_NAME)
upload_file_path = uploader.get_upload_file_path()
repartitioned_df.write.mode("overwrite").parquet(upload_file_path)

In [None]:
# Commit the Glue job handle created in the setup cell, then stop the
# SparkContext; cells that need Spark cannot be run after this point.
job.commit()
sc.stop()