-
Notifications
You must be signed in to change notification settings - Fork 27
/
lelogconverter.py
54 lines (40 loc) · 5.41 KB
/
lelogconverter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
##################################################################################################################
# VIEWER REQUEST LAMBDA@EDGE LOGS - Conversion from JSon to Parquet format partitioned by year, month, date hour #
##################################################################################################################
## Create dyanmaic frame from raw(Json format) viewer request Lambda@Edge logs as the datasource. Glue Data Catalog = {database = "reInvent2018_aws_service_logs", table_name = "le_log_raw_viewer_request"}
viewerRequestLELog = glueContext.create_dynamic_frame.from_catalog(database = "reInvent2018_aws_service_logs", table_name = "le_log_raw_viewer_request", transformation_ctx = "viewerRequestLELog")
## Map the viewer request Lambda@Edge logs to target format
mappedViewerRequestLELog = ApplyMapping.apply(frame = viewerRequestLELog, mappings = [("executionregion", "string", "executionregion", "string"), ("requestid", "string", "requestid", "string"), ("distributionid", "string", "distributionid", "string"), ("distributionname", "string", "distributionname", "string"), ("eventtype", "string", "eventtype", "string"), ("requestdata", "string", "requestdata", "string"), ("customtraceid", "string", "customtraceid", "string"), ("useragentstring", "string", "useragentstring", "string"), ("partition_0", "string", "year", "string"), ("partition_1", "string", "month", "string"), ("partition_2", "string", "date", "string"), ("partition_3", "string", "hour", "string")], transformation_ctx = "mappedViewerRequestLELog")
## Resolves a choice type within a DynamicFrame
resolvedViewerRequestLELog = ResolveChoice.apply(frame = mappedViewerRequestLELog, choice = "make_struct", transformation_ctx = "resolvedViewerRequestLELog")
## Drops all null fields in a DynamicFrame whose type is NullType
cleanedViewerRequestLELog = DropNullFields.apply(frame = resolvedViewerRequestLELog, transformation_ctx = "cleanedViewerRequestLELog")
## Write the viewer request Lambda@Edge logs to the S3 path(s3://cf-log-bucket-lab/converted/lelogs/viewer-request) in the optimized (Parquet) format partitioned by year, month, date hour
viewerRequestLELogSink = glueContext.write_dynamic_frame.from_options(frame = cleanedViewerRequestLELog, connection_type = "s3", connection_options = {"path": "s3://us-east-1.data-analytics/cflogworkshop/optimized/lelogs/viewer-request", "partitionKeys": ["year", "month", "date", "hour"]}, format = "parquet", transformation_ctx = "viewerRequestLELogSink")
##################################################################################################################
# ORIGIN REQUEST LAMBDA@EDGE LOGS - Conversion from JSon to Parquet format partitioned by year, month, date hour #
##################################################################################################################
## Create dyanmaic frame from raw(Json format) origin request Lambda@Edge logs as the datasource. Glue Data Catalog = {database = "reInvent2018_aws_service_logs", table_name = "le_log_raw_origin_request"}
originRequestLELog = glueContext.create_dynamic_frame.from_catalog(database = "reInvent2018_aws_service_logs", table_name = "le_log_raw_origin_request", transformation_ctx = "originRequestLELog")
## Map the origin request Lambda@Edge logs to target format
mappedOriginRequestLELog = ApplyMapping.apply(frame = originRequestLELog, mappings = [("executionregion", "string", "executionregion", "string"), ("requestid", "string", "requestid", "string"), ("distributionid", "string", "distributionid", "string"), ("distributionname", "string", "distributionname", "string"), ("eventtype", "string", "eventtype", "string"), ("requestdata", "string", "requestdata", "string"), ("viewercountry", "string", "viewercountry", "string"), ("deviceformfactor", "string", "deviceformfactor", "string"), ("customtraceid", "string", "customtraceid", "string"), ("partition_0", "string", "year", "string"), ("partition_1", "string", "month", "string"), ("partition_2", "string", "date", "string"), ("partition_3", "string", "hour", "string")], transformation_ctx = "mappedOriginRequestLELog")
## Resolves a choice type within a DynamicFrame
resolvedOriginRequestLELog = ResolveChoice.apply(frame = mappedOriginRequestLELog, choice = "make_struct", transformation_ctx = "resolvedOriginRequestLELog")
## Drops all null fields in a DynamicFrame whose type is NullType
cleanedOriginRequestLELog = DropNullFields.apply(frame = resolvedOriginRequestLELog, transformation_ctx = "cleanedOriginRequestLELog")
## Write the origin request Lambda@Edge logs to the S3 path(s3://cf-log-bucket-lab/converted/lelogs/origin-request) in the optimized (Parquet) format partitioned by year, month, date hour
originRequestLELogSink = glueContext.write_dynamic_frame.from_options(frame = cleanedOriginRequestLELog, connection_type = "s3", connection_options = {"path": "s3://us-east-1.data-analytics/cflogworkshop/optimized/lelogs/origin-request", "partitionKeys": ["year", "month", "date", "hour"]}, format = "parquet", transformation_ctx = "originRequestLELogSink")
job.commit()