# In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import from_unixtime,col


# Glue job: read `db.table1` from the Data Catalog, add formatted versions of
# two epoch-millisecond timestamp columns, and write the result to S3 as
# Parquet.

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# getOrCreate() reuses an already-running context (e.g. when this notebook cell
# is re-executed) instead of failing on a second SparkContext().
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

db_name = 'db'
tbl_name = 'table1'
dynamic_frame_a = glueContext.create_dynamic_frame.from_catalog(
    database=db_name,
    table_name=tbl_name,
    transformation_ctx="df_a",
)

# Data cleansing: timestamp_na and timestamp_ap hold Unix epoch values in
# milliseconds (e.g. 1548883419015). Dividing by 1000 gives seconds, which
# from_unixtime renders as a date-time string in a new `converted_<name>`
# column; the original columns are kept as they are.
# NOTE(review): from_unixtime returns a string (not TimestampType) and
# presumably truncates the sub-second part; values that are not numeric
# presumably become null. Confirm this is what downstream readers expect.
epoch_ms_columns = ('timestamp_na', 'timestamp_ap')

df_a = dynamic_frame_a.toDF()
df_a_tmp = df_a
for ts_col in epoch_ms_columns:
    df_a_tmp = df_a_tmp.withColumn(
        'converted_' + ts_col,
        from_unixtime(col(ts_col) / 1000),
    )

dyf_cleansing = DynamicFrame.fromDF(df_a_tmp, glueContext, 'test')
# NOTE(review): AWS Glue examples use the "s3://" scheme for S3 sinks, and
# "cleasing" looks like a typo of "cleansing". Both are left unchanged because
# editing the path would move the job's output; confirm before renaming.
datasink0 = glueContext.write_dynamic_frame.from_options(
    frame=dyf_cleansing,
    connection_type="s3",
    connection_options={"path": "s3a://dev/cleasing_etl/"},
    format="parquet",
    transformation_ctx="datasink0",
)
# Finalize the Glue job run started by job.init() above.
job.commit()