<b>Author:</b> Lukasz Olszewski<br />
<b>ver:</b> 0.1<br />
<a href=https://github.com/0lszewski/posture_psv>Visit GitHub repo for instructions</a>

## Setup

In [None]:
import pandas as pd
import socket
from ipwhois import IPWhois
import re
import matplotlib.pyplot as plt

# If shodan is not installed uncomment the below or run it in system's terminal.
#pip install shodan

#Required for map-plotting at the end of the Notebook:
#pip install pycountry-convert
#pip install geopy
#pip install folium

# Suppress pandas' chained-assignment warning (the pandas default is 'warn').
pd.set_option('mode.chained_assignment', None)

# Tune the output display options used when DataFrames are rendered in the notebook.
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_colwidth', 100)
pd.set_option('display.max_columns', 22)

<b> Complete your configuration here before proceeding: </b>

In [None]:
# Domain whose certificate history is looked up on crt.sh.
# The value is appended unchanged to the crt.sh search URL by the TLS section below.
# Example: domain = "github.com"
domain = ""

In [None]:
import os

# Set the name of the organization being assessed below.
# You need to get the exact org name by trying out some searches on https://shodan.io first
# using the 'org:' filter. Example: org_name = "GitHub Inc."
org_name = ""

# The Shodan API key is read from the SHODAN_API_KEY environment variable so that it never
# has to be stored in (and accidentally committed with) the notebook.
# If you prefer to paste the key here, replace the "" fallback below and
# CLEAR IT BEFORE COMMITTING.
SHODAN_API_KEY = os.environ.get("SHODAN_API_KEY", "")

In [None]:
# shodan must be imported in this cell: when the notebook is run top to bottom this cell
# executes before the later `import shodan` cell, which would otherwise cause a NameError.
import shodan

# Shodan API client used by the enrichment cells further down.
s_api = shodan.Shodan(SHODAN_API_KEY)

<b>Now select the first cell in the Notebook and either go to the 'Run' menu and select 'Run Selected cell and All Below' or manually continue through the notebook by pressing Shift+Enter to move cell by cell.</b>

## TLS certificates history section:

In [None]:
# Fetch the certificate history for `domain` from the crt.sh online certificate search engine.
# The DataFrame with the results lands in 'df'.
if not domain:
    raise ValueError("'domain' is empty - set it in the configuration cell before running this cell.")

site = 'https://crt.sh/?q='
dfs_list = pd.read_html(site+domain)

# This notebook takes the certificate listing from the third table parsed out of the page;
# fail with a clear message instead of an IndexError when that table is not there.
if len(dfs_list) < 3:
    raise RuntimeError("crt.sh did not return a certificate table for '{}'.".format(domain))

df = dfs_list[2]
# Strip the sort-arrow glyph from the column header.
df.rename(columns = {'Logged At ⇧':'Logged At'}, inplace=True)

<b>Number of entries retrieved from crt.sh:</b>

In [None]:
# Number of rows with a non-missing 'crt.sh ID'.
print(df['crt.sh ID'].count())

In [None]:
# Certificate Time to Live (TTL) = 'Not After' minus 'Not Before'.
# Dev environments, which may be interesting from the posture security assessment point of view,
# will often have short-lived Let's Encrypt issued certs.
valid_from = pd.to_datetime(df['Not Before'])
valid_until = pd.to_datetime(df['Not After'])
df['TTL'] = valid_until - valid_from

In [None]:
# Run this cell to preview results.
# Shows how many certificate entries exist per unique Common Name (CN), least frequent first.
cn_counts = df.groupby('Common Name')['crt.sh ID'].count()
cn_counts.sort_values()

In [None]:
# De-duplicate: keep the first row seen for every Common Name.
df_uniq = df.drop_duplicates(subset='Common Name', keep='first')


In [None]:
# Resolve the CNs to IPs, which are needed for further enrichment down the line.

