-
Notifications
You must be signed in to change notification settings - Fork 0
/
logusers.py
459 lines (380 loc) · 15.9 KB
/
logusers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
#!/usr/bin/env python3
#
# Log which users are on which servers at what times.
#
# Copyright (c) Drew Heintz 2016
#
import argparse
from datetime import datetime, timedelta
import json
import multiprocessing
import os
import os.path
import queue
import re
import sched
import signal
import sqlite3
import subprocess
import threading
import time
# Default name of the config file
DEFAULT_CONFIG_FILENAME = 'config.json'
# Amount of time to use when the user didn't log out on their own
DEFAULT_DURATION = timedelta(minutes=0)
# Table name where records are stored in the database
RECORD_TABLE = 'serverdata'
# The Unix epoch, from which all times are referenced.
# (datetime.utcfromtimestamp is deprecated since Python 3.12; the literal
# below is the identical naive-UTC value.)
EPOCH = datetime(1970, 1, 1)
# The number of seconds in a day
SECONDS_PER_DAY = 60 * 60 * 24
# Minimum number of threads to use for collecting data
MIN_COLLECTION_THREADS = 2
# File where we will cache the host keys
HOST_KEY_FILE = os.path.abspath('./host_cache')
# Command to send to the server
SERVER_COMMAND = ['last', '-wF']
# Regex to match lines from `last`
# eg. reboot system boot 0.0.0.0 Fri Feb 12 14:21:34 2016 - Thu Feb 18 04:57:16 2016 (5+14:35)
# eg. msm7155 pts/12 72.230.234.95 Wed Feb 17 19:53:48 2016 - Wed Feb 17 19:57:56 2016 (00:04)
# eg. ach3628 pts/12 129.21.82.30 Thu Feb 18 04:46:15 2016 still logged in
# The day-count group is `([0-9]+\+)?` (was `([0-9]\+)?`): the old form only
# allowed a single digit, so durations of ten days or more -- eg. "(12+14:35)"
# -- silently failed to capture their day/duration groups.
LAST_REGEX = re.compile(
    r'([^\s]+) .+? ([0-9a-z:\.]+) +([A-Za-z0-9: ]+) \-? ([A-Za-z0-9: ]+)(?:\(([0-9]+\+)?([0-9]{2}\:[0-9]{2})\))?')
# Regex to check for users who are logged in locally.
# eg: :0, :5
LOCAL_LOGIN_REGEX = re.compile(r'\:[0-9]+')
# Query used for creating the database
QUERY_DATABASE_INIT = """CREATE TABLE IF NOT EXISTS '%s' (
id INTEGER PRIMARY KEY, hostname VARCHAR(255), username VARCHAR(255),
source VARCHAR(255), login DATETIME, duration UNSIGNED BIGINT)""" % RECORD_TABLE
# Query to find the most recent query from each hostname
QUERY_SELECT_MOST_RECENT = """SELECT hostname, MAX(login) login FROM {0}
GROUP BY hostname""".format(RECORD_TABLE)
# Query to insert tuples into the database
QUERY_INSERT_TUPLES = 'INSERT INTO {0} VALUES (?,?,?,?,?,?)'\
    .format(RECORD_TABLE)
# The SSH command. Use what seems like a good path on Windows and just the
# 'ssh' command on anything else. This can be set with a command-line arg.
if os.name == 'nt':
    SSH_COMMAND = 'C:/Program Files/Git/usr/bin/ssh'
else:
    SSH_COMMAND = 'ssh'
def test_none(*args):
    """Return True when at least one positional argument is None.

    :param args: arbitrary values to inspect
    :return: True if any value is None, otherwise False
    """
    return any(value is None for value in args)
