forked from xinxi1990/MyMonkey
-
Notifications
You must be signed in to change notification settings - Fork 0
/
GetNetWork.py
136 lines (97 loc) · 3.44 KB
/
GetNetWork.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# coding=utf-8
'''
获取设备流量
上行+下行
模拟和真机获取方式不一样
@author xinxi
'''
from AdbCommon import AdbCommon
import subprocess
from DateBean import DateBean
import logger
import time
total = 0
class GetNetWork():
def __init__(self, dev):
self.dev = dev
self.db = DateBean()
def getnetwork(self,activity):
'''
获取真机的流量
获取上传和下载的流量
:return:
'''
adc = AdbCommon(self.dev)
global total
try:
uid = adc.get_app_uid(self.db.packagename)
# 获取uid
cmd = 'adb -s %s shell cat /proc/uid_stat/%s/tcp_snd' % (self.dev, uid)
# 上传流量
logger.log_debug(cmd)
pipe = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout
updata = int(pipe.read().split('/')[0])
cmd = 'adb -s %s shell cat /proc/uid_stat/%s/tcp_rcv' % (self.dev, uid)
logger.log_debug(cmd)
# 下载流量
pipe = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout
downdata = int(pipe.read().split('/')[0])
total = (format(float(updata + downdata) / float(1024 * 1024), '.3f'))
except Exception, e:
logger.log_error('获取真机流量失败:%s' + str(e))
total = 0
finally:
times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 发生时间
with open(self.db.networkpath, 'ab+') as f:
f.write(
str(times) + ',' +
str(total) + ',' +
str(activity) + ',' + '\n'
)
def simulatorgetnetwork(self,activity):
'''
获取模拟器的流量
获取上传和下载的流量
:return:
'''
global total
adc = AdbCommon(self.dev)
try:
uid = adc.get_app_pid(self.db.packagename)
cmd = 'adb -s %s shell cat /proc/%s/net/dev' % (self.dev, uid)
# 获取流量命令
logger.log_debug(cmd)
pipe = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout
for index in pipe.readlines():
# if index.startswith(' wlan0'): # 真机
if 'eth0' in index: # 模拟器
down = index.split()[1]
# 下载
send = index.split()[9]
# 上传
total = (int(down) + int(send)) / (1024 * 1024)
# 上传和下载
except Exception, e:
logger.log_error('获取模拟器流量失败:%e' + str(e))
total = 0
finally:
times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 发生时间
with open(self.db.networkpath, 'ab+') as f:
f.write(
str(times) + ',' +
str(total) + ',' +
str(activity) + ',' + '\n'
)
def selectnetwork(self,simulator,activity):
'''
选择真机或者模拟器获取流量的方法
:param simulator: 取Path.py中的simulator的参数
:return:
'''
logger.log_info("simulator: " + str(simulator))
if simulator:
self.simulatorgetnetwork(activity)
else:
self.getnetwork(activity)