Major modifications and new features. In particular: hash-based verification of existing files, more accurate scan counting based on file names, and more.
franckferman committed Nov 9, 2023
1 parent da910b1 commit f983cbc
Showing 1 changed file with 84 additions and 20 deletions.
104 changes: 84 additions & 20 deletions src/MetaDetective/MetaDetective.py
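The headline feature is duplicate detection by content hash. As a rough sketch of the idea only (illustrative code, not the commit's; the actual implementation lives in download_file in the diff below):

import hashlib
import os

def same_content(data: bytes, existing_path: str) -> bool:
    # Compare a freshly downloaded payload against a file already on disk
    # by SHA-256 digest rather than by name alone.
    if not os.path.exists(existing_path):
        return False
    with open(existing_path, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest() == hashlib.sha256(data).hexdigest()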
@@ -3,12 +3,13 @@
 """Unleash Metadata Intelligence with MetaDetective. Your Assistant Beyond Metagoofil.
 
 Created By  : Franck FERMAN @franckferman
-Created Date: 27/08/2023
-Version     : 1.0.8 (24/10/2023)
+Created Date: 27/08/23
+Version     : 1.0.9 (09/11/23)
 """
 
 import argparse
 import datetime
+import hashlib
 import http.client
 import json
 import os
@@ -61,7 +62,7 @@
 EXIFTOOL_EXECUTION_ERROR = "Error: exiftool encountered an error."
 
 NOMINATIM_HOST = "nominatim.openstreetmap.org"
-USER_AGENT = 'MetaDetective/1.0.8'
+USER_AGENT = 'MetaDetective/1.0.9'
 NOMINATIM_ENDPOINT = "/reverse?format=jsonv2&lat={lat}&lon={lon}"
 
 NOMINATIM_LINK = "https://nominatim.openstreetmap.org/ui/reverse.html?lat={lat}&lon={lon}"
@@ -908,9 +909,13 @@ def process_url(url: str, depth: int, base_domain: str, q, seen: Set[str],
                 download_file(file_link, download_dir)
         elif scan:
             for file_link in file_links:
-                extension = os.path.splitext(file_link)[-1].lstrip('.')
+                file_url = urljoin(url, file_link)
+                file_name = os.path.basename(urlparse(file_url).path)
+                extension = os.path.splitext(file_name)[-1].lstrip('.')
                 with lock:
-                    file_stats[extension] = file_stats.get(extension, 0) + 1
+                    if extension not in file_stats:
+                        file_stats[extension] = set()
+                    file_stats[extension].add((file_url, file_name))
 
     if depth > 0:
         for link in links:
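The scan counter above changes from a per-extension integer to a per-extension set of (file_url, file_name) pairs, so a document linked from several crawled pages is counted once instead of once per link. A self-contained illustration (the URLs are invented, and it uses setdefault where the commit uses an explicit membership check):

file_stats = {}
links_seen = [
    ("https://example.com/docs/report.pdf", "report.pdf"),
    ("https://example.com/docs/report.pdf", "report.pdf"),  # same link found on a second page
    ("https://example.com/docs/notes.docx", "notes.docx"),
]
for file_url, file_name in links_seen:
    extension = file_name.rsplit('.', 1)[-1]
    file_stats.setdefault(extension, set()).add((file_url, file_name))

print({ext: len(files) for ext, files in file_stats.items()})
# {'pdf': 1, 'docx': 1} -- the repeated link collapses to one entry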
@@ -946,23 +951,81 @@ def wait(self) -> None:
         self.last_call = time.time()
 
 
+def calculate_hash(data: bytes) -> str:
+    """Calculate the SHA-256 hash of the given data.
+
+    Args:
+        data (bytes): The binary content of the data to be hashed.
+
+    Returns:
+        str: The hexadecimal digest of the SHA-256 hash.
+    """
+    sha256_hash = hashlib.sha256()
+    sha256_hash.update(data)
+    return sha256_hash.hexdigest()
+
+
+def find_unique_filename(path: str) -> str:
+    """Generate a unique filename in the directory of the provided path by
+    appending a numeric suffix to the base name if the proposed file already
+    exists.
+
+    Args:
+        path (str): The initial file path for which a unique version is sought.
+
+    Returns:
+        str: A unique file path. If the initial path was unique, it is returned
+            unchanged; otherwise, a suffix is added before the file extension.
+    """
+    counter = 2
+    base, ext = os.path.splitext(path)
+    while os.path.exists(path):
+        path = f"{base}-{counter}{ext}"
+        counter += 1
+    return path
+
+
 def download_file(url: str, download_dir: str) -> None:
-    """Download a file from a given URL and save it to a specified directory.
+    """Download a file from a specified URL and save it to the given directory.
+
+    If the file already exists and the content is identical (same hash),
+    the download is skipped. If the file exists but the content is different,
+    a new unique filename is generated.
 
     Args:
-        url (str): URL of the file to be downloaded.
-        download_dir (str): Directory where the file should be saved.
+        url (str): The URL from which the file will be downloaded.
+        download_dir (str): The directory path where the file will be saved.
+
+    Raises:
+        Exception: If the download fails for any reason, the exception is caught
+            and an error message with the reason for the failure is printed.
     """
     try:
         encoded_url = quote(url, safe=":/?&=")
         local_filename = os.path.join(download_dir, os.path.basename(urlparse(encoded_url).path))
-        with urllib.request.urlopen(encoded_url) as response, open(local_filename, 'wb') as out_file:
+
+        with urllib.request.urlopen(encoded_url) as response:
             data = response.read()
-            out_file.write(data)
-            print(f"INFO: Downloaded {encoded_url} to {local_filename}")
+        file_hash = calculate_hash(data)
+
+        if os.path.exists(local_filename):
+            with open(local_filename, 'rb') as existing_file:
+                existing_file_hash = calculate_hash(existing_file.read())
+
+            if file_hash == existing_file_hash:
+                print(f"WARNING: Duplicate file detected for '{local_filename}'. Both have the same hash: {file_hash}.")
+                return
+            else:
+                new_local_filename = find_unique_filename(local_filename)
+                print(f"INFO: File '{local_filename}' already exists with a different hash. Saving the new file as '{new_local_filename}'.")
+                local_filename = new_local_filename
+
+        with open(local_filename, 'wb') as out_file:
+            out_file.write(data)
+        print(f"INFO: Downloaded {url} to {local_filename}. SHA-256: {file_hash}.")
     except Exception as e:
-        print(f"ERROR: Failed to download {encoded_url} Reason: {e}")
+        print(f"ERROR: Failed to download {url}. Reason: {e}")
 
 
 def worker_thread(q: queue.Queue[Tuple[str, int, str, bool]],
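A quick, hypothetical sanity check of the two new helpers (the SHA-256 digest of b"hello" is a well-known test vector; the second call assumes downloads/report.pdf and downloads/report-2.pdf already exist on disk):

>>> calculate_hash(b"hello")
'2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'
>>> find_unique_filename("downloads/report.pdf")
'downloads/report-3.pdf'

Note that find_unique_filename splits off the extension once, so the numeric suffix always lands before it: report-2.pdf, report-3.pdf, and so on.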
@@ -1153,14 +1216,15 @@ def main():
         t.join()
 
     if args.scan:
-        print("")
-        if len(file_stats) > 0:
-            print("Scan results:")
-            for ext, count in file_stats.items():
-                print(f"{ext}: {count}")
-        print("")
-        print(f"INFO: Total URLs processed (followed): {len(seen)}")
+        if len(file_stats) > 0:
+            print("\nScan results:\n")
+            print("+----------------+----------------------------------+")
+            print("| File Extension | Estimated Number of Unique Files |")
+            print("+----------------+----------------------------------+")
+            for ext, files in file_stats.items():
+                print(f"| {ext.ljust(14)} | {str(len(files)).ljust(32)} |")
+            print("+----------------+----------------------------------+")
+        print(f"\nINFO: Total URLs processed (followed): {len(seen)}")
+        print("NOTE: These results provide an estimation and do not guarantee the uniqueness of the files.")
 
     sys.exit(0)

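For a hypothetical scan (extensions and counts invented purely for illustration), the new summary block prints something like:

Scan results:

+----------------+----------------------------------+
| File Extension | Estimated Number of Unique Files |
+----------------+----------------------------------+
| pdf            | 12                               |
| docx           | 3                                |
+----------------+----------------------------------+

INFO: Total URLs processed (followed): 57
NOTE: These results provide an estimation and do not guarantee the uniqueness of the files.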