# LaTeXpOsEd: Data Mining Stage, Logical Filtering Substep

In this stage, the non-LaTeX files are filtered based on whether they are imported into the final PDF, as well as on whether they are likely to contain any interesting information.

Before running this script:

- Complete: [2_parse.ipynb](1_parse.ipynb)

In [None]:
%pip install tqdm pillow detect-secrets

In [None]:
import os
import tarfile
import tempfile
from tqdm import tqdm
import shutil
import gzip
import json
import io
import re
import zipfile
from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS
from pathlib import Path
import logging
import pandas as pd

In [None]:
# Folder scanned for the original .tar archives
ARCHIVES_DIR = 'data/archives'
# Folder that receives the extracted, image-filtered contents
FILTERED_ARCHIVES_DIR = 'data/filtered_archives'
# Folder that receives the archives after referenced figures were removed
CLEANED_ARCHIVES_DIR = 'data/cleaned_archives'
# Passed to logging.basicConfig(filename=...), so this is a log file path, not a directory
LOG = 'data/logs'

1. Extract each tar archive to a temporary folder
2. Perform filtering and logging
3. Write decompressed files to filtered_archives
4. Delete original tar to free up space

In [None]:
logging.basicConfig(
    level=logging.INFO,  # INFO and above are recorded; logging.debug() messages are dropped
    format='%(asctime)s %(levelname)s: %(message)s',
    filename=LOG,  # records go to this file
)

#Auxiliary functions
def has_gps_exif(img):
    """Check if image has GPS/location EXIF data.

    Args:
        img: An opened image object exposing ``_getexif()`` (as PIL image
            files that carry EXIF data do).

    Returns:
        True when the EXIF mapping contains the GPSInfo tag. False when the
        image has no EXIF data, no GPSInfo tag, or the EXIF data cannot be
        read at all.
    """
    # 0x8825 (34853) is the tag id that PIL.ExifTags.TAGS names 'GPSInfo';
    # testing the id directly avoids decoding every tag just to find one.
    gps_info_tag = 0x8825
    try:
        exif = img._getexif()
        return bool(exif) and gps_info_tag in exif
    except Exception as e:
        # Best effort: unreadable or unsupported EXIF counts as "no GPS",
        # but leave a trace instead of swallowing the reason silently.
        logging.debug(f"Could not read EXIF data: {e}")
        return False

def extract_gz_file(gz_path, dest_dir):
    """
    Process a .gz file which may contain:
      - A single text file (e.g., .tex)
      - A compressed archive (e.g., a zip file or tarball)
    Extracts images with GPS EXIF data, deletes others.
    """
    archive_name = Path(gz_path).stem
    subfolder_path = os.path.join(dest_dir, archive_name)
    os.makedirs(subfolder_path, exist_ok=True)

    with open(gz_path, 'rb') as f:
        gz_data = f.read()

    # Decompress GZ to get inner data
    try:
        with gzip.GzipFile(fileobj=io.BytesIO(gz_data)) as gz_file:
            inner_data = gz_file.read()
    except Exception as e:
        logging.error(f"Failed to decompress {gz_path}: {e}")
        return

    # Try to treat decompressed data as ZIP
    try:
        with zipfile.ZipFile(io.BytesIO(inner_data)) as zf:
            logging.info(f"{gz_path} contains a zip archive.")

            for member in zf.infolist():
                if member.is_dir():
                    continue

                member_filename = os.path.basename(member.filename)
                ext = Path(member_filename).suffix.lower()

                with zf.open(member) as file:
                    file_data = file.read()

                    if ext in ['.jpg', '.jpeg', '.png']:
                        try:
                            img = Image.open(io.BytesIO(file_data))
                            if has_gps_exif(img):
                                logging.info(f"GPS data found in image: {member.filename}")
                                # Save image
                                output_path = os.path.join(subfolder_path, member_filename)
                                with open(output_path, 'wb') as out_file:
                                    out_file.write(file_data)
                            else:
                                logging.info(f"Deleted image without GPS: {member.filename}")
                        except Exception as e:
                            logging.warning(f"Failed to process image: {member.filename} - {e}")
                            logging.info(f"Deleted corrupted image: {member.filename}")
                        continue

                    # Save non-image files
                    output_path = os.path.join(subfolder_path, member_filename)
                    with open(output_path, 'wb') as out_file:
                        out_file.write(file_data)
            return  # Zip processed successfully

    except zipfile.BadZipFile:
        logging.debug(f"{gz_path} is not a zip archive")

    # Try to treat as a tar archive
    try:
        with tarfile.open(fileobj=io.BytesIO(inner_data)) as tf:
            logging.info(f"{gz_path} contains a tar archive.")
            tf.extractall(path=subfolder_path, filter=None)

            # Now walk through contents for image deletion
            for root, _, files in os.walk(subfolder_path):
                for name in files:
                    path = os.path.join(root, name)
                    ext = Path(name).suffix.lower()
                    if ext in ['.jpg', '.jpeg', '.png']:
                        try:
                            img = Image.open(path)
                            if has_gps_exif(img):
                                logging.info(f"GPS data found in image: {name}")
                            else:
                                os.remove(path)
                                logging.info(f"Deleted image without GPS: {name}")
                        except Exception as e:
                            logging.warning(f"Failed to open image: {name} - {e}")
                            os.remove(path)
                            logging.info(f"Deleted corrupted image: {name}")
            return  # Tar processed

    except tarfile.TarError:
        logging.debug(f"{gz_path} is not a tar archive either")

    # Otherwise: assume it's a single file (e.g. .tex)
    output_filename = archive_name
    if not Path(output_filename).suffix:
        output_filename += '.tex'

    output_path = os.path.join(subfolder_path, output_filename)
    with open(output_path, 'wb') as out_file:
        out_file.write(inner_data)
    logging.info(f"Extracted single file from {gz_path} as {output_filename}")

