In [1]:
##创建环境

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("NoteFrame") \
        .getOrCreate()

sc = spark.sparkContext

## 数据说明

**http.log：** 用户访问网站所产生的日志。日志格式为：时间戳、IP地址、访问网址、访问数据、浏览器信息等，样例如下：

In [2]:
!head -3 data/http.log






**ip.dat：**ip段数据，记录着一些ip段范围对应的位置，总量大概在11万条，数据量也算很小的，样例如下

In [3]:
!head -3 data/ip.dat






## 任务描述
**将 http.log 文件中的 ip 转换为地址。如将 122.228.96.111 转为温州，并统计各城市的总访问量**

## 定义一些辅助函数

In [4]:
#ip转化为int类型
def ip2int(s):
    new_str = s.split(".")
    ipNum = 0
    for i in range(len(new_str)):
        ipNum = int(new_str[i]) | ipNum << 8
    return ipNum

In [5]:
print(ip2int('192.168.220.21'))

3232291861


In [6]:
#二分法匹配ip规则
def binary(_list, ip):
    left = 0   # 列表的起始索引
    right = len(_list) - 1   # 列表的结束索引
    mid = int((left + right)/2)  # 采用此方法，通过四舍五入刚好可以定位到列表的中间位置
    while left <= right:
        mid = int((right + left)/2)
        if _list[mid][0] <= ip  and _list[mid][1] >= ip:
            return mid
        elif _list[mid][0] > ip:
            right = mid -1
        else:
            left = mid+1
    return -1

## ip转换并统计各城市的总访问量

In [7]:
#提取日志中的ip地址
rddlog = sc.textFile("data/http.log").map(lambda x:x.split("|")[1])
rddlog.collect()

['125.213.100.123',
 '117.101.215.133',
 '117.101.222.68',
 '115.120.36.118',
 '123.197.64.247',
 '222.55.57.176',
 '123.197.66.93',
 '115.120.12.157',
 '115.120.7.240',
 '117.101.219.241',
 '123.197.49.171',
 '117.101.213.104',
 '115.120.10.205',
 '117.101.218.147',
 '115.120.17.80',
 '117.101.220.175',
 '123.197.66.12',
 '125.213.100.236',
 '123.197.66.208',
 '115.120.19.122',
 '115.120.2.192',
 '117.101.204.182',
 '117.75.230.192',
 '123.197.43.186',
 '115.120.2.74',
 '115.120.11.195',
 '115.120.14.96',
 '123.197.51.93',
 '115.120.14.202',
 '115.120.8.144',
 '115.120.13.233',
 '123.197.64.145',
 '117.101.220.148',
 '117.101.227.38',
 '123.197.67.154',
 '117.101.227.132',
 '123.197.46.211',
 '115.120.3.48',
 '125.213.100.123',
 '115.120.32.40',
 '125.213.96.126',
 '123.197.46.186',
 '117.101.215.9',
 '117.101.215.15',
 '115.120.31.94',
 '123.197.53.151',
 '115.120.7.166',
 '117.101.213.218',
 '122.73.107.206',
 '117.75.230.248',
 '222.55.70.143',
 '115.120.18.28',
 '123.197.72.249',


In [8]:
#将ip范围与对应城市整合
rddip = sc.textFile("data/ip.dat").map(
    lambda x:(x.split("|")[0],
    x.split("|")[1],
    x.split("|")[7]))
    
rddip.collect()

[('1.0.1.0', '1.0.3.255', '福州'),
 ('1.0.8.0', '1.0.15.255', '广州'),
 ('1.0.32.0', '1.0.63.255', '广州'),
 ('1.1.0.0', '1.1.0.255', '福州'),
 ('1.1.2.0', '1.1.7.255', '福州'),
 ('1.1.8.0', '1.1.63.255', '广州'),
 ('1.2.0.0', '1.2.1.255', '福州'),
 ('1.2.2.0', '1.2.2.255', '北京'),
 ('1.2.4.0', '1.2.4.255', '北京'),
 ('1.2.5.0', '1.2.7.255', '福州'),
 ('1.2.8.0', '1.2.8.255', '北京'),
 ('1.2.9.0', '1.2.127.255', '广州'),
 ('1.3.0.0', '1.3.255.255', '广州'),
 ('1.4.1.0', '1.4.3.255', '福州'),
 ('1.4.4.0', '1.4.4.255', '北京'),
 ('1.4.5.0', '1.4.7.255', '福州'),
 ('1.4.8.0', '1.4.127.255', '广州'),
 ('1.8.0.0', '1.8.255.255', '北京'),
 ('1.10.0.0', '1.10.7.255', '广州'),
 ('1.10.8.0', '1.10.9.255', '福州'),
 ('1.10.11.0', '1.10.15.255', '福州'),
 ('1.10.16.0', '1.10.127.255', '广州'),
 ('1.12.0.0', '1.12.255.255', '北京'),
 ('1.13.0.0', '1.13.71.255', '长春'),
 ('1.13.72.0', '1.13.87.255', '吉林'),
 ('1.13.88.0', '1.13.95.255', '长春'),
 ('1.13.96.0', '1.13.127.255', '天津'),
 ('1.13.128.0', '1.13.191.255', '长春'),
 ('1.13.192.0', '1.14.95.

In [9]:
#定义广播变量
brIP = sc.broadcast(rddip.map(lambda x: (ip2int(x[0]), ip2int(x[1]), x[2])).collect())

In [10]:
brIP

<pyspark.broadcast.Broadcast at 0xfffcb407be10>

In [11]:
def getcity(x):
    index = binary(brIP.value, ip2int(x))
    if index != -1:
        return brIP.value[index][2]
    else:
        return 'NULL'

In [13]:
#关联后的结果rdd
from operator import add
rddlog.map(lambda x: getcity(x)).map(lambda x:(x,1)).reduceByKey(add).map(lambda x: {"城市":x[0],"访问量":x[1]}).saveAsTextFile("data/work/new_output_ips")

In [20]:
!head -n 5 data/work/new_output_ips/part-00000

{'城市': '重庆', '访问量': 868}
{'城市': '北京', '访问量': 1535}
{'城市': '西安', '访问量': 1824}
{'城市': '石家庄', '访问量': 383}
{'城市': '昆明', '访问量': 126}
