In [9]:
pip install requests

Note: you may need to restart the kernel to use updated packages.


In [None]:
import requests

class ThreatIntelClient:
    """
    Classe pour interroger différentes API de Threat Intelligence :
      - VirusTotal (hash, IP, domaine)
      - Abuse.ch (MalwareBazaar pour un hash)
      - ipstack (géolocalisation d'une IP)
    """

    def __init__(self, vt_api_key=None, ipstack_key=None):
        """
        Initialise le client avec les clés nécessaires.
        :param vt_api_key: Clé d'API VirusTotal (v3).
        :param ipstack_key: Clé d'API ipstack.
        """
        self.vt_api_key = vt_api_key
        self.ipstack_key = ipstack_key

    # -------------------------------------------------------------------------
    #                         1) VirusTotal
    # -------------------------------------------------------------------------
    def query_virustotal(self, ioc):
        """
        Interroge l'API VirusTotal pour un IOC (hash, IP ou nom de domaine).
        :param ioc: L'indicateur de compromission (str).
        :return: Résultat JSON brut (dict) ou None en cas d'erreur.
        """
        if not self.vt_api_key:
            print("[VirusTotal] Aucune clé API fournie.")
            return None

        url = "https://www.virustotal.com/api/v3/search"
        headers = {"x-apikey": self.vt_api_key}
        params = {"query": ioc}

        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"[VirusTotal] Erreur lors de la requête : {e}")
            return None

    def extract_virustotal_info(self, vt_json):
        """
        Extrait les informations les plus pertinentes du JSON renvoyé par VirusTotal.
        Retourne un dict avec les champs sélectionnés pour l'UI.
        """
        if not vt_json or "data" not in vt_json or len(vt_json["data"]) == 0:
            return {
                "status": "No result",
                "message": "Aucune donnée disponible ou format inattendu."
            }

        # On prend le premier résultat renvoyé par la recherche
        item = vt_json["data"][0]
        attributes = item.get("attributes", {})
        
        
        last_analysis_stats = attributes.get("last_analysis_stats", {})
        total_scans = sum(last_analysis_stats.values())
        malicious = last_analysis_stats.get("malicious", 0)
        suspicious = last_analysis_stats.get("suspicious", 0)

        # récupération d'un score "malicious/suspicious" sur le total
        detection_ratio = f"{malicious + suspicious}/{total_scans}" if total_scans else "0/0"

        # on récupère aussi le type d'IOC si disponible
        ioc_type = item.get("type", "unknown")

        return {
            "ioc_type": ioc_type,
            "detection_ratio": detection_ratio,
            "total_scans": total_scans,
            "malicious": malicious,
            "suspicious": suspicious
        }

    # -------------------------------------------------------------------------
    #                         2) Abuse.ch (MalwareBazaar)
    # -------------------------------------------------------------------------
    def query_abusech_malwarebazaar(self, file_hash):
        """
        Interroge MalwareBazaar (service d'Abuse.ch) pour un hash de fichier.
        :param file_hash: Hash du fichier malveillant (MD5, SHA256, etc.).
        :return: Résultat JSON brut (dict) ou None en cas d'erreur.
        """
        url = "https://mb-api.abuse.ch/api/v1/"
        data = {
            "query": "get_info",
            "hash": file_hash
        }

        try:
            response = requests.post(url, data=data)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"[Abuse.ch] Erreur lors de la requête : {e}")
            return None

    def extract_abusech_info(self, abuse_json):
        """
        Extrait les informations pertinentes du JSON renvoyé par Abuse.ch (MalwareBazaar).
        Retourne un dict avec les champs sélectionnés pour l'UI.
        """
        if not abuse_json:
            return {
                "status": "No result",
                "message": "Aucune donnée reçue de l'API."
            }

        query_status = abuse_json.get("query_status", "unknown")
        if query_status != "ok":
            return {
                "status": query_status,
                "message": "La requête n'a pas abouti ou hash inconnu."
            }

        # La clé "data" contient un tableau d'échantillons
        data_list = abuse_json.get("data", [])
        if not data_list:
            return {
                "status": "No data",
                "message": "Aucun échantillon correspondant."
            }

        # On ne prend que le premier échantillon pour l'affichage
        sample = data_list[0]
        file_type = sample.get("file_type", "N/A")
        signature = sample.get("signature", "N/A")
        reporter = sample.get("reporter", "N/A")
        tags = sample.get("tags", [])

        return {
            "status": "ok",
            "file_type": file_type,
            "signature": signature,
            "reporter": reporter,
            "tags": tags
        }

    # -------------------------------------------------------------------------
    #                         3) ipstack (Géolocalisation)
    # -------------------------------------------------------------------------
    def query_ipstack(self, ip_address):
        """
        Interroge l'API ipstack pour récupérer la géolocalisation d'une IP.
        :param ip_address: Adresse IP (str).
        :return: Résultat JSON brut (dict) ou None en cas d'erreur.
        """
        if not self.ipstack_key:
            print("[ipstack] Aucune clé API fournie.")
            return None

        base_url = "http://api.ipstack.com"
        endpoint = f"{base_url}/{ip_address}"
        params = {
            "access_key": self.ipstack_key
        }

        try:
            response = requests.get(endpoint, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"[ipstack] Erreur lors de la requête : {e}")
            return None

    def extract_ipstack_info(self, ipstack_json):
        """
        Extrait les informations pertinentes du JSON renvoyé par ipstack.
        Retourne un dict avec les champs sélectionnés pour l'UI.
        """
        if not ipstack_json or "ip" not in ipstack_json:
            return {
                "status": "No result",
                "message": "Aucune donnée disponible ou format inattendu."
            }

        return {
            "ip": ipstack_json.get("ip"),
            "continent": ipstack_json.get("continent_name"),
            "country": ipstack_json.get("country_name"),
            "region": ipstack_json.get("region_name"),
            "city": ipstack_json.get("city"),
            "latitude": ipstack_json.get("latitude"),
            "longitude": ipstack_json.get("longitude")
        }