def process_tar_file(tar_path):
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            with tarfile.open(tar_path, 'r') as tar:
                tar.extractall(path=temp_dir, filter=None)
        except Exception as e:
            logging.error(f"Failed to extract {tar_path}: {e}")
            return

        # Assume single folder inside
        extracted_root = next(Path(temp_dir).iterdir())
        if not extracted_root.is_dir():
            logging.warning(f"Expected a directory inside {tar_path}, found a file.")
            return

        for item in extracted_root.iterdir():
            if item.suffix.lower() == '.pdf':
                logging.info(f"Deleting PDF: {item.name}")
                item.unlink()  # Delete the PDF
            elif item.suffix.lower() == '.gz':
                try:
                    extract_gz_file(str(item), temp_dir)
                except Exception as e:
                    logging.error(f"Failed to process .gz file {item.name}: {e}")
                item.unlink()  # Delete original .gz after processing

        # After processing all, copy temp_dir to final output
        shutil.rmtree(extracted_root)
        logging.info(f"Removed original folder: {extracted_root}")
        
        for item in Path(temp_dir).iterdir():
            if item.is_dir():
                if any(item.iterdir()):  # Folder is not empty
                    dest = Path(FILTERED_ARCHIVES_DIR) / item.name
                    shutil.copytree(item, dest, dirs_exist_ok=True)
                    logging.info(f"Copied folder to output: {dest}")
                else:
                    logging.info(f"Skipped empty folder: {item.name}")
            else:
                logging.debug(f"Skipping non-folder file in temp_dir: {item.name}")

        logging.info(f"Finished processing {tar_path}")


# Cycle through all tar files in the archive directory
def extract_and_process_tar_files(tar_dir):
    for filename in os.listdir(tar_dir):
        if filename.endswith('.tar'):
            tar_path = os.path.join(tar_dir, filename)
            print(f"\nExtracting {tar_path}")
            try:
                process_tar_file(tar_path)
                os.remove(tar_path)
                logging.info(f"Deleted tar file after processing: {filename}")
            except Exception as e:
                logging.error(f"Error processing {filename}: {e}")

# Runs as soon as the cell executes: processes every .tar file found in ARCHIVES_DIR
extract_and_process_tar_files(ARCHIVES_DIR)

In [None]:
# Extract and summarize EXIF metadata from images

def get_exif_data(image_path):
    """Extract EXIF data from an image file.

    Args:
        image_path: Path of the image to read.

    Returns:
        A dict keyed by EXIF tag name (or the numeric id when the tag is
        unknown to ``TAGS``). The ``GPSInfo`` entry is itself a dict keyed by
        GPS tag name. An empty dict is returned when the image has no EXIF
        data or cannot be read.
    """
    try:
        # Context manager closes the file handle; many images are scanned
        with Image.open(image_path) as image:
            exif_data = image._getexif()
        if not exif_data:
            return {}
        exif = {}
        for tag_id, value in exif_data.items():
            tag = TAGS.get(tag_id, tag_id)
            if tag == "GPSInfo":
                # A GPS block that is not a mapping is recorded as empty
                # rather than discarding every other tag of the image.
                if isinstance(value, dict):
                    exif[tag] = {GPSTAGS.get(t, t): v for t, v in value.items()}
                else:
                    exif[tag] = {}
            else:
                exif[tag] = value
        return exif
    except Exception:
        return {}

