### Setup Spark enviroment

In [1]:
import os
import sys
spark_path = os.environ['SPARK_HOME']
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.9-src.zip")

import findspark
findspark.init()
import pyspark

In [2]:
number_cores = 4
memory_gb = 4
conf = (pyspark.SparkConf().setMaster('local[{}]'.format(number_cores)).set('spark.driver.memory', '{}g'.format(memory_gb)))
sc = pyspark.SparkContext(conf=conf)

### Load Data

In [3]:
#Load Data
textFile = sc.textFile("C://Users//samue//Downloads//BigData//auth_log//*")

In [4]:

textFile.count()

271071

### Make Sure I Have the Data

In [6]:
textFile.take(100)

['Dec 13 00:06:16 submitty sshd[19509]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
 'Dec 13 00:06:17 submitty sshd[19509]: Failed password for root from 218.92.0.184 port 3753 ssh2',
 'Dec 13 00:06:21 submitty sshd[19509]: Failed password for root from 218.92.0.184 port 3753 ssh2',
 'Dec 13 00:07:01 submitty CRON[19523]: pam_unix(cron:session): session opened for user submitty_daemon by (uid=0)',
 'Dec 13 00:07:02 submitty CRON[19523]: pam_unix(cron:session): session closed for user submitty_daemon',
 'Dec 13 00:08:01 submitty CRON[19536]: pam_unix(cron:session): session opened for user submitty_daemon by (uid=0)',
 'Dec 13 00:08:01 submitty CRON[19536]: pam_unix(cron:session): session closed for user submitty_daemon',
 'Dec 13 00:08:29 submitty sshd[19552]: Invalid user shalini from 27.128.173.81 port 49454',
 'Dec 13 00:08:29 submitty sshd[19552]: pam_unix(sshd:auth): check pass; user unknown',
 'Dec 13 00:08:29 s

### All of the fails seem to have command sshd so I filtered it to have just those commands

In [9]:
fails = textFile.filter(lambda line: line.find("sshd") > 0)
fails.take(100)

['Dec 13 00:06:16 submitty sshd[19509]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
 'Dec 13 00:06:17 submitty sshd[19509]: Failed password for root from 218.92.0.184 port 3753 ssh2',
 'Dec 13 00:06:21 submitty sshd[19509]: Failed password for root from 218.92.0.184 port 3753 ssh2',
 'Dec 13 00:08:29 submitty sshd[19552]: Invalid user shalini from 27.128.173.81 port 49454',
 'Dec 13 00:08:29 submitty sshd[19552]: pam_unix(sshd:auth): check pass; user unknown',
 'Dec 13 00:08:29 submitty sshd[19552]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=27.128.173.81',
 'Dec 13 00:08:32 submitty sshd[19552]: Failed password for invalid user shalini from 27.128.173.81 port 49454 ssh2',
 'Dec 13 00:08:40 submitty sshd[19560]: Invalid user user5 from 134.175.17.32 port 44532',
 'Dec 13 00:08:40 submitty sshd[19560]: pam_unix(sshd:auth): check pass; user unknown',
 'Dec 13 00:08:40 submit

### Filter out the root fails

In [10]:
rootFails = fails.filter(lambda line: line.find("root") > 0)
rootFails.take(5)

['Dec 13 00:06:16 submitty sshd[19509]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
 'Dec 13 00:06:17 submitty sshd[19509]: Failed password for root from 218.92.0.184 port 3753 ssh2',
 'Dec 13 00:06:21 submitty sshd[19509]: Failed password for root from 218.92.0.184 port 3753 ssh2',
 'Dec 13 00:10:35 submitty sshd[19716]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.247  user=root',
 'Dec 13 00:10:36 submitty sshd[19716]: Failed password for root from 218.92.0.247 port 16026 ssh2']

### Method to break down error message

In [11]:
import re
def breakdown_failed(entry):
    extracts = re.search(r'(?P<date>\w+\s+\d+) (?P<time>\d+:\d+:\d+) \S+ \S+\[(?P<proc>\d+)\]: ?(?P<msg>[\S ]+)', entry)
    return extracts.groups() if extracts else entry

### Test ip address findings

In [12]:
import re
s = 'Dec 13 00:10:35 submitty sshd[19716]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.247  user=root'
breakdown_failed(s)
result = re.findall("\d+.\d+.\d+.\d+", breakdown_failed(s)[3])[0]
print(result)

218.92.0.247


### Ip address finder method

In [14]:
import re
def find_ip( line ):
    ip = re.findall("\d+.\d+.\d+.\d+", breakdown_failed(line)[3])
    if(len(ip) > 0):
        return ip[0]
    else:
        return ""

### Testing ip address finder method

In [15]:
s = 'Dec 13 00:10:36 submitty sshd[19716]: Failed password for root from 218.92.0.247 port 16026 ssh2'
find_ip(s)

'218.92.0.247'

### Gets each root fails ip address then sorts then by error message and ip address

In [16]:
import re
filterFails = rootFails.filter(lambda line: len(breakdown_failed(line)[3])>1)
rootFailsIPAddress = filterFails.map(lambda line: (find_ip(line), breakdown_failed(line)[3]))
rootFailsIPAddress.take(5)

[('218.92.0.184',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root'),
 ('218.92.0.184', 'Failed password for root from 218.92.0.184 port 3753 ssh2'),
 ('218.92.0.184', 'Failed password for root from 218.92.0.184 port 3753 ssh2'),
 ('218.92.0.247',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.247  user=root'),
 ('218.92.0.247',
  'Failed password for root from 218.92.0.247 port 16026 ssh2')]

### Takes a random sample of all the root error messages and ip addresses

In [17]:
rootFailsIPAddress.takeSample(False,50)

[('112.85.42.98',
  'Failed password for root from 112.85.42.98 port 51872 ssh2'),
 ('94.191.81.127',
  'Failed password for root from 94.191.81.127 port 49870 ssh2'),
 ('190.104.245.164',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=190.104.245.164  user=root'),
 ('218.92.0.185',
  'Failed password for root from 218.92.0.185 port 14143 ssh2'),
 ('218.92.0.223',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.223  user=root'),
 ('112.85.42.180',
  'Failed password for root from 112.85.42.180 port 58609 ssh2'),
 ('173.225.104.57',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=173.225.104.57  user=root'),
 ('87.236.233.108',
  'Disconnected from authenticating user root 87.236.233.108 port 15319 [preauth]'),
 ('14.18.154.186',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=14.18.154.186  user=root'),
 ('

# Answer to how many root fails is 75630

In [18]:
rootFailsCount = rootFailsIPAddress.count()
print(rootFailsCount)

75630


### Gets only unique ip addresses

In [19]:
abc = rootFailsIPAddress.groupByKey().mapValues(list)
abc.take(10)

[('218.92.0.184',
  ['pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
   'Failed password for root from 218.92.0.184 port 3753 ssh2',
   'Failed password for root from 218.92.0.184 port 3753 ssh2',
   'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
   'Failed password for root from 218.92.0.184 port 51581 ssh2',
   'Failed password for root from 218.92.0.184 port 51581 ssh2',
   'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
   'Failed password for root from 218.92.0.184 port 63832 ssh2',
   'Failed password for root from 218.92.0.184 port 63832 ssh2',
   'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.184  user=root',
   'Failed password for root from 218.92.0.184 port 6955 ssh2',
   'Failed password for root from 218.92.0.184 port 6955 s

### Lists just the unique ip addresses

In [20]:
rootFailsAddress = abc.map(lambda group: group[0])
rootFailsAddress.take(10)

['218.92.0.184',
 '221.131.165.81',
 '222.187.222.53',
 '222.187.238.87',
 '221.181.185.18',
 '112.85.42.96',
 '185.156.74.65',
 '221.181.185.200',
 '218.92.0.249',
 '221.181.185.198']

In [69]:
#import re
#ip6Addresses = rootFailsAddress.map(lambda line: line.replace(".", ""))
#ip6Addresses.collect()

### Practice splitting by dot

In [21]:
s = '218.92.0.184'
s.split('.')

['218', '92', '0', '184']

### Splits all ip addresses by dot

In [22]:
rootIPAddressSplit = rootFailsAddress.map(lambda line: line.split('.'))
rootIPAddressSplit.collect()

[['218', '92', '0', '184'],
 ['221', '131', '165', '81'],
 ['222', '187', '222', '53'],
 ['222', '187', '238', '87'],
 ['221', '181', '185', '18'],
 ['112', '85', '42', '96'],
 ['185', '156', '74', '65'],
 ['221', '181', '185', '200'],
 ['218', '92', '0', '249'],
 ['221', '181', '185', '198'],
 ['218', '92', '0', '248'],
 ['213', '32', '111', '53'],
 ['222', '187', '222', '105'],
 ['111', '231', '88', '39'],
 ['218', '92', '0', '171'],
 ['112', '85', '42', '85'],
 ['128', '201', '76', '184'],
 ['40', '87', '87', '198'],
 ['142', '93', '254', '122'],
 ['193', '169', '252', '59'],
 ['119', '45', '26', '117'],
 ['221', '181', '185', '143'],
 ['49', '235', '11', '137'],
 ['221', '181', '185', '149'],
 ['183', '82', '102', '66'],
 ['0 106', '12', '31'],
 ['222', '187', '227', '224'],
 ['103', '43', '185', '142'],
 ['51', '195', '42', '242'],
 ['148', '245', '13', '21'],
 ['45', '82', '137', '35'],
 ['167', '71', '1', '6'],
 ['81', '68', '238', '7'],
 ['112', '85', '42', '232'],
 ['146', '59

### Method to convery split ip address to decimal format to easily plug into the ipv4 table

In [23]:
def ip_conversion(enter):
    try:
        enter[0] = int(enter[0]) * (256 * 256 * 256)
        enter[1] = int(enter[1]) * (256 * 256)
        enter[2] = int(enter[2]) * (256)
        sum = enter[0] + enter[1] + enter[2] + int(enter[3])
        return sum
    except:
        sum = "unknown"

### Practice conversion method

In [25]:
s = ['218', '92', '0', '184']
ip_conversion(s)

3663462584

### Converts all the ip addresses to decimal format

In [26]:
rootDecimalIP = rootIPAddressSplit.map(lambda line: ip_conversion(line))
rootDecimalIP.collect()

[3663462584,
 3716392273,
 3736854069,
 3736858199,
 3719674130,
 1884629600,
 3114027585,
 3719674312,
 3663462649,
 3719674310,
 3663462648,
 3575672629,
 3736854121,
 1877432359,
 3663462571,
 1884629589,
 2160676024,
 676812742,
 2388524666,
 3249142843,
 1999444597,
 3719674255,
 837487497,
 3719674261,
 3075630658,
 None,
 3736855520,
 1730918798,
 868428530,
 2499087637,
 760383779,
 2806448390,
 1363471879,
 1884629736,
 2453380533,
 1868217908,
 837443131,
 3070651204,
 2026610706,
 2418196819,
 3197143884,
 720915972,
 2703455758,
 3191276942,
 1939823288,
 1348638217,
 1952242594,
 2406414945,
 2358218892,
 3165043635,
 1998336867,
 99141105,
 1033960290,
 1985607904,
 2984033647,
 1613606353,
 778415591,
 1022798643,
 1884160807,
 1833797593,
 3191920027,
 1966315210,
 3731938316,
 1708040912,
 3016958141,
 2649152691,
 1732209689,
 2703433424,
 1877406733,
 1290358877,
 3147972024,
 466062361,
 2050202196,
 2335902394,
 2713558279,
 1868366389,
 3242590436,
 1877460979,
 7

### Make sure this is the right csv file

In [27]:
countriesApi = sc.textFile("C://Users//samue//Downloads//IP2LOCATION-LITE-DB1.CSV//IP2LOCATION-LITE-DB1.CSV")

### Make sure the right csv file

In [28]:
countriesApi.take(5)

['"0","16777215","-","-"',
 '"16777216","16777471","US","United States of America"',
 '"16777472","16778239","CN","China"',
 '"16778240","16779263","AU","Australia"',
 '"16779264","16781311","CN","China"']

### Method to turn decimal ip into country by search the csv file to find the country the decimal ip belongs to

In [29]:
import csv
def find_country( address ):
    with open (r"C://Users//samue//Downloads//IP2LOCATION-LITE-DB1.CSV//IP2LOCATION-LITE-DB1.CSV", "r") as isp4:
        try:
            address = int(address)
            ip_reader = csv.reader(isp4)
            for line in ip_reader:
                if int(address) >= int(line[0]) and address <= int(line[1]):
                    if line[3] != "":
                        country = line[3]
                        return country
        except:
            country = "unknown"
find_country(3663462584)

'China'

### Practice country conversion

In [30]:
s = ["16777216","16777471","US","United States of America"]
find_country(16777218)

'United States of America'

### Converts the decimal ip addresses into countries

In [31]:
resultCountries = rootDecimalIP.map(lambda line: find_country(line))
resultCountries.collect()

['China',
 'China',
 'China',
 'China',
 'China',
 'China',
 'Azerbaijan',
 'China',
 'China',
 'China',
 'China',
 'France',
 'China',
 'China',
 'China',
 'China',
 'Brazil',
 'United States of America',
 'United States of America',
 'Poland',
 'China',
 'China',
 'China',
 'China',
 'India',
 None,
 'China',
 'China',
 'Germany',
 'Mexico',
 'Iran (Islamic Republic of)',
 'Netherlands',
 'China',
 'China',
 'France',
 'Thailand',
 'China',
 'China',
 'China',
 'United States of America',
 'Colombia',
 'China',
 'United States of America',
 'Argentina',
 'China',
 'Hungary',
 'Hong Kong',
 'India',
 'China',
 'Singapore',
 'Hong Kong',
 'Iran (Islamic Republic of)',
 'China',
 'China',
 'Brazil',
 'United States of America',
 'Germany',
 'Macao',
 'Viet Nam',
 'Ireland',
 'Uruguay',
 'China',
 'Korea (Republic of)',
 'China',
 'Brazil',
 'United States of America',
 'Viet Nam',
 'Germany',
 'China',
 'United States of America',
 'Mexico',
 'China',
 'China',
 'India',
 'China',
 'Ind

### Groups all the countries with each other

In [32]:
rootCountries = resultCountries.map(lambda name: (name, 1)) \
          .reduceByKey(lambda a, b: a + b)
rootCountries.take(1)

[('Poland', 23)]

# Answer to list all the countries root fail ip addresses are from

In [33]:
rootCountries.collect()

[('Poland', 23),
 ('India', 129),
 (None, 7),
 ('Germany', 122),
 ('Thailand', 20),
 ('Viet Nam', 44),
 ('Uruguay', 3),
 ('Ukraine', 18),
 ('Canada', 60),
 ('Japan', 29),
 ('Taiwan (Province of China)', 23),
 ('Bangladesh', 4),
 ('Malaysia', 14),
 ('Romania', 6),
 ('Tunisia', 4),
 ('Denmark', 2),
 ('Belgium', 4),
 ('Syrian Arab Republic', 2),
 ('Saudi Arabia', 1),
 ('Serbia', 2),
 ('Uganda', 1),
 ('United Arab Emirates', 2),
 ('New Zealand', 1),
 ('Finland', 2),
 ('Mongolia', 1),
 ('Cambodia', 1),
 ('Zimbabwe', 1),
 ('Guatemala', 1),
 ('Mexico', 28),
 ('Netherlands', 71),
 ('Macao', 2),
 ('Korea (Republic of)', 106),
 ('Chile', 8),
 ('Australia', 8),
 ('Cyprus', 5),
 ('Panama', 4),
 ('Norway', 1),
 ('Senegal', 2),
 ('Mozambique', 1),
 ('Algeria', 1),
 ('Azerbaijan', 2),
 ('Argentina', 30),
 ('Spain', 12),
 ('Ethiopia', 2),
 ('United Kingdom of Great Britain and Northern Ireland', 48),
 ('Portugal', 8),
 ('Bulgaria', 3),
 ('Ecuador', 4),
 ('Slovenia', 2),
 ('Morocco', 2),
 ('Peru', 2),


In [34]:
#resultCountries = ip6Addresses.map(lambda address: find_country(l))
#resultCountries.collect()

### Gets all the fails that weren't root fails

In [35]:
nonRootFails = fails.filter(lambda line: line.find("root") < 0)
nonRootFails.take(5)

['Dec 13 00:08:29 submitty sshd[19552]: Invalid user shalini from 27.128.173.81 port 49454',
 'Dec 13 00:08:29 submitty sshd[19552]: pam_unix(sshd:auth): check pass; user unknown',
 'Dec 13 00:08:29 submitty sshd[19552]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=27.128.173.81',
 'Dec 13 00:08:32 submitty sshd[19552]: Failed password for invalid user shalini from 27.128.173.81 port 49454 ssh2',
 'Dec 13 00:08:40 submitty sshd[19560]: Invalid user user5 from 134.175.17.32 port 44532']

# Answer to how many attempts by non root users is 88623

In [37]:
nonRootFails.count()

88623

### Fixing my ip method to work for non root messages

In [77]:
import re
def find_ip2( line ):
    ip = re.findall("\d+\\.\d+\\.\d+\\.\d+", breakdown_failed(line)[3])
    if(len(ip) > 0):
        return ip[0]
    else:
        return ""

### Find the ip addresses for non roots in messages

In [78]:
import re
filternonRootFails = nonRootFails.filter(lambda line: len(breakdown_failed(line)[3])>1)
nonrootFailsIPAddress = filternonRootFails.map(lambda line: (find_ip2(line), breakdown_failed(line)[3]))
nonrootFailsIPAddress.take(30)

[('27.128.173.81', 'Invalid user shalini from 27.128.173.81 port 49454'),
 ('', 'pam_unix(sshd:auth): check pass; user unknown'),
 ('27.128.173.81',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=27.128.173.81'),
 ('27.128.173.81',
  'Failed password for invalid user shalini from 27.128.173.81 port 49454 ssh2'),
 ('134.175.17.32', 'Invalid user user5 from 134.175.17.32 port 44532'),
 ('', 'pam_unix(sshd:auth): check pass; user unknown'),
 ('134.175.17.32',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=134.175.17.32'),
 ('201.90.101.165', 'Invalid user lms from 201.90.101.165 port 44854'),
 ('', 'pam_unix(sshd:auth): check pass; user unknown'),
 ('201.90.101.165',
  'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=201.90.101.165'),
 ('134.175.17.32',
  'Failed password for invalid user user5 from 134.175.17.32 port 44532 ssh2'),
 ('134.175.17.32',
  'Received 

### Finds the unique ip addresses

In [79]:
groupingNonRoot = nonrootFailsIPAddress.groupByKey().mapValues(list)
groupingNonRoot.take(10)

[('',
  ['pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pam_unix(sshd:auth): check pass; user unknown',
   'pa

### Lists the ip unique ip addresses has to remove the first one because first one is nothing 

In [80]:
nonRootFailsAddress = groupingNonRoot.map(lambda group: group[0])
header = nonRootFailsAddress.first()
realAttempt = nonRootFailsAddress.filter(lambda x: x != header)
realAttempt.take(5)

['134.175.17.32',
 '185.156.74.65',
 '61.181.80.253',
 '140.143.24.26',
 '79.127.36.98']

### Splits the unique ip addresses by removing the dot

In [81]:
nonRootIPAddressSplit = realAttempt.map(lambda line: line.split('.'))
nonRootIPAddressSplit.collect()

[['134', '175', '17', '32'],
 ['185', '156', '74', '65'],
 ['61', '181', '80', '253'],
 ['140', '143', '24', '26'],
 ['79', '127', '36', '98'],
 ['157', '230', '183', '86'],
 ['70', '108', '185', '122'],
 ['177', '12', '2', '53'],
 ['68', '183', '238', '175'],
 ['213', '32', '111', '53'],
 ['138', '99', '7', '29'],
 ['111', '231', '88', '39'],
 ['110', '42', '1', '213'],
 ['178', '128', '127', '126'],
 ['165', '22', '210', '35'],
 ['139', '199', '35', '168'],
 ['161', '117', '57', '88'],
 ['132', '232', '98', '228'],
 ['198', '199', '73', '239'],
 ['60', '235', '24', '222'],
 ['193', '169', '252', '59'],
 ['42', '200', '80', '42'],
 ['81', '70', '46', '31'],
 ['95', '111', '253', '158'],
 ['54', '37', '71', '204'],
 ['123', '231', '160', '98'],
 ['128', '199', '1', '184'],
 ['189', '1', '163', '101'],
 ['179', '253', '179', '216'],
 ['81', '30', '179', '11'],
 ['122', '51', '211', '131'],
 ['206', '189', '157', '50'],
 ['51', '178', '47', '46'],
 ['128', '199', '167', '234'],
 ['161', 

### Turning the non root ip address into decimal ip address for ipv4

In [82]:
nonRootDecimalIP = nonRootIPAddressSplit.map(lambda line: ip_conversion(line))
nonRootDecimalIP.collect()

[2259620128,
 3114027585,
 1035292925,
 2358188058,
 1333732450,
 2649143126,
 1181530490,
 2970354229,
 1152904879,
 3575672629,
 2321745693,
 1877432359,
 1848246741,
 2994765694,
 2769736227,
 2345083816,
 2708814168,
 2229822180,
 3334949359,
 1022040286,
 3249142843,
 717770794,
 1363553823,
 1601174942,
 908412876,
 2078777442,
 2160525752,
 3171001189,
 3019748312,
 1360966411,
 2050216835,
 3468533042,
 867315502,
 2160568298,
 2703438610,
 860125675,
 1839208147,
 3106804323,
 2389725623,
 1998345124,
 2303358770,
 1779177880,
 2919162235,
 1909555314,
 2813109285,
 1653508868,
 1018046582,
 3080683350,
 1779179971,
 2769698711,
 3044624438,
 2155125253,
 1981285757,
 1761093944,
 1423296956,
 3664403522,
 2946631989,
 888367008,
 1868366389,
 1998419863,
 2372030916,
 2503501009,
 3414627364,
 2388490543,
 1046229234,
 3039833795,
 2453379704,
 1485242762,
 1807135014,
 1997102421,
 34774333,
 3663462571,
 1383457292,
 1556001063,
 3149687667,
 987614322,
 2061767503,
 325919

# All the Countries non root users ip addresses are from

In [83]:
nonRootResultCountries = nonRootDecimalIP.map(lambda line: find_country(line))
nonRootCountries = nonRootResultCountries.map(lambda name: (name, 1)) \
          .reduceByKey(lambda a, b: a + b)
nonRootCountries.collect()

[('India', 256),
 ('Poland', 43),
 ('Germany', 225),
 ('Canada', 105),
 ('Ukraine', 32),
 ('Romania', 15),
 ('Viet Nam', 115),
 ('Malaysia', 27),
 ('Taiwan (Province of China)', 35),
 ('Japan', 69),
 ('Cambodia', 2),
 ('United Arab Emirates', 8),
 ('Thailand', 45),
 ('Uruguay', 3),
 ('Mongolia', 5),
 ('Namibia', 1),
 ('Bangladesh', 9),
 ('Tunisia', 10),
 ('Denmark', 1),
 ('Zimbabwe', 3),
 ('Belgium', 11),
 ('Syrian Arab Republic', 2),
 ('Serbia', 4),
 ('Bhutan', 1),
 ('Finland', 6),
 ('Kyrgyzstan', 1),
 ('Saudi Arabia', 2),
 ('Luxembourg', 2),
 ("Cote D'ivoire", 3),
 ('Uganda', 2),
 ('New Zealand', 2),
 ('Guatemala', 1),
 ('Puerto Rico', 1),
 ('Cameroon', 1),
 ('Albania', 1),
 ('Norway', 5),
 ('Australia', 15),
 ('Netherlands', 146),
 ('Mexico', 50),
 ('Korea (Republic of)', 197),
 ('Macao', 3),
 ('Chile', 13),
 ('Panama', 6),
 ('Lebanon', 2),
 ('Senegal', 2),
 ('-', 16),
 ('Algeria', 2),
 ('Mozambique', 1),
 ('Sudan', 1),
 ('Angola', 1),
 ('Azerbaijan', 2),
 ('Argentina', 54),
 ('Bulg

### Filter out the attempts without usernames

In [38]:
knownRootFails = nonRootFails.filter(lambda line: line.find("unknown") < 0)
knownRootFails.take(50)

['Dec 13 00:08:29 submitty sshd[19552]: Invalid user shalini from 27.128.173.81 port 49454',
 'Dec 13 00:08:29 submitty sshd[19552]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=27.128.173.81',
 'Dec 13 00:08:32 submitty sshd[19552]: Failed password for invalid user shalini from 27.128.173.81 port 49454 ssh2',
 'Dec 13 00:08:40 submitty sshd[19560]: Invalid user user5 from 134.175.17.32 port 44532',
 'Dec 13 00:08:40 submitty sshd[19560]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=134.175.17.32',
 'Dec 13 00:08:41 submitty sshd[19562]: Invalid user lms from 201.90.101.165 port 44854',
 'Dec 13 00:08:41 submitty sshd[19562]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=201.90.101.165',
 'Dec 13 00:08:43 submitty sshd[19560]: Failed password for invalid user user5 from 134.175.17.32 port 44532 ssh2',
 'Dec 13 00:08:43 submitty sshd[19560]: Received disconnect

In [28]:
s = 'Dec 13 00:12:56 submitty sshd[19848]: Received disconnect from 103.152.101.19 port 60110:11: Bye Bye [preauth]'
breakdown_failed(s)[2]

'19848'

### Gets the error messages for the the fails with known user names

In [39]:
knownRootFailsGroups = knownRootFails.map(lambda line: breakdown_failed(line)[3])
knownRootFailsGroups.take(5)

['Invalid user shalini from 27.128.173.81 port 49454',
 'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=27.128.173.81',
 'Failed password for invalid user shalini from 27.128.173.81 port 49454 ssh2',
 'Invalid user user5 from 134.175.17.32 port 44532',
 'pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=134.175.17.32']

### Makes sure we just have the error messages with user names as not all error message include the user name

In [40]:
withUsers = knownRootFailsGroups.filter(lambda line: line.find("user") > 0)
withUsersTake  = withUsers.filter(lambda line: line.find("failure") < 0)
withUsersTake.take(20)

['Invalid user shalini from 27.128.173.81 port 49454',
 'Failed password for invalid user shalini from 27.128.173.81 port 49454 ssh2',
 'Invalid user user5 from 134.175.17.32 port 44532',
 'Invalid user lms from 201.90.101.165 port 44854',
 'Failed password for invalid user user5 from 134.175.17.32 port 44532 ssh2',
 'Disconnected from invalid user user5 134.175.17.32 port 44532 [preauth]',
 'Failed password for invalid user lms from 201.90.101.165 port 44854 ssh2',
 'Disconnected from invalid user lms 201.90.101.165 port 44854 [preauth]',
 'Invalid user ttf from 119.45.22.71 port 60804',
 'Failed password for invalid user ttf from 119.45.22.71 port 60804 ssh2',
 'Disconnected from invalid user ttf 119.45.22.71 port 60804 [preauth]',
 'Invalid user finn from 58.17.200.197 port 49650',
 'Failed password for invalid user finn from 58.17.200.197 port 49650 ssh2',
 'Disconnected from invalid user finn 58.17.200.197 port 49650 [preauth]',
 'Invalid user merle from 14.161.45.187 port 56273',

### Method for finding the user names

In [41]:
import re
def find_user_names(s):
    extracts = re.search("([\S ]+) user (\S+) ([\S ]+)", s)
    return extracts.groups() if extracts else s

### Testing the find user name method

In [42]:
s = 'Invalid user shalini from 27.128.173.81 port 49454'
find_user_names(s)[1]

'shalini'

### Finding all the user names of each error message with user names

In [43]:
names = withUsersTake.map(lambda line: find_user_names(line)[1])
names.take(10)

['shalini',
 'shalini',
 'user5',
 'lms',
 'user5',
 'user5',
 'lms',
 'lms',
 'ttf',
 'ttf']

# Groups all like user names Answer to list all of the non root users user names

In [44]:
namesGrouped = names.map(lambda name: (name, 1)) \
            .reduceByKey(lambda a, b: a + b)
namesGrouped.take(10000)

[('shalini', 10),
 ('named', 14),
 ('i3', 3),
 ('anthony', 20),
 ('csgo', 23),
 ('q', 9),
 ('lara', 6),
 ('webster', 6),
 ('nobody', 12),
 ('was', 8),
 ('pi', 370),
 ('ernesto', 3),
 ('test', 752),
 ('viper', 3),
 ('admin10', 3),
 ('newuser', 8),
 ('cmuir', 3),
 ('ultra', 12),
 ('dyndns', 11),
 ('luca', 8),
 ('liz', 3),
 ('photos', 6),
 ('edward', 6),
 ('jang', 3),
 ('max', 20),
 ('gelson', 5),
 ('sahara', 3),
 ('temp', 66),
 ('alexandra', 5),
 ('chelsea', 3),
 ('kjayroe', 2),
 ('azureuser', 14),
 ('boon', 5),
 ('support', 134),
 ('pasquel', 3),
 ('saml', 6),
 ('sienna', 3),
 ('aeegj', 2),
 ('clinton', 9),
 ('petera', 3),
 ('giulia', 5),
 ('svenserver', 3),
 ('vision', 3),
 ('heinz', 3),
 ('frank', 5),
 ('glpi', 6),
 ('zabbix', 61),
 ('Dakota', 5),
 ('miusuario', 3),
 ('e8telnet', 3),
 ('default', 17),
 ('telnet', 6),
 ('mamta', 6),
 ('openadmin', 6),
 ('libuuid', 41),
 ('flw', 3),
 ('hamish', 3),
 ('vboxuser', 8),
 ('eli', 3),
 ('sys', 7),
 ('smbguest', 11),
 ('redis', 12),
 ('help', 

### Finds all the dates of every fail

In [45]:
dates = fails.map(lambda line: breakdown_failed(line)[0])
dates.take(50)

['Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13',
 'Dec 13']

# Groups every fail by date and counts how many fails happened on each day. It appear the the day with the most fails is Dec 8 with 7952 fails there doesn't seem to be anything special about this day asside from being a few national holidays and some minor religious holidays with the most popular on being Feast of the Immaculate Conception which is a public holiday in multiple countries

In [47]:
dateCount = dates.map(lambda date: (date, 1)) \
            .reduceByKey(lambda a, b: a + b)
dateCount.take(100)

[('Dec  8', 7952),
 ('Nov 29', 5588),
 ('Nov 17', 4796),
 ('Dec 16', 4489),
 ('Dec  7', 6261),
 ('Dec 11', 5894),
 ('Dec  1', 4375),
 ('Dec  2', 4983),
 ('Dec  5', 5031),
 ('Nov 23', 4963),
 ('Nov 27', 6167),
 ('Dec 15', 5629),
 ('Nov 19', 4245),
 ('Nov 21', 4382),
 ('Dec 13', 6498),
 ('Dec  9', 5388),
 ('Dec 10', 4359),
 ('Dec 12', 5237),
 ('Nov 30', 3848),
 ('Dec  4', 5232),
 ('Nov 24', 6478),
 ('Nov 25', 4603),
 ('Nov 26', 4420),
 ('Nov 20', 4596),
 ('Dec 14', 4094),
 ('Dec  3', 5438),
 ('Dec  6', 6844),
 ('Nov 28', 5356),
 ('Nov 15', 3436),
 ('Nov 16', 4310),
 ('Nov 18', 3771),
 ('Nov 22', 5590)]