# CloudFront Log ETL
This lab shows how to run an ETL job on CloudFront logs. Before starting, review the log fields described in [CloudFront Log Format](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#LogFileFormat).
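
To make the field layout concrete, here is a minimal Python 3 sketch that reads a standard-log snippet by hand. It assumes the usual layout of two `#` header lines followed by tab-separated records. The sample text is made up and shortened to six fields, and `parse_cloudfront_log` is a hypothetical helper, not part of any AWS library. Note that the raw header uses hyphens (`c-ip`), while the Glue catalog table used later in this lab exposes the same fields with underscores (`c_ip`).

```python
# Made-up sample in the standard-log layout, truncated to the first six fields.
SAMPLE = (
    "#Version: 1.0\n"
    "#Fields: date time x-edge-location sc-bytes c-ip cs-method\n"
    "2019-01-24\t08:15:02\tLAX3-C2\t343740\t187.171.189.171\tGET\n"
)


def parse_cloudfront_log(text):
    """Return one dict per record, keyed by the names in the #Fields header."""
    fields = []
    records = []
    for line in text.splitlines():
        if line.startswith("#Fields:"):
            # Field names in the header are separated by spaces.
            fields = line[len("#Fields:"):].split()
        elif line and not line.startswith("#"):
            # Record values are separated by tabs.
            records.append(dict(zip(fields, line.split("\t"))))
    return records


records = parse_cloudfront_log(SAMPLE)
print(records[0]["c-ip"], records[0]["x-edge-location"])
```

In the lab itself this parsing is already handled by the Glue catalog table, so the sketch is only meant to show what each record looks like before enrichment.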

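This ETL lab has two goals:
* Add **country** and **city** information to every record. This is done by looking up the client IP in an IP database; here we use the [IP2Location Lite](https://lite.ip2location.com/) database.
* Add **device brand**, **operating system**, and **operating system version** to every record. This is done by parsing the user-agent field; here we use the third-party Python library [user_agents](https://github.com/selwin/python-user-agents).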





In [3]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

In [7]:
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, Row
from pyspark.sql.functions import udf

from ip2location import IP2Location
from user_agents import parse
import boto3

In [5]:
s3 = boto3.resource('s3')
bucket = s3.Bucket('joeshi')
localPath = '/tmp/DB.BIN'
bucket.download_file('PoC/Glue/artifacts/IP2LOCATION-LITE-DB5.IPV6.BIN', localPath)
database = IP2Location.IP2Location()
database.open(localPath)
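
The cell above opens the database once, in the driver process. Spark serializes a UDF together with the objects it references and runs it in separate Python worker processes, so whether an already-open `database` handle survives that trip depends on the library and the cluster setup. One common alternative, sketched here under the assumption that the `.BIN` file is also available at the same path on every worker, is to open the database lazily inside each process. `get_database`, `open_db`, and `fake_open` are hypothetical names, not part of IP2Location or Glue.

```python
_DB = None  # cached handle, one per Python worker process


def get_database(open_db, path="/tmp/DB.BIN"):
    """Open the database on first use in this process, then reuse it."""
    global _DB
    if _DB is None:
        _DB = open_db(path)
    return _DB


# Stand-in opener for illustration only; a real one would create an
# IP2Location object and call .open(path) on it.
calls = []


def fake_open(path):
    calls.append(path)
    return {"path": path}


first = get_database(fake_open)
second = get_database(fake_open)
print(first is second, len(calls))
```

A UDF written this way would call `get_database(...)` inside its body instead of referring to a module-level `database` object created on the driver.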

In [1]:
### Create a UDF that parses the user-agent field

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1548337377007_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [8]:
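def chop_ua(ua_string):
    # The logged value carries "%2520" where the original header had spaces;
    # restore them before parsing.
    user_agent = parse(ua_string.replace("%2520", " "))
    # Return a Row whose field names match ua_schema below.
    return Row("ua_os_family", "ua_os_version", "ua_device_brand")(
        user_agent.os.family, user_agent.os.version_string, user_agent.device.brand)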


ua_schema = StructType([
    StructField("ua_os_family", StringType(), False),
    StructField("ua_os_version", StringType(), False),
    # device.brand can be None when the parser does not recognize a brand, so this field is nullable.
    StructField("ua_device_brand", StringType(), True)
])

chop_ua_udf = udf(chop_ua, ua_schema)
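
The `%2520` sequence handled in `chop_ua` looks like a space that was percent-encoded twice (`" "` becomes `%20`, then `%` becomes `%25`). The original code replaces only that one sequence. As a more general sketch in Python 3, the value can be percent-decoded repeatedly until it stops changing. `fully_unquote` is a hypothetical helper, and the sample string is illustrative, modeled on the `Dalvik/2.1.0%2520...` value that appears in the output later in this lab.

```python
from urllib.parse import unquote


def fully_unquote(value, max_rounds=3):
    """Percent-decode repeatedly until the string stops changing."""
    for _ in range(max_rounds):
        decoded = unquote(value)
        if decoded == value:
            break
        value = decoded
    return value


sample = "Dalvik/2.1.0%2520(Linux;%2520U;%2520Android%25205.1)"
print(fully_unquote(sample))
```

Repeated decoding is a trade-off: it also decodes any percent sequence that was meant literally, which is why the round count is capped and why the narrower `replace("%2520", " ")` may still be preferable for this data.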

### Create a UDF that looks up geolocation for an IP string

In [9]:
def chop_c_ip(c_ip):
    rec = database.get_all(c_ip)
    return Row("country_short",
               "country_long",
               "city",
               "latitude",
               "longitude"
               )(rec.country_short,
                 rec.country_long,
                 rec.city,
                 rec.latitude,
                 rec.longitude)


# To turn a single field into multiple columns, have the UDF return a struct described with StructField entries
c_ip_schema = StructType([
    StructField("country_short", StringType(), False),
    StructField("country_long", StringType(), False),
    StructField("city", StringType(), False),
    StructField("latitude", DoubleType(), False),
    StructField("longitude", DoubleType(), False)
])

chop_c_ip_udf = udf(chop_c_ip, c_ip_schema)
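
IP range databases are typically keyed by the numeric value of an address, so a lookup like `database.get_all(c_ip)` implicitly turns the IP string into a number before searching. The following Python 3 sketch shows that conversion with the standard `ipaddress` module. It is an illustration of the idea, not IP2Location's actual implementation, and `ip_to_int` and `in_range` are hypothetical helpers.

```python
import ipaddress


def ip_to_int(ip_string):
    """Return the integer value of an IPv4 or IPv6 address string."""
    return int(ipaddress.ip_address(ip_string))


def in_range(ip_string, ip_from, ip_to):
    """Check whether an address falls inside an inclusive numeric range."""
    return ip_from <= ip_to_int(ip_string) <= ip_to


# 1.0.0.0 is 1 * 256**3 as an integer.
print(ip_to_int("1.0.0.0"))
print(in_range("1.0.0.5", ip_to_int("1.0.0.0"), ip_to_int("1.0.0.255")))
```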

### Create a DynamicFrame from the CloudFront logs

In [10]:
cf_logs = glueContext.create_dynamic_frame.from_catalog(database="cloudfront_log", table_name="cloudfront")
print("Counts: ", cf_logs.count())
cf_logs.printSchema()

Counts:  171866
root
|-- date: string
|-- time: string
|-- x_edge_location: string
|-- sc_bytes: long
|-- c_ip: string
|-- cs_method: string
|-- cs_host: string
|-- cs_uri_stem: string
|-- sc_status: long
|-- cs_referer: string
|-- cs_user_agent: string
|-- cs_uri_query: string
|-- cs_cookie: string
|-- x_edge_result_type: string
|-- x_edge_request_id: string
|-- x_host_header: string
|-- cs_protocol: string
|-- cs_bytes: long
|-- time_taken: double
|-- x_forwarded_for: string
|-- ssl_protocol: string
|-- ssl_cipher: string
|-- x_edge_response_result_type: string
|-- cs_protocol_version: string
|-- mbps: string

In [14]:
cf_logs_df = cf_logs.toDF()

In [15]:
cf_logs_df = cf_logs_df.withColumn("ua", chop_ua_udf(cf_logs_df.cs_user_agent))\
    .withColumn("c_ip_rec", chop_c_ip_udf(cf_logs_df.c_ip))

In [16]:
cf_logs_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- x_edge_location: string (nullable = true)
 |-- sc_bytes: long (nullable = true)
 |-- c_ip: string (nullable = true)
 |-- cs_method: string (nullable = true)
 |-- cs_host: string (nullable = true)
 |-- cs_uri_stem: string (nullable = true)
 |-- sc_status: long (nullable = true)
 |-- cs_referer: string (nullable = true)
 |-- cs_user_agent: string (nullable = true)
 |-- cs_uri_query: string (nullable = true)
 |-- cs_cookie: string (nullable = true)
 |-- x_edge_result_type: string (nullable = true)
 |-- x_edge_request_id: string (nullable = true)
 |-- x_host_header: string (nullable = true)
 |-- cs_protocol: string (nullable = true)
 |-- cs_bytes: long (nullable = true)
 |-- time_taken: double (nullable = true)
 |-- x_forwarded_for: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- x_edge_response_result_type: string (nullable = true)
 |-- cs_p

In [17]:
cf_logs_df = cf_logs_df.select("x_edge_location", "sc_bytes", "c_ip", "cs_uri_stem", "cs_user_agent", "x_edge_result_type",
                       "time_taken", "x_edge_response_result_type", "ua.*", "c_ip_rec.*")

Show the first 10 rows:

In [18]:
cf_logs_df.show(10)

+---------------+--------+---------------+--------------------+--------------------+------------------+----------+---------------------------+------------+-------------+---------------+-------------+--------------------+--------------------+---------+----------+
|x_edge_location|sc_bytes|           c_ip|         cs_uri_stem|       cs_user_agent|x_edge_result_type|time_taken|x_edge_response_result_type|ua_os_family|ua_os_version|ua_device_brand|country_short|        country_long|                city| latitude| longitude|
+---------------+--------+---------------+--------------------+--------------------+------------------+----------+---------------------------+------------+-------------+---------------+-------------+--------------------+--------------------+---------+----------+
|        LAX3-C2|  343740|187.171.189.171|/assets/Release4/...|Dalvik/2.1.0%2520...|               Hit|     0.001|                        Hit|     Android|          5.1|Generic_Android|           MX|            

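### Write out the cleaned results

Write the cleaned results to `s3://joeshi/PoC/Glue/job/cloudfront_etl_test/`.

**Parquet** or **ORC** is the recommended output format; columnar files like these are better suited to analytical queries over large datasets.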

In [None]:
from awsglue.dynamicframe import DynamicFrame

# Convert the Spark DataFrame back into a Glue DynamicFrame before writing.
output = DynamicFrame.fromDF(cf_logs_df, glueContext, "cloudfront_parquet")
glueContext.write_dynamic_frame.from_options(frame = output,
                                             connection_type = "s3",
                                             connection_options = {"path": "s3://joeshi/PoC/Glue/job/cloudfront_etl_test/"},
                                             format = "parquet")