def has_location(exif):
    """Tell whether the EXIF dict carries both GPS latitude and longitude."""
    gps = exif.get('GPSInfo', {})
    return all(key in gps for key in ('GPSLatitude', 'GPSLongitude'))

def has_device_info(exif):
    """Tell whether the EXIF dict has a non-empty Make or Model entry."""
    return any(exif.get(field) for field in ('Make', 'Model'))

def has_time_taken(exif):
    """Tell whether the EXIF dict has a non-empty DateTimeOriginal or DateTime entry."""
    return any(exif.get(field) for field in ('DateTimeOriginal', 'DateTime'))

def has_software(exif):
    """Tell whether the EXIF dict has a non-empty Software entry."""
    software = exif.get('Software')
    return True if software else False

def find_images(root_folder, extensions=('jpg', 'jpeg', 'png')):
    """Recursively find image files with given extensions.

    Args:
        root_folder: Folder to walk; a missing folder yields an empty list.
        extensions: Extension or iterable of extensions, with or without the
            leading dot. Matching is case-insensitive.

    Returns:
        List of paths (joined onto *root_folder*) of the matching files.
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    # Match on the dotted suffix so that names merely ending in the letters
    # (e.g. "hotjpg") are not mistaken for images.
    suffixes = tuple('.' + ext.lower().lstrip('.') for ext in extensions)

    images = []
    for dirpath, _, filenames in os.walk(root_folder):
        images.extend(
            os.path.join(dirpath, filename)
            for filename in filenames
            if filename.lower().endswith(suffixes)
        )
    return images

images = find_images(FILTERED_ARCHIVES_DIR)

# Each summary row after the total is a label paired with the predicate that decides it
metadata_checks = (
    ("With GPS Location (Lat/Lon)", has_location),
    ("With Device Info", has_device_info),
    ("With Time Taken", has_time_taken),
    ("With Software", has_software),
)

counts = {"Total Images": len(images)}
for label, _ in metadata_checks:
    counts[label] = 0

# Read the EXIF data once per image and run all predicates on it
for img_path in images:
    exif = get_exif_data(img_path)
    for label, check in metadata_checks:
        if check(exif):
            counts[label] += 1

# Show the tallies as a two-column table (last expression, so the notebook displays it)
df_summary = pd.DataFrame(list(counts.items()), columns=["Metadata Type", "Count"])
df_summary

In [None]:
def stringify_exif_data(exif_data):
    """Return a JSON-friendly copy of *exif_data*.

    Dicts keep their structure with keys and values stringified, lists and
    tuples become lists, and every other value is passed through ``str``.
    """
    if isinstance(exif_data, dict):
        converted = {}
        for key, value in exif_data.items():
            converted[str(key)] = stringify_exif_data(value)
        return converted
    if isinstance(exif_data, (list, tuple)):
        return list(map(stringify_exif_data, exif_data))
    return str(exif_data)

def get_full_exif_data(image_path):
    """Extract all EXIF data from the image, with all values stringified.

    Args:
        image_path: Path of the image to read.

    Returns:
        A dict of stringified EXIF data keyed by tag name, with the
        ``GPSInfo`` entry expanded into a dict keyed by GPS tag name. When
        the image has no EXIF data or cannot be read, a dict with a single
        ``"error"`` entry describing the problem.
    """
    try:
        # Context manager closes the file handle; many images are scanned
        with Image.open(image_path) as image:
            exif_data = image._getexif()
        if not exif_data:
            return {"error": "No EXIF data found"}
        exif = {}
        for tag_id, value in exif_data.items():
            tag = TAGS.get(tag_id, tag_id)
            if tag == "GPSInfo" and isinstance(value, dict):
                exif[tag] = {GPSTAGS.get(t, t): v for t, v in value.items()}
            else:
                # Includes a GPS block that is not a mapping: keep it as-is
                # instead of replacing the whole result with an error.
                exif[tag] = value
        return stringify_exif_data(exif)
    except Exception as e:
        return {"error": str(e)}

def find_images(root_folder, extensions=('jpg', 'jpeg', 'png')):
    """Recursively find image files with given extensions.

    Args:
        root_folder: Folder to walk; a missing folder yields an empty list.
        extensions: Extension or iterable of extensions, with or without the
            leading dot. Matching is case-insensitive.

    Returns:
        List of paths (joined onto *root_folder*) of the matching files.
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    # Match on the dotted suffix so that names merely ending in the letters
    # (e.g. "hotjpg") are not mistaken for images.
    suffixes = tuple('.' + ext.lower().lstrip('.') for ext in extensions)

    images = []
    for dirpath, _, filenames in os.walk(root_folder):
        images.extend(
            os.path.join(dirpath, filename)
            for filename in filenames
            if filename.lower().endswith(suffixes)
        )
    return images