class Record:
    """A recorded login.

    Holds one login session (who, from where, on which host, when and for
    how long) and converts between Record objects, raw `last` output lines
    and the flat tuples stored in the SQLite database.
    """

    def __init__(self, hostname, username, source, login, duration):
        """Construct a Record.

        :param hostname: (str) hostname of the server where data was collected
        :param username: (str) username of the user who logged in
        :param source: (str) ip or hostname from which they logged in
        :param login: (datetime) when they logged in
        :param duration: (timedelta) duration of their stay
        """
        self.hostname = hostname
        self.username = username
        self.source = source
        self.login = login
        self.duration = duration

    def to_tuple(self):
        """Create a tuple suitable for writing to an SQL database.

        :return: (None, hostname, username, source, login, duration_seconds);
            the leading None lets SQLite assign the integer primary key.
        """
        duration_seconds = self.duration.days * SECONDS_PER_DAY \
            + self.duration.seconds
        # The None is for the primary key
        return (None, self.hostname, self.username,
                self.source, self.login, duration_seconds)

    @staticmethod
    def from_tuple(tup):
        """Read a record from a tuple generated by to_tuple().

        :param tup: (id, hostname, username, source, login, duration_seconds)
        :return: the reconstructed Record
        """
        login = Record.parse_sql_datetime(tup[4])
        # BUG FIX: the original `tup[5] / SECONDS_PER_DAY` is float division
        # under Python 3, so the sub-day remainder was counted twice (once in
        # the fractional days and once again in `seconds`). divmod() keeps
        # the arithmetic integral and exact.
        days, seconds = divmod(tup[5], SECONDS_PER_DAY)
        return Record(tup[1], tup[2], tup[3], login,
                      timedelta(days=days, seconds=seconds))

    @staticmethod
    def parse_sql_datetime(text):
        """Parse a datetime string as stored by SQLite.

        :param text: datetime text in '%Y-%m-%d %H:%M:%S' form
        :return: a naive datetime object
        :raises ValueError: if the text does not match the format
        """
        return datetime.strptime(text, '%Y-%m-%d %H:%M:%S')

    @staticmethod
    def parse_line(line, hostname):
        """Parse a line of text from the server into a Record.

        Note that some 'invalid' records such as those with only a start time
        will still be parsed. These will then have a duration of 0 indicating
        they were not fully valid.

        :param line: the line of text received from the server
        :param hostname: hostname of the server the line came from
        :return: a newly created Record if valid, None if invalid
        """
        match = LAST_REGEX.match(line)
        if not match:
            # The line is not `last` output we understand
            return None
        data = match.groups()
        # Make sure everything has some value
        if test_none(data[0], data[1], data[2], data[3]):
            return None
        username = data[0]
        source = data[1]
        if LOCAL_LOGIN_REGEX.match(source):
            # Set the source to represent a local gui
            source = '0.0.0.0'
        login = Record.parse_datetime(data[2].strip())
        if login is None:
            return None
        data_3 = data[3].strip()
        logout_time = Record.parse_datetime(data_3)
        if logout_time is None:
            # Open sessions have no end time yet: skip them so they are
            # recorded once they complete.
            if data_3 == 'still logged in':
                return None
            else:
                # If this is not the case we will fall back on the default
                # duration to provide us a time (which is 0)
                logout_time = login + DEFAULT_DURATION
        duration = logout_time - login
        return Record(hostname, username, source, login, duration)

    @staticmethod
    def is_valid_line(line):
        """Test if a line is fully valid.

        This means start time and end time must both be present.

        :param line: line to check
        :return: True when the login time, logout time and duration field
            are all present and parseable
        """
        match = LAST_REGEX.match(line)
        if match:
            data = match.groups()
            # ignore data[0] and data[1] because those are not worth validating
            login = Record.parse_datetime(data[2].strip())
            logout_time = Record.parse_datetime(data[3].strip())
            # ignore data[4] because it's optional
            duration = data[5]
            return (login is not None
                    and logout_time is not None
                    and duration is not None)
        else:
            return False

    @staticmethod
    def parse_datetime(text):
        """Turn a date string from `last` into an actual datetime object.

        eg. Wed Feb 17 19:53:48 2016

        :param text: date time string
        :return: the parsed datetime, or None when the text is malformed
        """
        try:
            return datetime.strptime(text, '%a %b %d %H:%M:%S %Y')
        except ValueError:
            return None