def _resolve_common_name(cname):
    """Return the IPv4 address for *cname*, or a marker string when none can be determined.

    'Wildcard' is returned for names containing '*', because we can't really know what is
    behind them. 'NXDOMAIN' is returned when the value is not a string or cannot be resolved.
    """
    if not isinstance(cname, str):
        return 'NXDOMAIN'
    if "*" in cname:
        return 'Wildcard'
    try:
        return socket.gethostbyname(cname)
    except (OSError, UnicodeError):
        # socket.gaierror (an OSError) is raised when the name does not resolve;
        # UnicodeError is raised for names that cannot be IDNA-encoded.
        return 'NXDOMAIN'

# df_uniq holds one row per Common Name, so each name is resolved once and the results
# are assigned as a whole column instead of building a boolean mask per name.
df_uniq['IP'] = [_resolve_common_name(cname) for cname in df_uniq['Common Name']]


In [None]:
# Get whois (RDAP) information for resolved IPs to spot potentially unmanaged infrastructure
# or intercepted domains. This can be done for example by looking for hosting providers
# (ASN owners) who are not typically used by the assessed org.

# Dotted-quad IPv4 matcher, compiled once instead of on every loop iteration.
_IPV4_RE = re.compile(r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$")

# Several Common Names can share one IP: look each distinct IP up once and write the
# result to every row that carries it.
for ipv4 in df_uniq['IP'].unique():
    network_name = 'No information'
    country = 'No information'
    if isinstance(ipv4, str) and _IPV4_RE.match(ipv4):
        try:
            rdap = IPWhois(ipv4).lookup_rdap()
            network_name, country = rdap['network']['name'], rdap['asn_country_code']
        except Exception:
            # Best effort: any lookup or parsing failure is recorded as 'No information'.
            # (Not a bare except, so Ctrl-C can still interrupt the loop.)
            network_name = 'No information'
            country = 'No information'
    rows = df_uniq['IP'] == ipv4
    df_uniq.loc[rows, 'IP whois'] = network_name
    df_uniq.loc[rows, 'IP Location'] = country




<b> Preview the results: </b>

In [None]:
# Render the de-duplicated certificate table as this cell's output.
df_uniq

<b>Save current results to a CSV file:</b>

In [None]:
# Change the file name/path below to choose where the CSV file is saved.
# A relative path is resolved against the notebook's current working directory.
df_uniq.to_csv('certs_enriched.csv')

<b>Distribution of certificate TTLs.
    Dev environments, which may be interesting from the security posture assessment point of view, will often have short-lived Let's Encrypt-issued certs.</b>

In [None]:
# Copy the active entries (those with whois location information) to a new DataFrame
# for more convenient plotting.
# .copy() makes df_active independent of df_uniq, so the columns added by later cells are
# written to a real frame rather than to a filtered slice.
df_active = (
    df_uniq[df_uniq['IP Location'] != 'No information']
    .copy()
    .reset_index(drop=True)
    .drop(columns=['crt.sh ID'])
)

In [None]:
# Bar chart of how many Common Names share each certificate TTL.
ttl_counts = df_active.groupby('TTL')['Common Name'].count()
ttl_counts.plot.bar(title='Certificates TTL', figsize=[15,6], color="#2C2C2C")

## Enriching with Shodan information:

In [None]:
# If shodan is not installed uncomment the below or run it in system's terminal.
#pip install shodan

#Quote from shodan API response body:
## We don\'t care any exception caused by test code in product, swallow it ##

import shodan

In [None]:
# Add the placeholder columns that the Shodan enrichment cells fill in.
for placeholder_column in ('Organization', 'Open ports', 'TLS ver', 'Cert expired', 'Product', 'Vulns'):
    df_active[placeholder_column] = ''
df_active['Ports count'] = 0
df_active['Banners'] = ''

# De-duplicate by IP (first row per IP is kept) and renumber the index from 0.
df_active.drop_duplicates(subset=['IP'], inplace=True)
df_active.reset_index(drop=True, inplace=True)

# Show the first row as a quick check of the new layout.
df_active.head(1)


In [None]:
# Get additional IPs from Shodan based on an organization search.
# The org name is wrapped in double quotes because Shodan filter values that contain
# spaces (e.g. "GitHub Inc.") must be quoted to be treated as a single value.
org_response = s_api.search(query='org:"' + org_name + '"')

# Defaults for rows appended here; they mirror the placeholder values the existing rows
# received, so later string concatenation and integer casts behave the same for all rows.
new_row_defaults = {
    'Organization': '',
    'Open ports': '',
    'TLS ver': '',
    'Cert expired': '',
    'Product': '',
    'Vulns': '',
    'Ports count': 0,
    'Banners': '',
}

for item in org_response['matches']:
    ip = item['ip_str']
    if not df_active.loc[df_active['IP'] == ip].empty:
        continue  # IP already in df_active

    # Append a new row; relies on df_active having a 0..n-1 index.
    new_row = len(df_active.index)
    df_active.loc[new_row, 'IP'] = ip
    for column, default in new_row_defaults.items():
        df_active.loc[new_row, column] = default

    df_active.loc[new_row, 'IP Location'] = (item.get('location') or {}).get('country_code')
    try:
        df_active.loc[new_row, 'Issuer Name'] = str(item['ssl']['cert']['issuer'])  # result is a dictionary
    except (KeyError, TypeError):
        pass  # no TLS certificate information in this match


In [None]:
# Enrich the IPs in df_active with Shodan 'ip' (host) and 'search' query information.
# Shodan responses tend to omit lists and dictionaries when a value is not present, so
# every field is read defensively and whatever is available is still recorded.
# This may take a while as two queries are run for each IP in the df_active DataFrame.

# Dotted-quad IPv4 matcher, compiled once instead of on every loop iteration.
_IPV4_RE = re.compile(r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$")

for ipv4 in df_active['IP']:
    print("Checking: "+ipv4+"\n")
    rows = df_active['IP'] == ipv4

    if not _IPV4_RE.match(ipv4):  # in case it's not an IPv4 address
        df_active.loc[rows, 'Organization'] = "Invalid IP"
        df_active.loc[rows, 'Open ports'] = "Invalid IP"
        df_active.loc[rows, 'Banners'] = "Invalid IP"
        continue

    try:
        host = s_api.host(ipv4)  # {ip} (host) query
        # The 'search' query has additional information that the 'ip' query doesn't,
        # which is why both are made.
        host_search = s_api.search(query='ip:'+ipv4)
    except Exception:
        # Best effort: Shodan has nothing for this IP or the request failed.
        # (Not a bare except, so Ctrl-C can still interrupt this long-running loop.)
        df_active.loc[rows, 'Open ports'] = "Scan needed"
        continue

    if 'org' in host:
        df_active.loc[rows, 'Organization'] = host['org']

    ports = host.get('ports')
    if ports is not None:
        # 'ports' from the Shodan API is a list but a str is needed for the DataFrame.
        df_active.loc[rows, 'Open ports'] = ' '.join(map(str, ports))
        df_active.loc[rows, 'Ports count'] = len(ports)

    # Sometimes there are multiple banners. They are collected and assigned in one go:
    # appending with += is silently lost when the cell holds NaN and duplicates the
    # text when this cell is re-run.
    banners = [service['data'] for service in host.get('data', [])
               if isinstance(service.get('data'), str)]
    if banners:
        df_active.loc[rows, 'Banners'] = ' '.join(banners)

    vulns = []
    products = []
    for match in host_search.get('matches', []):  # dealing with the search results now
        # Accumulate across all matches instead of keeping only the last match's vulns.
        vulns.extend(str(vuln) for vuln in (match.get('vulns') or ()))

        ssl_info = match.get('ssl') or {}
        if 'cipher' in ssl_info:
            df_active.loc[rows, 'TLS ver'] = str(ssl_info['cipher'])  # result is a dictionary
        cert_info = ssl_info.get('cert') or {}
        if 'expired' in cert_info:
            df_active.loc[rows, 'Cert expired'] = cert_info['expired']

        if isinstance(match.get('product'), str):
            products.append(match['product'])  # sometimes there are multiple products

    if vulns:
        # dict.fromkeys removes repeats while keeping first-seen order.
        df_active.loc[rows, 'Vulns'] = ' '.join(dict.fromkeys(vulns))
    if products:
        df_active.loc[rows, 'Product'] = '; '.join(products)

# Deal with any missing values. Missing port counts become 0 rather than '' so the
# column stays numeric for the later integer cast.
df_active['Ports count'] = df_active['Ports count'].fillna(0)
df_active.fillna('', inplace=True)

<b>Summary of open ports combinations on hosts:</b>

In [None]:
# Horizontal bar chart: number of IPs per distinct combination of open ports.
ports_summary = df_active.groupby('Open ports')['IP'].count().sort_values()
ports_summary.plot.barh(title='Summary of IP count with open ports.', figsize=[15,3], color="#2C2C2C")

<b>Looking at certificate issuers may provide additional clues about unmanaged infrastructure. Dev environments often have short-lived Let's Encrypt-issued certs or self-signed ones.</b>

In [None]:
# Number of IPs per certificate issuer, least frequent first.
issuer_counts = df_active.groupby('Issuer Name')['IP'].count()
issuer_counts.sort_values()

<b>IPs with the highest number of ports open:</b>

In [None]:
# Coerce 'Ports count' to integers. Empty strings / missing values (IPs for which no port
# list was recorded) become 0, so the integer cast cannot fail on them.
df_active['Ports count'] = pd.to_numeric(df_active['Ports count'], errors='coerce').fillna(0).astype(int)

# Top 10 IPs by number of open ports. Only the 'Ports count' column is summed, so the
# unrelated text columns are not aggregated.
df_active.groupby('IP')['Ports count'].sum().sort_values(ascending=False).head(10)

<b>Organization names identified behind IPs:</b>

In [None]:
# Horizontal bar chart: number of IPs per organization name.
# Rows without a real organization are excluded: empty values and the placeholder markers
# used by this notebook ('Invalid IP' is the one written to this column for non-IPv4 entries).
has_organization = ~df_active['Organization'].isin(['', 'Scan needed', 'Invalid IP'])
df_active[has_organization].groupby(['Organization']).count()['IP'].sort_values().plot(kind='barh', title='Identified organizations behinds IPs', figsize=[15,3], color="#2C2C2C")

In [None]:
# Horizontal bar chart: number of IPs per identified vulnerability set.
# Emptiness is checked explicitly instead of catching every exception, so that a real
# error is not misreported as "no vulnerabilities".
vuln_counts = df_active[df_active['Vulns'] != ''].groupby(['Vulns']).count()['IP'].sort_values()
if vuln_counts.empty:
    print("No vulnerabilities found!")
else:
    vuln_counts.plot(kind='barh', title='Summary of identified vulnerabilities.', color="#2C2C2C")

In [None]:
# Horizontal bar chart: number of IPs per identified product string.
# Emptiness is checked explicitly instead of catching every exception, so that a real
# error is not misreported as "no products".
product_counts = df_active[df_active['Product'] != ''].groupby(['Product']).count()['IP'].sort_values()
if product_counts.empty:
    print("No product names found!")
else:
    product_counts.plot(kind='barh', title='Summary of identified products.', figsize=[15,3], color="#2C2C2C")

In [None]:
# Horizontal bar chart: number of IPs per identified TLS version/cipher description.
# Emptiness is checked explicitly instead of catching every exception, so that a real
# error is not misreported as "nothing discovered".
tls_counts = df_active[df_active['TLS ver'] != ''].groupby(['TLS ver']).count()['IP'].sort_values()
if tls_counts.empty:
    print("No TLS versions could be discovered!")
else:
    # Title corrected: this chart shows TLS versions, not products.
    tls_counts.plot(kind='barh', title='Summary of identified TLS versions.', figsize=[15,3], color="#2C2C2C")

## Save full results set to a CSV file:

In [None]:
# Write the full df_active table to a CSV file; the relative path is resolved against
# the notebook's current working directory.
df_active.to_csv('IPs_enriched_shodan.csv')

## Searching - quickly search through results without switching to CSV:

In [None]:
# Widen column display while searching.
# Set this back to 100 when done searching to restore the narrower column width.
pd.options.display.max_colwidth = 500

In [None]:
# IP: show the row(s) whose address is exactly the value below.

df_active.loc[df_active['IP'].eq('IP_ADDRESS_HERE')]


In [None]:
# Vulns: case-insensitive search of the 'Vulns' column; missing values never match.

vulns_match = df_active['Vulns'].str.contains('CVE_HERE', case=False, na=False)
df_active[vulns_match]

In [None]:
# Domain name: case-insensitive search of 'Common Name'; missing values never match.
name_match = df_active['Common Name'].str.contains('SEARCH_PHRASE', case=False, na=False)
df_active[name_match]

In [None]:
# Domain name doesn't contain: rows whose 'Common Name' does NOT match (case-insensitive).
# Rows with a missing Common Name count as "not matching" and are therefore shown.
name_match = df_active['Common Name'].str.contains('SEARCH_PHRASE', case=False, na=False)
df_active[~name_match]

In [None]:
# Port(s): rows with port 22 or 3389 open.
# 'Open ports' is a space-separated list of port numbers, so whole numbers are matched
# with word boundaries; a plain substring test for '22' would also hit 2222 or 8022.

df_active[df_active['Open ports'].str.contains(r'\b(?:22|3389)\b', na=False)]

In [None]:
# Port is not: rows where port 443 is NOT open.
# Whole port numbers are matched with word boundaries, so a host exposing only 8443
# is not mistaken for one exposing 443.

df_active[~df_active['Open ports'].str.contains(r'\b443\b', na=False)]

In [None]:
# Expired certificates: rows whose 'Cert expired' value equals True.
expired_mask = df_active['Cert expired'].eq(True)
df_active[expired_mask]

In [None]:
# Banner is: case-insensitive search of the collected service banners.
banner_match = df_active['Banners'].str.contains('SEARCH_PHRASE', case=False)
df_active[banner_match]

In [None]:
# Banner is not: rows whose banners do NOT contain the phrase.
# Matching is case-insensitive so this is the exact complement of the "Banner is" search;
# na=False keeps the negation well-defined if a banner value is missing.
df_active[~df_active['Banners'].str.contains('SEARCH_PHRASE', case=False, na=False)]

In [None]:
# Cert issuer: case-insensitive search of 'Issuer Name', consistent with the other
# searches in this section; rows without an issuer never match.
df_active[df_active['Issuer Name'].str.contains('SEARCH_PHRASE', case=False, na=False)]

## Plotting the IPs on a map to identify outliers:

In [None]:
from geopy.geocoders import Nominatim
import folium
from folium.plugins import MarkerCluster

geolocator = Nominatim(user_agent="posture_psv")

# Work on an independent copy with one row per IP (no duplicated IPs wanted on the map),
# so the geo columns added below do not modify df_active.
df_map = df_active.drop_duplicates(subset=['IP']).copy()

# Translate the country codes from 'IP Location' to coordinates.
# Every distinct code is geocoded exactly once and the result reused for all rows with
# that code, which avoids repeated queries to the geocoding service.
coords_by_code = {}
for country_code in df_map['IP Location'].unique():
    try:
        location = geolocator.geocode(country_code)
    except Exception:
        # Best effort: a failed lookup leaves the coordinates empty.
        location = None
    if location is None:  # geocode() returns None when nothing was found
        coords_by_code[country_code] = ("", "")
    else:
        coords_by_code[country_code] = (location.latitude, location.longitude)

# Empty strings mark rows without coordinates.
df_map['geo_Lat'] = [coords_by_code.get(code, ("", ""))[0] for code in df_map['IP Location']]
df_map['geo_Long'] = [coords_by_code.get(code, ("", ""))[1] for code in df_map['IP Location']]

In [None]:
# Select a base map and attach a marker cluster to it.
world_map = folium.Map(tiles="cartodbpositron")
marker_cluster = MarkerCluster().add_to(world_map)

# Populate the map with one circle marker per row of df_map.
# Coordinates are coerced to numbers first; rows whose latitude or longitude is empty or
# not numeric (geocoding failed) are skipped, because folium rejects such locations.
latitudes = pd.to_numeric(df_map['geo_Lat'], errors='coerce')
longitudes = pd.to_numeric(df_map['geo_Long'], errors='coerce')

for country, lat, lon in zip(df_map['IP Location'], latitudes, longitudes):
    if pd.isna(lat) or pd.isna(lon):
        continue
    popup_text = """Country : {}<br>""".format(country)
    folium.CircleMarker(location=[lat, lon], radius=5, popup=popup_text, fill=True).add_to(marker_cluster)

In [None]:
# Display the map with IP geolocations (rendered as this cell's output).
world_map