output_json_path = "data/exif.json"

# Gather every image below the filtered archives folder
images = find_images(FILTERED_ARCHIVES_DIR)

# Map each image (path relative to that folder) to its stringified EXIF data
exif_collection = {
    os.path.relpath(image_path, FILTERED_ARCHIVES_DIR): get_full_exif_data(image_path)
    for image_path in images
}

# Write the whole mapping as one indented, non-ASCII-escaped JSON document
with open(output_json_path, "w", encoding="utf-8") as f:
    json.dump(exif_collection, f, indent=4, ensure_ascii=False)

print(f"✅ All EXIF data saved to: {output_json_path}")

## Filter out data that is imported into the LaTeX document

In [None]:
FILE_EXTENSIONS_TO_REMOVE = ['.pdf', '.png', '.jpg', '.jpeg', '.gif']
# The file argument may be preceded by '*' (starred form) and by any number
# of bracketed option groups.
INCLUDEGRAPHICS_PATTERN = re.compile(r'\\includegraphics\*?(?:\[[^\]]*\])*\{([^}]+)\}')
INCLUDEPDF_PATTERN = re.compile(r'\\includepdf(?:\[[^\]]*\])*\{([^}]+)\}')
# A '%' starts a comment only when preceded by an even number of backslashes
# ('\%' is a literal percent sign, '\\%' is a line break followed by a comment).
_COMMENT_START = re.compile(r'(?<!\\)((?:\\\\)*)%')

def extract_referenced_filenames(tex_content):
    """
    Extract referenced filenames (without extensions) from non-commented parts of tex content

    Args:
        tex_content: Text of a .tex file.

    Returns:
        Set of lower-cased base names referenced through ``\\includegraphics``
        or ``\\includepdf``, with directory and extension removed. When the
        part after the last dot is not one of ``FILE_EXTENSIONS_TO_REMOVE``
        (e.g. ``fig.v2``), the undivided base name is included as well, since
        that dot may belong to the name rather than to an extension.
    """
    referenced = set()
    for line in tex_content.splitlines():
        # Drop the comment part; escaped percent signs do not start one
        comment = _COMMENT_START.search(line)
        if comment:
            line = line[:comment.end() - 1]
        for pattern in (INCLUDEGRAPHICS_PATTERN, INCLUDEPDF_PATTERN):
            referenced.update(pattern.findall(line))

    names = set()
    for reference in referenced:
        # Whitespace inside the braces is not part of the file name
        base = os.path.basename(reference.strip()).lower()
        if not base:
            continue
        stem, ext = os.path.splitext(base)
        names.add(stem)
        if ext and ext not in FILE_EXTENSIONS_TO_REMOVE:
            names.add(base)
    return names

