In [13]:
import os
import sys
import requests
import re
import csv
import json
import socket
import shodan
import dotenv
from typing import Dict, List, Any
from collections import defaultdict, Counter
from pprint import pprint

In [17]:
# import parser
# import utils
# from structs import *

# Функции

In [None]:
# helpers for convenient serialization and storage of objects as JSON
def json_save(object, filename):
    """Serialize ``object`` as 4-space-indented JSON and write it to ``filename``.

    The JSON text is produced in memory *before* the destination is opened, so
    a serialization failure (for example a ``set`` inside ``object``) raises
    ``TypeError`` without truncating a file that already exists.

    Args:
        object: Any value ``json.dumps`` can encode. The name shadows the
            builtin but is kept so keyword callers keep working.
        filename: Path of the file to create or overwrite.

    Raises:
        TypeError: If ``object`` is not JSON serializable.
        OSError: If ``filename`` cannot be opened for writing.
    """
    payload = json.dumps(object, indent=4)
    with open(filename, "w") as f:
        f.write(payload)

def json_load(filename):
    """Read ``filename`` and return the decoded JSON value.

    The file is opened read-only. (Opening it in write mode, as before,
    truncated the file and made ``json.load`` fail.)

    Args:
        filename: Path of an existing JSON file.

    Returns:
        The Python value decoded by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(filename, "r") as f:
        return json.load(f)

In [18]:
# used to render the tree of HTTP paths
class TrieNode:
    """Prefix tree over "/"-separated path segments with per-node visit counts."""

    def __init__(self):
        # how many inserted paths passed through this node
        self.count: int = 0
        # child nodes: segment -> TrieNode
        self.children: Dict[str, "TrieNode"] = {}

    def insert(self, path: str):
        """Insert ``path`` and bump the counter of every node along it.

        Leading/trailing slashes and empty segments are ignored, so ``""`` and
        ``"/"`` only increment the counter of the node ``insert`` is called on.
        """
        segments = [seg for seg in path.strip("/").split("/") if seg]
        node = self
        node.count += 1
        for seg in segments:
            if seg not in node.children:
                node.children[seg] = TrieNode()
            node = node.children[seg]
            node.count += 1

    def to_dict(self) -> dict:
        """Return the subtree as nested ``{"count": ..., "children": {...}}`` dicts."""
        return {
            "count": self.count,
            "children": {
                seg: child.to_dict()
                for seg, child in self.children.items()
            }
        }

    def __repr__(self):
        return f"<TrieNode count={self.count} children={list(self.children)}>"

    def print_tree(self, name: str = "/", prefix: str = "") -> str:
        """Render the subtree as text, one ``label (count)`` line per node.

        Args:
            name: Label used for this (root) node.
            prefix: String prepended to every output line. It used to be
                accepted but ignored; the default ``""`` keeps the old output.

        Returns:
            The lines joined with ``"\\n"`` (no trailing newline).
        """
        lines: List[str] = []

        def helper(n, label, prefix_line, prefix_children) -> None:
            lines.append(f"{prefix_line}{label} ({n.count})")
            children = list(n.children.items())
            for idx, (seg, child) in enumerate(children):
                last = (idx == len(children) - 1)
                # the last child closes the branch; the others keep the rail open
                new_prefix_line = prefix_children + ("└── " if last else "├── ")
                new_prefix_children = prefix_children + ("    " if last else "│   ")
                helper(child, seg, new_prefix_line, new_prefix_children)

        helper(self, name, prefix, prefix)
        return "\n".join(lines)

In [None]:
def parse_wireshark_stats(filename) -> Counter:
    """Sum the fourth CSV column per value of the third column.

    The first two lines of the (whitespace-stripped) file are skipped, and the
    remaining lines are parsed as CSV. Rows with fewer than four columns —
    including blank lines — are ignored instead of raising ``IndexError``.
    An empty fourth column counts as 0.

    NOTE(review): presumably column 3 is an IP address and column 4 a
    packet/byte count from a Wireshark statistics export — confirm against
    the actual export format.

    Args:
        filename: Path to the CSV export.

    Returns:
        Counter mapping the third-column value to the summed fourth column.

    Raises:
        ValueError: If a non-empty fourth column is not an integer.
    """
    ip = Counter()
    with open(filename, "r") as f:
        data_lines = f.read().strip().split("\n")[2:]
    for row in csv.reader(data_lines):
        if len(row) < 4:
            # blank or truncated line: nothing to count
            continue
        ip[row[2]] += int(row[3]) if row[3] else 0
    return ip

In [16]:
def parse_links_file(filename, regex=None):
    """Collect host, domain and path statistics from a file with one URL per line.

    Each line is stripped before parsing, so hostnames and paths never carry a
    trailing newline, and blank lines are skipped. Previously a line such as
    ``"www.ivi.ru\\n"`` was counted under the host ``"www.ivi.ru\\n"`` and the
    paths stored per host kept their line ending.

    Args:
        filename: Path to the links file.
        regex: Optional pattern applied to the host part with ``re.findall``.
            Hosts without a match are skipped; otherwise the first match
            replaces the host. The pattern should have at most one capture
            group so that the match is a string.

    Returns:
        A tuple ``(hostnames, domains, http_path, hostname2path, root)``:
        a Counter of hosts, a Counter of the last two dot-separated labels of
        each host, a Counter of URL paths, a defaultdict mapping each host to
        the set of its paths, and a ``TrieNode`` holding every path.
    """
    root = TrieNode()
    hostnames = Counter()
    domains = Counter()
    http_path = Counter()
    hostname2path = defaultdict(set)
    with open(filename, "r") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            # groups: (host, path, query); the scheme is optional
            matches = re.findall(r"^(?:(?:https?):\/\/)?([^\/?#]+)(\/[^?#]*)?(?:\??([^#]*))", line)
            if len(matches) == 0:
                continue
            link = matches[0]
            if not regex:
                match = link[0]
            else:
                match = re.findall(regex, link[0])
                if len(match) == 0:
                    continue
                match = match[0]
            domain = re.findall(r"(?:.*\.)?(.*\..*)", match)

            path = link[1].strip()
            if len(domain):
                domains[domain[0]] += 1
            hostnames[match] += 1
            root.insert(path)
            http_path[path] += 1
            # defaultdict creates the set on first access
            hostname2path[match].add(path)
    return hostnames, domains, http_path, hostname2path, root

In [19]:
def parse_crtsh(domain) -> Any:
    """Query crt.sh for ``domain`` and count the host names found in its certificates.

    For every certificate record the ``common_name`` and each line of
    ``name_value`` are counted after removing a leading ``"*."`` wildcard.
    Missing or null fields and empty names are skipped.

    Args:
        domain: Domain name passed as the ``q`` query parameter.

    Returns:
        The 50 most common ``(host, count)`` pairs, as produced by
        ``Counter.most_common(50)``. On a non-200 response the status and body
        are printed and an empty list is returned (previously ``None``), so
        callers can always iterate over the result.

    Raises:
        requests.RequestException: On connection, TLS or timeout errors.
    """
    hosts = Counter()
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        })
        # let requests build and URL-encode the query string
        response = session.get(
            "https://crt.sh/",
            params={"q": domain, "output": "json"},
            timeout=60,
        )
    if response.status_code != 200:
        print("fault ", response.status_code, response.text)
        return []
    for res in response.json():
        # either field may be absent or null in a record
        names = [res.get("common_name") or ""]
        names.extend((res.get("name_value") or "").split("\n"))
        for name in names:
            name = name.strip()
            if name.startswith("*."):
                name = name[2:]
            if name:
                hosts[name] += 1
    return hosts.most_common(50)

In [None]:
def parse_sitemap(url) -> Counter:
    """Count the ``<loc>`` links of a sitemap, following nested ``.xml`` sitemaps.

    Every link found is counted once per occurrence. A link ending in ``.xml``
    is additionally fetched and its links are merged into the result. Sitemaps
    that are currently being processed are tracked, so a sitemap that points
    back to itself or to one of its ancestors is counted but not fetched again.
    This used to recurse without bound.

    Args:
        url: URL of the top-level sitemap.

    Returns:
        Counter mapping each discovered link to its number of occurrences.

    Raises:
        requests.RequestException: On connection or timeout errors.
    """
    in_progress = set()

    def crawl(sitemap_url) -> Counter:
        links_cnt = Counter()
        in_progress.add(sitemap_url)
        try:
            res = requests.get(sitemap_url, timeout=30).text
            links = re.findall(r"(?:<loc>)(https?:\/\/[\d\w\.\-\/]+)(?:<\/loc>)", res)
            for link in links:
                if link.endswith(".xml") and link not in in_progress:
                    links_cnt.update(crawl(link))
                links_cnt[link] += 1
        finally:
            # only ancestors are guarded; siblings may reference the same sitemap
            in_progress.discard(sitemap_url)
        return links_cnt

    return crawl(url)

In [None]:
def parse_robots_txt(hostname) -> Counter:
    """Fetch ``https://<hostname>/robots.txt`` and count its Allow/Disallow values.

    The URL and the raw response body are printed. Directive names are matched
    case-insensitively at the start of a line. Values are taken up to an
    optional ``#`` comment and stripped of surrounding whitespace (including a
    trailing ``\\r``); empty values are skipped. Previously ``Disallow:/x``
    lost its first character because the pattern consumed one arbitrary
    character after the colon.

    Args:
        hostname: Host name without scheme.

    Returns:
        Counter of rule values; an empty Counter if the status is not 200.

    Raises:
        requests.RequestException: On connection or timeout errors.
    """
    url = f"https://{hostname}/robots.txt"
    print(url)
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        })
        res = session.get(url, timeout=30)
    print(res.text)
    if res.status_code != 200:
        return Counter()
    raw_rules = re.findall(
        r"^[ \t]*(?:Disa|A)llow:[ \t]*(.*)$",
        res.text,
        flags=re.MULTILINE | re.IGNORECASE,
    )
    # drop trailing comments and whitespace, then ignore empty rules
    rules = (rule.split("#", 1)[0].strip() for rule in raw_rules)
    return Counter(rule for rule in rules if rule)

In [None]:
def get_host_info(api: shodan.Shodan, ip: str):
    """Look up ``ip`` via ``api.host`` and keep a fixed subset of the answer.

    Returns:
        A dict with ``"ip"`` followed by the ``"isp"``, ``"org"``,
        ``"domains"`` and ``"hostnames"`` entries of the lookup result.

    Raises:
        KeyError: If the lookup result lacks one of those entries.
    """
    report = api.host(ip)
    wanted = ("isp", "org", "domains", "hostnames")
    summary = {"ip": ip}
    summary.update((field, report[field]) for field in wanted)
    return summary

# Структуры и классы

In [12]:
class Path:
    """File locations of one service's capture data, derived from a base directory.

    Attributes are plain strings built by concatenation:
    ``base_path``, ``dump_path`` (``dump/``), ``links_path`` (``links.txt``)
    and ``wireshark_stats_path`` (``wireshark_ips.csv``).
    """

    def __init__(self, base_path) -> None:
        # A non-empty base without a trailing separator used to yield paths
        # such as "ividump/"; normalize it so concatenation stays valid.
        if base_path and not base_path.endswith(("/", os.sep)):
            base_path = base_path + "/"
        self.base_path = base_path
        self.dump_path = self.base_path + "dump/"
        # self.ssl_keyfile = self.dump_path + "ssl.log"
        self.links_path = self.base_path + "links.txt"
        self.wireshark_stats_path = self.base_path + 'wireshark_ips.csv'

In [None]:
class Service:
    """Analysis results for one service, built from the files described by a ``Path``.

    After ``analyze()``:
        ips: Counter returned by ``parse_wireshark_stats``.
        hosts, domains, paths: Counters returned by ``parse_links_file``.
        paths_list: mapping host -> set of paths (fourth value of ``parse_links_file``).
        path_trie: ``TrieNode`` with every path.
    """

    def __init__(self, path: Path) -> None:
        self.path = path
        self.save_path = self.path.base_path + "results.json"
        # text rendering of the path trie is written next to results.json
        self.tree_path = self.path.base_path + "path-tree.txt"
        self.ips = None
        self.hosts = None
        self.domains = None
        self.paths = None
        self.paths_list = None
        self.path_trie = None

    def analyze(self):
        """Parse the Wireshark statistics and the links file into the attributes."""
        self.ips = parse_wireshark_stats(self.path.wireshark_stats_path)
        self.hosts, self.domains, self.paths, self.paths_list, self.path_trie = parse_links_file(self.path.links_path)

    def to_dict(self):
        """Return the JSON-serializable summary that ``save()`` writes.

        Raises:
            RuntimeError: If ``analyze()`` has not been called yet.
        """
        if self.path_trie is None:
            raise RuntimeError("Service.analyze() must be called before to_dict()/save()")
        return {
            "path": self.path.base_path,
            "ip": self.ips,
            "hosts": self.hosts,
            "domains": self.domains,
            "paths": self.paths,
            "path_relations": self.tree_path,
            "path_trie": self.path_trie.to_dict(),
            "top": {
                "ip": self.ips.most_common(50),
                "domain": self.domains.most_common(20),
                "hosts": self.hosts.most_common(50),
                "paths": self.paths.most_common(50)
            }
        }

    def __dict__(self):
        # Kept only for backward compatibility with callers of obj.__dict__().
        # NOTE(review): this shadows the instance attribute dictionary, so
        # vars(obj) and similar tools do not work on Service; prefer to_dict().
        return self.to_dict()

    def save(self):
        """Write ``results.json`` and ``path-tree.txt`` into the base directory."""
        json_save(self.to_dict(), self.save_path)
        # the tree uses box-drawing characters, which many locale encodings
        # (e.g. cp1251/cp1252) cannot represent — always write UTF-8
        with open(self.tree_path, "w", encoding="utf-8") as f:
            f.write(self.path_trie.print_tree())
        # report success only after both files were written
        print(f"saved to {self.save_path}")


In [3]:
# load environment variables via python-dotenv; SHODAN_KEY is read from
# os.environ in the next cell
dotenv.load_dotenv()

True

In [4]:
# Shodan client; raises KeyError if SHODAN_KEY is not set in the environment
api = shodan.Shodan(os.environ["SHODAN_KEY"])

# Alibaba

In [7]:
# Service rooted at "alibaba/"; analyze() and save() are not called for it
# in the visible cells
alibaba = Service(Path("alibaba/"))

# IVI


In [11]:
# basic analysis of the service: parse the inputs under "ivi/" and write
# results.json and path-tree.txt next to them
ivi = Service(Path("ivi/"))
ivi.analyze()
ivi.save()

In [31]:
# IP addresses for the IVI section (consumed by the ip_info lookup cell)
ips = {
    "77.222.40.250",
    "91.233.217.2",
    "91.233.217.128",
    "91.233.217.129",
    "91.233.217.130",
    "91.233.218.125",
    "91.233.218.128",
    "93.171.230.70",
}

'/Users/caxapok/Projects/mai-practic'

In [20]:
# registered domains for the IVI section (queried on crt.sh below)
domains = {
    "ivi.ru",
    "ivi.tv",
    "ivicdn.tv",
}

In [18]:
# host names for the IVI section; the trailing comma keeps every entry a
# separate string when the list is extended
hostnames = {
    "api.ivi.ru",
    "ask.ivi.ru",
    "corp.ivi.ru",
    "gambit-parent.dfs.ivi.ru",
    "solea-parent.dfs.ivi.ru",
    "storyboard.dfs.ivi.ru",
    "thumbs.dfs.ivi.ru",
    "www.ivi.ru",
    "www.ivi.tv",
}

In [21]:
# aggregate the crt.sh host counts over all domains of the section
crt_hostnames = Counter()
for domain in domains:
    # parse_crtsh returns a list of (host, count) pairs, or None after an
    # HTTP error, so it cannot be indexed by host name
    hosts = parse_crtsh(domain) or []
    for host, count in hosts:
        crt_hostnames[host] += count

SSLError: HTTPSConnectionPool(host='crt.sh', port=443): Max retries exceeded with url: /?q=ivi.tv&output=json (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1002)')))

In [None]:
# query Shodan for every address in `ips` and store the summaries as JSON
# NOTE(review): an exception raised by get_host_info for one address aborts
# the loop before anything is saved — confirm that this is acceptable
ip_info = {}
for ip in ips:
    ip_info[ip] = get_host_info(api, ip)
json_save(ip_info, "ip_info.json")

# Megamarket

In [20]:
# basic analysis of the service: parse the inputs under "megamarket/" and
# write results.json and path-tree.txt next to them
megamarket = Service(Path("megamarket/"))
megamarket.analyze()
megamarket.save()

In [21]:
# no addresses collected for this section yet; an empty `{}` literal would
# create a dict, not a set like the other sections use
ips = set()

In [22]:
# registered domains for the Megamarket section (queried on crt.sh below)
domains = {
    "megamarket.ru",
    "megamarket.tech",
    "sbermegamarket.ru",
}

In [None]:
# host names for the Megamarket section; a missing comma previously fused
# "partner-wiki.megamarket.ru" and "partner.megamarket.ru" into one string
hostnames = {
    "main-cdn.sbermegamarket.ru",
    "megamarket.ru",
    "partner-wiki.megamarket.ru",
    "partner.megamarket.ru",
    "partner.sbermegamarket.ru",
    "sbermegamarket.ru",
}

In [None]:
# aggregate the crt.sh host counts over all domains of the section
crt_hostnames = Counter()
for domain in domains:
    # parse_crtsh returns a list of (host, count) pairs, or None after an
    # HTTP error, so it cannot be indexed by host name
    hosts = parse_crtsh(domain) or []
    for host, count in hosts:
        crt_hostnames[host] += count