class Server:
    """One remote machine whose login history the recorder collects."""

    def __init__(self, hostname, username, keyfile):
        # Where to connect, who to connect as, and which private key to use.
        self.hostname = hostname
        self.username = username
        self.keyfile = keyfile

    def gather_info(self):
        """Fetch the raw `last` output from this server.

        Connects over SSH with our private key (skipping interactive
        host-key prompts and caching keys in HOST_KEY_FILE), runs
        SERVER_COMMAND remotely and returns its stdout.

        :return: the remote command output decoded as UTF-8
        """
        target = self.username + '@' + self.hostname
        command = [
            SSH_COMMAND,
            '-o', 'StrictHostKeyChecking no',
            '-o', 'UserKnownHostsFile %s' % HOST_KEY_FILE,
            '-i', self.keyfile,
            target,
        ]
        command.extend(SERVER_COMMAND)
        with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
            return proc.stdout.read().decode('utf-8')
class Recorder:
    """Drives the whole collection pipeline.

    Every config['period'] seconds each Server is pushed onto a work queue;
    a pool of worker threads runs `last` over SSH and parses the output into
    Record tuples, which are handed back via a second queue and inserted into
    the SQLite database from the scheduling thread (the sqlite3 connection is
    only ever used from the thread that created it).
    """
    def __init__(self, config_fd):
        """Construct a new Recorder.
        :param config_fd: a readable File-like object for the config file
        """
        # Load the JSON config. We don't do any validation right now
        # but validation will happen as the program runs.
        self.config = json.load(config_fd)
        # Build the list of server objects
        self.server_list = self._build_server_list()
        # Create the SQLite database connection
        self.conn = sqlite3.connect(self.config['database'])
        self._init_database()
        # Create the scheduler
        self.timer = sched.scheduler()
        # Disable collecting data
        self.collecting = False
        # sched task handle so we can cancel it later
        self.collection_task = None
        # Blocking queue used for queueing the servers to be collected
        self.collection_queue = queue.Queue()
        # Queue used to return lists of tuples to be added to the database
        self.sql_insert_queue = queue.Queue()
        # List of threads used to process the collection_queue
        self.collection_threads = None
    def run(self):
        """Run the recorder."""
        # Start the worker pool and do a first collection immediately;
        # subsequent collections are driven by the scheduler.
        if not self.collecting:
            self.collecting = True
            self.collection_threads = self._create_collection_threads()
            self.collect()
        while True:
            # Run the scheduler in blocking mode
            self.timer.run(True)
    def collect(self):
        """Run the collector.
        :param self: A reference to the current object.
        """
        # Schedule the task to run again. If we do this at the end of the
        # method the timer will basically drift by however long it takes to
        # collect the data.
        if self.collecting:
            self._schedule_collection()
        # Get the start time
        start_time = time.time()
        # Get the list of most recent records for all the servers
        most_recent_times = self._most_recent_records()
        # Gather info from all the servers
        for server in self.server_list:
            # Select the most recent time, default is EPOCH
            most_recent = most_recent_times.get(server.hostname, EPOCH)
            self.collection_queue.put_nowait((server, most_recent))
        # Wait for all servers to be processed
        self.collection_queue.join()
        # Process everything in sql_insert_queue
        # (done here, not in the workers, so only this thread touches the
        # sqlite3 connection)
        c = self.conn.cursor()
        while not self.sql_insert_queue.empty():
            tuple_list = self.sql_insert_queue.get()
            c.executemany(QUERY_INSERT_TUPLES, tuple_list)
            self.sql_insert_queue.task_done()
        # And commit the changes to the SQL database
        self.conn.commit()
        time_taken = '{:.4f}'.format(time.time() - start_time)
        print('Collected data', time_taken, 'seconds', flush=True)
    def collection_worker(self):
        """Worker thread function.
        Takes a server and most recent record from the collection_queue,
        gets the information from the server and processes it.
        Exits when it gets a tuple of (None, None) in the queue.
        """
        # NOTE(review): if server.gather_info() or parsing raises, task_done()
        # is never called and collect()'s join() blocks forever -- confirm
        # whether SSH failures can propagate here and whether a try/finally
        # is needed.
        while True:
            # List of records
            record_list = []
            # This call is blocking
            server, most_recent = self.collection_queue.get()
            # The exit condition
            if server is None and most_recent is None:
                break
            data = server.gather_info()
            # Process the data gathered from the server
            for line in data.splitlines():
                # Get the record
                rec = Record.parse_line(line, server.hostname)
                # Only add it if it's newer than the most recent record
                # from this server.
                if rec is not None and rec.login > most_recent:
                    record_list.append(rec)
            # Now insert those records into the database again
            # But to do that we need to create a list of tuples
            tuple_list = []
            for record in record_list:
                tuple_list.append(record.to_tuple())
            # Send the data back to the main thread to be added to the database
            self.sql_insert_queue.put_nowait(tuple_list)
            # Tell the collection queue we're done with this task
            self.collection_queue.task_done()
    def stop(self):
        """Stop collecting data."""
        # Check to see if we are currently collecting
        if not self.collecting:
            return
        # Cancel the repeating task
        self.collecting = False
        self.timer.cancel(self.collection_task)
        self.collection_task = None
        # Now end the worker threads by pushing (None, None) one to the
        # queue for each worker thread to process.
        # (The join() happens BEFORE the sentinels are queued, so the
        # workers' missing task_done() for the sentinel does not matter.)
        self.collection_queue.join()
        for i in range(len(self.collection_threads)):
            self.collection_queue.put_nowait((None, None))
        # Make sure they are all stopped
        for thread in self.collection_threads:
            thread.join()
        # Stop referencing them
        self.collection_threads = None
    def _most_recent_records(self):
        """Get the most recent record for each hostname.
        :return dict: dictionary of hostnames to login datetime objects
        """
        c = self.conn.cursor()
        c.execute(QUERY_SELECT_MOST_RECENT)
        records = c.fetchall()
        result = {}
        for record in records:
            result[record[0]] = Record.parse_sql_datetime(record[1])
        return result
    def _schedule_collection(self):
        """Schedule the collection task to be run."""
        # config['period'] is the collection interval in seconds
        self.collection_task = \
            self.timer.enter(int(self.config['period']),
                             1, self.collect)
    def _create_collection_threads(self):
        """Build an array of worker threads."""
        try:
            # Get the number of cpus but make sure it meets minimum requirements
            thread_count = multiprocessing.cpu_count()
            thread_count = max(thread_count, MIN_COLLECTION_THREADS)
        except NotImplementedError:
            thread_count = MIN_COLLECTION_THREADS
        print('Using', thread_count, 'threads')
        threads = []
        for i in range(thread_count):
            t = threading.Thread(target=self.collection_worker)
            t.start()
            threads.append(t)
        return threads
    def _build_server_list(self):
        """Build the server objects from the server list."""
        # NOTE(review): dict entries must contain all of 'host', 'username'
        # and 'keyfile' (a missing key raises KeyError); the `or default`
        # only covers falsy values such as None or '' -- confirm the config
        # format guarantees this.
        server_list = []
        default_username = self.config['username']
        default_keyfile = self.config['keyfile']
        for server in self.config['servers']:
            if isinstance(server, str):
                srv = Server(server, default_username, default_keyfile)
            elif isinstance(server, dict):
                # If the server object is a dict then override defaults
                srv = Server(
                    server['host'],
                    server['username'] or default_username,
                    server['keyfile'] or default_keyfile)
            else:
                raise Exception(
                    'Failed to read server name. Expected dict or str.')
            server_list.append(srv)
        return server_list
    def _init_database(self):
        """Create the database table and perform any other necessary setup."""
        c = self.conn.cursor()
        c.execute(QUERY_DATABASE_INIT)
if __name__ == '__main__':
    # Command-line interface: choose a config file and (optionally) a
    # different ssh executable, then run the recorder until interrupted.
    arg_parser = argparse.ArgumentParser(
        description='Collect login/logout data for groups of servers')
    arg_parser.add_argument('-c', metavar='config_file', type=str,
                            default=DEFAULT_CONFIG_FILENAME,
                            help='Change the config file')
    arg_parser.add_argument('-s', metavar='ssh_command', type=str,
                            default=SSH_COMMAND, help='Command to run ssh')
    options = arg_parser.parse_args()
    # Point the module-level SSH_COMMAND at whatever the user selected
    SSH_COMMAND = options.s
    # The config file is only needed while the Recorder is being built
    with open(options.c, 'r') as config_fd:
        recorder = Recorder(config_fd)

    def shutdown_handler(signum, frame):
        # Drain the worker threads cleanly, then leave the process
        print('Stopping...', flush=True)
        recorder.stop()
        exit()

    # Stop gracefully on Ctrl-C or a polite kill
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, shutdown_handler)
    print('Running...', flush=True)
    recorder.run()