# Nginx 日志统计分析

In [1]:
# 从 HDFS 上导入 Nginx 日志文件
text_file = sc.textFile("hdfs://master:9000/nginx_log/aa")

In [2]:
# 获取第一行记录
first_line = text_file.first()
first_line

                                                                                

'183.162.52.7 - - [10/Nov/2016:00:01:02 +0800] "POST /api3/getadv HTTP/1.1" 200 813 "www.imooc.com" "-" cid=0&timestamp=1478707261865&uid=2871142&marking=androidbanner&secrect=a6e8e14701ffe9f6063934780d9e2e6d&token=f51e97d1cb1a9caac669ea8acc162b96 "mukewang/5.0.0 (Android 5.1.1; Xiaomi Redmi 3 Build/LMY47V),Network 2G/3G" "-" 10.100.134.244:80 200 0.027 0.027'

In [3]:
# 构建日志记录行匹配正则
import re
pattern = re.compile(r'(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request_method>.*?) (?P<query>.*?)" (?P<status>.*?) (?P<content_length>.*?) "(?P<raw_uri>.*?)" "(?P<ua>.*?)"')

In [4]:
# 查看正则表达式效果
search_result = pattern.search(first_line)

In [5]:
search_result.group('ip')

'183.162.52.7'

In [6]:
search_result.group('time')

'10/Nov/2016:00:01:02 +0800'

In [7]:
search_result.group('request_method')

'POST'

In [8]:
search_result.group('query')

'/api3/getadv HTTP/1.1'

In [9]:
search_result.group('status')

'200'

In [10]:
search_result.group('content_length')

'813'

In [11]:
search_result.group('raw_uri')

'www.imooc.com'

In [12]:
from RowData import RowData

In [13]:
# 构建时间解析格式
import datetime
datetime.datetime.strptime('10/Nov/2016:00:01:02 +0800', '%d/%b/%Y:%H:%M:%S %z')

datetime.datetime(2016, 11, 10, 0, 1, 2, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800)))

In [14]:
# 解析 Nginx 日志时间函数
def parse_nginx_time(time_string):
    return datetime.datetime.strptime(time_string, '%d/%b/%Y:%H:%M:%S %z')

In [15]:
# 导入域名解析库
from tld import get_fld, exceptions

# 解析 Nginx 日志每行记录
def parse_nginx_log_line(logline):
    match = re.search(pattern, logline)
 
    if match is None:
        return None
 
    content_length_field = match.group('content_length')
    
    try:
        domain = get_fld(match.group('raw_uri'), fix_protocol=True)
    except (exceptions.TldDomainNotFound, exceptions.TldBadUrl):
        domain = match.group('raw_uri')
    
    return  RowData(
                ip = match.group('ip'),
                time = parse_nginx_time(match.group('time')),
                request_method = match.group('request_method'),
                query = match.group('query'),
                raw_uri = match.group('raw_uri'),
                domain = domain,
                status = match.group('status'),
                content_length = 0 if content_length_field == '-' else content_length_field
            )

In [16]:
# 测试解析效果
row_data = parse_nginx_log_line(first_line)
row_data

ip: 183.162.52.7
time: 2016-11-10 00:01:02+08:00
request_method: POST
query: /api3/getadv HTTP/1.1
raw_uri: www.imooc.com
domain: imooc.com
status: 200
content_length: 813

In [17]:
row_data.test_state()

{'ip': '183.162.52.7',
 'time': datetime.datetime(2016, 11, 10, 0, 1, 2, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))),
 'request_method': 'POST',
 'query': '/api3/getadv HTTP/1.1',
 'raw_uri': 'www.imooc.com',
 'domain': 'imooc.com',
 'status': '200',
 'content_length': '813'}

In [18]:
import pickle

pickle_dump_result = pickle.dumps(row_data)

In [19]:
pickle.loads(pickle_dump_result)

ip: 183.162.52.7
time: 2016-11-10 00:01:02+08:00
request_method: POST
query: /api3/getadv HTTP/1.1
raw_uri: www.imooc.com
domain: imooc.com
status: 200
content_length: 813

In [20]:
# 解析每一行记录
map_result = text_file.map(parse_nginx_log_line)

In [33]:
map_result.first()

ip: 183.162.52.7
time: 2016-11-10 00:01:02+08:00
request_method: POST
query: /api3/getadv HTTP/1.1
raw_uri: www.imooc.com
domain: imooc.com
status: 200
content_length: 813

### 指定域名请求数量统计

In [22]:
# 对 map 结果 RDD 进行筛选
filter_result = map_result.filter(lambda x: x is not None and x.domain == "imooc.com")

In [23]:
# 计算筛选后 RDD 记录数量
# 指定域名的请求数量
filter_result.count()

                                                                                

39543

### 指定日期请求成功数量占比

In [24]:
# 指定日期
filter_date = datetime.date(year=2016, month=11, day=10)

In [25]:
# 对 map 结果进行筛选，并按状态码分组
group_result = (
    map_result
    .filter(lambda x: x is not None and x.time.date() == filter_date)
    .groupBy(lambda x: 200 <= int(x.status) < 300)
)

In [26]:
collect_result = group_result.collect()

                                                                                

In [27]:
collect_result

[(False, <pyspark.resultiterable.ResultIterable at 0x7f42ec0ff9a0>),
 (True, <pyspark.resultiterable.ResultIterable at 0x7f42ec410910>)]

In [28]:
def get_count(data_list):
    return len(data_list)

In [29]:
collect_result = group_result.mapValues(get_count).collect()

In [30]:
# 请求状态数量
success_count = collect_result[1][1]
fail_count = collect_result[0][1]
success_count, fail_count

(45045, 54955)

In [31]:
# 请求成功占比
success_count / (success_count + fail_count) * 100

45.045