# -------------------------------------------------------------------------
# Exemple d'utilisation
# -------------------------------------------------------------------------
if __name__ == "__main__":
    
    VT_API_KEY = "b992edca26251e7f907ed388a9c56cbb575ca9e2948b274745c00d4c41f55b1e"
    IPSTACK_KEY = "aeb90db40a564d020b282ceadb009330"

    # Instanciation du client
    client = ThreatIntelClient(vt_api_key=VT_API_KEY, ipstack_key=IPSTACK_KEY)

    # Vérification d'une IP sur VirusTotal
    ioc_ip = "8.8.8.8"
    vt_raw = client.query_virustotal(ioc_ip)
    vt_parsed = client.extract_virustotal_info(vt_raw)
    print("\n=== VirusTotal (IP) - Résumé sélectionné ===")
    print(vt_parsed)

    # Vérification d'un hash sur VirusTotal
    ioc_hash = "d20632a35481ba79def4d3f629feb01ff6d3c1c441af51d7756b9909d5972a50"  
    vt_raw_hash = client.query_virustotal(ioc_hash)
    vt_parsed_hash = client.extract_virustotal_info(vt_raw_hash)
    print("\n=== VirusTotal (Hash) - Résumé sélectionné ===")
    print(vt_parsed_hash)

    # hash sur Abuse.ch (MalwareBazaar)
    abuse_raw = client.query_abusech_malwarebazaar(ioc_hash)
    abuse_parsed = client.extract_abusech_info(abuse_raw)
    print("\n=== Abuse.ch (MalwareBazaar) - Résumé sélectionné ===")
    print(abuse_parsed)

    # IP avec ipstack
    ipstack_raw = client.query_ipstack(ioc_ip)
    ipstack_parsed = client.extract_ipstack_info(ipstack_raw)
    print("\n=== ipstack (Géolocalisation) - Résumé sélectionné ===")
    print(ipstack_parsed)



=== VirusTotal (IP) - Résumé sélectionné ===
{'ioc_type': 'ip_address', 'detection_ratio': '0/94', 'total_scans': 94, 'malicious': 0, 'suspicious': 0}

=== VirusTotal (Hash) - Résumé sélectionné ===
{'ioc_type': 'file', 'detection_ratio': '21/76', 'total_scans': 76, 'malicious': 21, 'suspicious': 0}

=== Abuse.ch (MalwareBazaar) - Résumé sélectionné ===
{'status': 'ok', 'file_type': 'msi', 'signature': None, 'reporter': 'skocherhan', 'tags': ['msi', 'opendir', 'webdav']}

=== ipstack (Géolocalisation) - Résumé sélectionné ===
{'ip': '8.8.8.8', 'continent': 'North America', 'country': 'United States', 'region': 'Ohio', 'city': 'Glenmont', 'latitude': 40.5369987487793, 'longitude': -82.12859344482422}