def process_gz_file(file_path, output_folder):
    original_name = os.path.basename(file_path)
    base_name, ext = os.path.splitext(original_name)

    with tempfile.TemporaryDirectory() as tempdir:
        # === Try tar.gz processing ===
        try:
            with gzip.open(file_path, 'rb') as f_in:
                with tarfile.open(fileobj=f_in, mode='r:*') as tar:
                    tar.extractall(tempdir, filter=None)

            # Tar.gz: extract references from tex files
            referenced_files = set()
            for root, _, files in os.walk(tempdir):
                for f in files:
                    if f.endswith('.tex'):
                        tex_path = os.path.join(root, f)
                        try:
                            with open(tex_path, 'r', encoding='utf-8', errors='ignore') as tf:
                                content = tf.read()
                                referenced_files.update(extract_referenced_filenames(content))
                        except Exception as e:
                            print(f"Error reading {tex_path}: {e}")

            # Remove referenced images and pdfs
            for root, _, files in os.walk(tempdir):
                for f in files:
                    name, ext = os.path.splitext(f)
                    if ext.lower() in FILE_EXTENSIONS_TO_REMOVE:
                        if name.lower() in referenced_files:
                            try:
                                os.remove(os.path.join(root, f))
                            except Exception as e:
                                print(f"Error deleting {f}: {e}")

            # Repack to .tar.gz
            new_name = base_name + "_filtered.tar.gz"
            output_path = os.path.join(output_folder, new_name)
            with tarfile.open(output_path, "w:gz") as tar:
                tar.add(tempdir, arcname=".")

            return output_path

        except (tarfile.ReadError, OSError):
            # === Plain .gz file ===
            try:
                with gzip.open(file_path, 'rb') as f_in:
                    decompressed_data = f_in.read()
            except Exception as e:
                raise ValueError(f"{original_name} could not be decompressed: {e}")

            # Determine output name
            if ext == '':
                # No extension — assume .tex
                new_name = base_name + "_filtered.tex.gz"
            else:
                # Preserve extension
                new_name = base_name + "_filtered" + ext + ".gz"

            output_path = os.path.join(output_folder, new_name)

            with gzip.open(output_path, 'wb') as f_out:
                f_out.write(decompressed_data)

            return output_path


os.makedirs(CLEANED_ARCHIVES_DIR, exist_ok=True)

# Only regular files directly inside FILTERED_ARCHIVES_DIR are considered
all_files = [
    entry
    for entry in os.listdir(FILTERED_ARCHIVES_DIR)
    if os.path.isfile(os.path.join(FILTERED_ARCHIVES_DIR, entry))
]

for filename in tqdm(all_files, desc="Processing files", unit="file"):
    # Gzip files are processed; PDFs and every other file type are skipped
    if not filename.lower().endswith('.gz'):
        continue

    filepath = os.path.join(FILTERED_ARCHIVES_DIR, filename)
    try:
        process_gz_file(filepath, CLEANED_ARCHIVES_DIR)
    except Exception as e:
        print(f"\nFailed to process {filename}: {e}")

In [None]:
interesting_files_folder = 'data/interesting_files'

# File extensions (lower case, with leading dot) worth a closer look
interesting_extensions = {
    ".xml", ".txt", ".csv", ".out", ".json",
    ".md", ".dat", ".py", ".yaml", ".bak",
    ".docx", ".ipynb", ".stdout", ".strings", ".yml",
    ".db", ".tmp", ".html", ".readme",
}


def is_interesting(file_name):
    """Tell whether *file_name* ends in one of the interesting extensions (case-insensitive)."""
    _, extension = os.path.splitext(file_name.lower())
    return extension in interesting_extensions

def make_flat_filename(rel_path):
    """Turn a relative path into a single file name by joining its parts with underscores."""
    # Splitting on the OS path separator and re-joining swaps every separator for "_"
    return "_".join(rel_path.split(os.sep))

def copy_flat_files(src_root, dst_root):
    """Copy every interesting file below *src_root* into the flat folder *dst_root*.

    Each copy is named after the file's path relative to *src_root*, as
    flattened by ``make_flat_filename``. If that name is already taken, a
    numeric ``_<n>`` suffix is inserted before the extension. Copy failures
    are printed and do not stop the walk.
    """
    os.makedirs(dst_root, exist_ok=True)

    for dirpath, _, filenames in os.walk(src_root):
        for file in filenames:
            if not is_interesting(file):
                continue

            source = os.path.join(dirpath, file)
            rel_path = os.path.relpath(source, src_root)
            flat_filename = make_flat_filename(rel_path)
            target = os.path.join(dst_root, flat_filename)

            # Pick the first free "<stem>_<n><suffix>" name when the flat name is taken
            stem, suffix = os.path.splitext(flat_filename)
            attempt = 1
            while os.path.exists(target):
                target = os.path.join(dst_root, f"{stem}_{attempt}{suffix}")
                attempt += 1

            try:
                shutil.copy2(source, target)
                print(f"Copied: {rel_path} ➜ {os.path.basename(target)}")
            except Exception as e:
                print(f"❌ Failed to copy {rel_path}: {e}")

# Run it: copy interesting files from CLEANED_ARCHIVES_DIR into the flat interesting_files_folder
copy_flat_files(CLEANED_ARCHIVES_DIR, interesting_files_folder)

print("✅ Done copying interesting files to flat structure")

Manual and LLM analysis continues from here ...