# Vision and Cognition Systems - Project


In [1]:
#libraries to import
#known
import pandas as pd
import numpy as np
import os
import re
import torchvision
import torch
import PIL
from PIL import Image
from PIL import ImageFile
import sys
import time

#Unknown
from typing import Union
#from io import BytesIO
import random
from pathlib import Path
from multiprocessing import Pool
from functools import partial
import requests
import logging
import json
import yaml

This is from ```msgpack_viewer.py```

In [None]:
class MsgPackIterableDataset(torch.utils.data.IterableDataset):
    """Stream ``(image, image_id)`` pairs out of msgpack shard files.

    Every directory in ``path`` is scanned for files named ``shard_<N>.msg``.
    Records are unpacked with ``raw=True``, so their keys are bytes; the two
    key names given here are therefore encoded to UTF-8 before lookup.

    Args:
        path: One shard directory, or a list/tuple/set of shard directories.
        key_img_id: Record key that holds the image id.
        key_img_encoded: Record key that holds the encoded image bytes.
        transformation: Optional callable applied to the decoded RGB image.
        shuffle: Shuffle the shard order and the records of each cache window.
        cache_size: Number of records buffered before they are (optionally)
            shuffled and yielded. Values <= 0 yield every record immediately.

    Raises:
        ValueError: If no shard file is found in any of the directories.
    """

    # Matched against the whole file name: the dot is escaped and trailing
    # text is rejected, so "shard_3.msg.bak" is not mistaken for a shard.
    _SHARD_RE = re.compile(r"shard_(\d+)\.msg")

    def __init__(
        self,
        path: Union[str, Path, list, tuple, set],
        key_img_id: str = "id",
        key_img_encoded: str = "image",
        transformation=None,
        shuffle=False,
        cache_size=6 * 4096,
    ):
        super().__init__()
        self.path = path
        self.cache_size = cache_size
        self.transformation = transformation
        self.shuffle = shuffle
        # Drawn once per dataset object and re-applied at the start of every
        # iteration, so the shard permutation is the same each time.
        self.seed = random.randint(1, 100)
        self.key_img_id = key_img_id.encode("utf-8")
        self.key_img_encoded = key_img_encoded.encode("utf-8")

        # A single directory is wrapped so the rest of the class can always
        # iterate over a collection of directories.
        if not isinstance(self.path, (list, tuple, set)):
            self.path = [self.path]

        self.shards = self.__init_shards(self.path)

    @staticmethod
    def __init_shards(path) -> list:
        """Collect one descriptor dict per shard file found in ``path``.

        Args:
            path: Iterable of directories to scan.

        Returns:
            List of dicts with the keys ``path_index``, ``path``,
            ``shard_index`` and ``shard_path``; shards of one directory are
            ordered by their numeric index.

        Raises:
            ValueError: If no directory contains a shard file.
        """
        shards = []
        for i, p in enumerate(path):
            found = []
            for name in os.listdir(p):
                match = MsgPackIterableDataset._SHARD_RE.fullmatch(name)
                if match:
                    found.append((int(match.group(1)), name))
            # os.listdir() order is arbitrary; sort for a reproducible layout.
            # The real file name is kept so that e.g. "shard_007.msg" can
            # still be opened later.
            shards.extend(
                {
                    "path_index": i,
                    "path": p,
                    "shard_index": s,
                    "shard_path": os.path.join(p, name),
                }
                for s, name in sorted(found)
            )
        if not shards:
            raise ValueError("No shards found")

        return shards

    def _process_sample(self, x):
        """Decode one raw record into ``(image, image_id)``.

        The image is converted to RGB, shrunk with ``Resize(320)`` when both
        edges exceed 320 pixels, and then passed through the user
        transformation if one was given.
        """
        # BytesIO is commented out in the notebook's import cell.
        from io import BytesIO

        # decode and initial resize if necessary
        img = Image.open(BytesIO(x[self.key_img_encoded]))
        if img.mode != "RGB":
            img = img.convert("RGB")

        if img.width > 320 and img.height > 320:
            img = torchvision.transforms.Resize(320)(img)

        # apply all user specified image transformations
        if self.transformation is not None:
            img = self.transformation(img)

        _id = x[self.key_img_id].decode("utf-8")
        return img, _id

    def _drain(self, cache):
        """Yield and remove every buffered record, shuffling first if enabled."""
        if self.shuffle:
            random.shuffle(cache)
        while cache:
            # pop() takes from the end of the list, so without shuffling a
            # window is yielded in reverse read order.
            yield self._process_sample(cache.pop())

    def __iter__(self):
        """Yield ``(image, image_id)`` for every record of this worker's shards."""
        # msgpack is used by this class but not imported in the import cell.
        import msgpack

        shard_indices = list(range(len(self.shards)))

        if self.shuffle:
            # Re-seeding with the stored seed gives the same permutation on
            # every call, which the per-worker slicing below relies on to
            # produce disjoint splits.
            random.seed(self.seed)
            random.shuffle(shard_indices)

        worker_info = torch.utils.data.get_worker_info()
        if worker_info is not None:
            # Give each DataLoader worker its own contiguous slice of shards.
            total = len(shard_indices)
            workers = worker_info.num_workers
            wid = worker_info.id
            shard_indices = shard_indices[
                wid * total // workers : (wid + 1) * total // workers
            ]

        cache = []
        for shard_index in shard_indices:
            with open(self.shards[shard_index]["shard_path"], "rb") as f:
                unpacker = msgpack.Unpacker(
                    f, max_buffer_size=1024 * 1024 * 1024, raw=True
                )
                for x in unpacker:
                    if x is None:
                        continue

                    cache.append(x)
                    # ">=" (not "==") so that cache_size <= 0 degrades to
                    # "yield immediately" instead of dropping every record.
                    if len(cache) >= self.cache_size:
                        yield from self._drain(cache)

        # Flush whatever is left after the last shard.
        yield from self._drain(cache)



In [None]:
if __name__ == "__main__":
    # Smoke test: iterate once over a msgpack image directory and count the
    # images it contains.

    # argparse is not imported in the notebook's import cell.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, default="resources/images/mp16")
    args = parser.parse_args()

    tfm = torchvision.transforms.Compose(
        [
            torchvision.transforms.ToTensor(),
        ]
    )

    dataset = MsgPackIterableDataset(path=args.data, transformation=tfm)
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=1,
        num_workers=6,
        pin_memory=False,
    )

    num_images = 0
    for x, image_id in dataloader:
        # Print the first batch only, as a quick look at shape and id.
        if num_images == 0:
            print(x.shape, image_id)
        num_images += 1

    print(f"{num_images=}")


This is from ```filter_by_downloaded_images.py```, which we may not need since we will use a different dataset.

In [None]:
def main():
    """Restrict the label mappings to images present in the msgpack data.

    For the "train" and the "val" split, the label-mapping JSON named in the
    module-level ``config`` is loaded, reduced to the image ids that the
    msgpack dataset actually yields, and written back to the same file.
    """
    for split in ("train", "val"):
        with open(config[f"{split}_label_mapping"]) as f:
            mapping = json.load(f)
        logging.info(f"Expected dataset size: {len(mapping)}")

        shard_dir = config[f"msgpack_{split}_dir"]
        meta_path = config[f"{split}_meta_path"]
        dataset = MsgPackIterableMetaDataset(
            shard_dir,
            meta_path,
            meta_path,
            key_img_id=config["key_img_id"],
            key_img_encoded=config["key_img_encoded"],
            ignore_image=True,
        )

        # Keep only the ids that are both in the dataset and in the mapping.
        seen_ids = (meta["img_id"] for _, meta in dataset)
        filtered_mapping = {
            img_id: mapping[img_id] for img_id in seen_ids if img_id in mapping
        }
        logging.info(f"True dataset size: {len(filtered_mapping)}")

        # The original mapping file is overwritten in place.
        with open(config[f"{split}_label_mapping"], "w") as fw:
            json.dump(filtered_mapping, fw)


def parse_args(argv=None):
    """Parse the command-line options of the label-filtering script.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``; pass ``[]`` from a notebook to
            get the defaults.

    Returns:
        argparse.Namespace with ``config``, the path of the YAML config file
        (default ``config/baseM.yml``).
    """
    # ArgumentParser is not imported in the notebook's import cell.
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument("-c", "--config", type=Path, default=Path("config/baseM.yml"))
    return parser.parse_args(argv)


if __name__ == "__main__":
    # Timestamped INFO-level logging on the root logger.
    logging.basicConfig(
        level=logging.INFO,
        datefmt="%d-%m-%Y %H:%M:%S",
        format="%(asctime)s %(levelname)s: %(message)s",
    )

    args = parse_args()

    # Only the "model_params" section is kept; main() reads it through the
    # module-level name `config`.
    # NOTE(review): yaml.safe_load would be the safer choice if the config
    # holds plain YAML types only - confirm before switching.
    with open(args.config) as config_file:
        config = yaml.load(config_file, Loader=yaml.FullLoader)["model_params"]

    main()


Finally, this is from ```download_images.py```.

In [None]:
# Let Pillow load image files whose data is truncated instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True


class MsgPackWriter:
    """Write msgpack-encoded records into numbered shard files.

    Records go to ``<path>/shard_<N>.msg``. A new shard is started once
    ``chunk_size`` records have been written to the current one. Shard files
    that already exist in ``path`` are detected, and numbering continues
    after the highest existing index so earlier output is not overwritten.

    Args:
        path: Output directory; created if it does not exist.
        chunk_size: Maximum number of records per shard file.
    """

    def __init__(self, path, chunk_size=4096):
        self.path = Path(path).absolute()
        self.path.mkdir(parents=True, exist_ok=True)
        self.chunk_size = chunk_size

        # Shards are regular files, and the pattern is applied to the file
        # name (a str), not to the Path object. The whole name must match so
        # that e.g. "shard_1.msg.bak" is ignored.
        shard_re = re.compile(r"shard_(\d+)\.msg")
        self.shards_index = []
        for entry in self.path.iterdir():
            found = shard_re.fullmatch(entry.name)
            if found and entry.is_file():
                self.shards_index.append(int(found.group(1)))

        self.shard_open = None
        # Number of records written to the currently open shard.
        self.count = 0

    def open_next(self):
        """Close the current shard, if any, and open the next numbered one."""
        next_index = max(self.shards_index) + 1 if self.shards_index else 0
        self.shards_index.append(next_index)

        if self.shard_open is not None and not self.shard_open.closed:
            self.shard_open.close()

        self.count = 0
        self.shard_open = open(self.path / f"shard_{next_index}.msg", "wb")

    def __enter__(self):
        self.open_next()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Guarded so that leaving the context never fails when no shard was
        # opened or it has already been closed.
        if self.shard_open is not None and not self.shard_open.closed:
            self.shard_open.close()

    def write(self, data):
        """Append one record, rolling over to a new shard when the current is full.

        Args:
            data: Object passed to ``msgpack.packb``.
        """
        # msgpack is used by this class but not imported in the import cell.
        import msgpack

        # Also open a shard when write() is called without entering the
        # context manager first.
        if self.shard_open is None or self.count >= self.chunk_size:
            self.open_next()

        self.shard_open.write(msgpack.packb(data))
        self.count += 1


def _thumbnail(img: PIL.Image, size: int) -> PIL.Image:
    # resize an image maintaining the aspect ratio
    # the smaller edge of the image will be matched to 'size'
    w, h = img.size
    if (w <= size) or (h <= size):
        return img
    if w < h:
        ow = size
        oh = int(size * h / w)
        return img.resize((ow, oh), PIL.Image.BILINEAR)
    else:
        oh = size
        ow = int(size * w / h)
        return img.resize((ow, oh), PIL.Image.BILINEAR)


def flickr_download(x, size_suffix="z", min_edge_size=None, max_retries=5):
    """Download one image and return it re-encoded as JPEG bytes.

    Args:
        x: Mapping with the keys ``"image_id"`` and ``"url"``.
        size_suffix: Size suffix inserted before the file extension of the
            URL to avoid downloading in full resolution, see
            https://www.flickr.com/services/api/misc.urls.html.
            An empty string downloads the URL unchanged.
        min_edge_size: If given, larger images are shrunk so that their
            smaller edge has this many pixels. ``None`` disables resizing.
        max_retries: How often the request is repeated (after a 60 s pause)
            when the server answers with HTTP 429.

    Returns:
        ``{"image": <JPEG bytes>, "id": image_id}`` on success, ``None`` if
        the download failed or the payload is not a readable image.
    """
    # BytesIO is commented out in the notebook's import cell.
    from io import BytesIO

    # Looked up by name so the function also works in worker processes that
    # did not run the __main__ block that binds the global `logger`.
    log = logging.getLogger("ImageDownloader")

    image_id = x["image_id"]
    url = x["url"]
    if size_suffix != "":
        # Insert the suffix before the extension. Slicing off the extension
        # (instead of str.split) also works when the extension is empty or
        # occurs earlier in the URL.
        ext = Path(url).suffix
        stem = url[: len(url) - len(ext)]
        url = f"{stem}_{size_suffix}{ext}"

    attempts = 0
    while True:
        try:
            # The timeout keeps one stalled connection from blocking a worker
            # indefinitely.
            r = requests.get(url, timeout=60)
        except requests.RequestException as e:
            log.error(f"{image_id} : {url}: {e}")
            return None

        # 429 = Too Many Requests: back off and try again, a bounded number
        # of times.
        if r.status_code != 429 or attempts >= max_retries:
            break
        attempts += 1
        log.warning("To many requests, sleep for 60s...")
        time.sleep(60)

    # A Response is falsy for 4xx/5xx status codes.
    if not r:
        log.error(f"{image_id} : {url}: {r.status_code}")
        return None

    try:
        image = PIL.Image.open(BytesIO(r.content))
    except PIL.UnidentifiedImageError as e:
        log.error(f"{image_id} : {url}: {e}")
        return None

    if image.mode != "RGB":
        image = image.convert("RGB")

    # resize if necessary
    if min_edge_size is not None:
        image = _thumbnail(image, min_edge_size)

    # convert to jpeg
    fp = BytesIO()
    image.save(fp, "JPEG")

    return {"image": fp.getvalue(), "id": image_id}


class ImageDataloader:
    """Iterable over the ``(image_id, url)`` rows of a header-less CSV file.

    Args:
        url_csv: CSV file with two columns: image id and URL.
        shuffle: Shuffle the rows (fixed ``random_state=10``) before iterating.
        nrows: Read at most this many rows; ``None`` reads the whole file.
    """

    def __init__(self, url_csv: Path, shuffle=False, nrows=None):
        logger.info("Read dataset")
        frame = pd.read_csv(
            url_csv, header=None, names=["image_id", "url"], nrows=nrows
        )
        # discard rows that have a missing value (e.g. no url)
        frame = frame.dropna()
        if shuffle:
            logger.info("Shuffle images")
            frame = frame.sample(frac=1, random_state=10)
        self.df = frame
        logger.info(f"Number of URLs: {len(self.df.index)}")

    def __len__(self):
        """Return the number of rows that will be yielded."""
        return self.df.shape[0]

    def __iter__(self):
        """Yield one ``{"image_id": ..., "url": ...}`` dict per row."""
        ids = self.df["image_id"].values
        urls = self.df["url"].values
        for pair in zip(ids, urls):
            yield dict(zip(("image_id", "url"), pair))


def parse_args(argv=None):
    """Parse the command-line options of the image download script.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``; pass ``[]`` from a notebook to
            get the defaults.

    Returns:
        argparse.Namespace with the attributes ``threads``, ``output``,
        ``url_csv``, ``size``, ``size_suffix``, ``nrows`` and ``shuffle``.
    """
    # ArgumentParser is not imported in the notebook's import cell.
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument(
        "--threads",
        type=int,
        default=24,
        help="Number of threads to download and process images",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("resources/images/mp16"),
        help="Output directory where images are stored",
    )
    parser.add_argument(
        "--url_csv",
        type=Path,
        default=Path("resources/mp16_urls.csv"),
        help="CSV with Flickr image id and URL for downloading",
    )
    parser.add_argument(
        "--size",
        type=int,
        default=320,
        help="Rescale image to a minimum edge size of SIZE",
    )
    parser.add_argument(
        "--size_suffix",
        type=str,
        default="z",
        help="Image size suffix according to the Flickr API; Empty string for original image",
    )
    # No default: nrows stays None, which means "read every row".
    parser.add_argument("--nrows", type=int)
    parser.add_argument(
        "--shuffle", action="store_true", help="Shuffle list of URLs before downloading"
    )
    return parser.parse_args(argv)


def main():
    """Download all images listed in ``args.url_csv`` into msgpack shards.

    Reads the module-level ``args`` and ``logger``. Downloads run in a pool
    of ``args.threads`` worker processes; successful results are written to
    ``args.output`` in the order of the input rows.

    Returns:
        0, which the caller passes to ``sys.exit``.
    """
    image_loader = ImageDataloader(args.url_csv, nrows=args.nrows, shuffle=args.shuffle)
    download = partial(
        flickr_download,
        size_suffix=args.size_suffix,
        min_edge_size=args.size,
    )
    log_every = 1000

    counter_successful = 0
    with Pool(args.threads) as p, MsgPackWriter(args.output) as f:
        start = time.time()
        for done, x in enumerate(p.imap(download, image_loader), start=1):
            if x is not None:
                f.write(x)
                counter_successful += 1

            # Report throughput after every `log_every` processed URLs,
            # whether or not the last one succeeded, so each interval really
            # covers `log_every` items.
            if done % log_every == 0:
                end = time.time()
                elapsed = end - start
                # Guard against a zero interval on coarse clocks.
                rate = log_every / elapsed if elapsed > 0 else float("inf")
                logger.info(f"{done}: {rate:.2f} image/s")
                start = end

    total = len(image_loader)
    # An empty URL list must not end in a ZeroDivisionError.
    ratio = counter_successful / total if total else 0.0
    logger.info(
        f"Sucesfully downloaded {counter_successful}/{total} images ({ratio:.3f})"
    )
    return 0


if __name__ == "__main__":
    args = parse_args()
    args.output.mkdir(parents=True, exist_ok=True)

    # The "ImageDownloader" logger writes to <output>/writer.log and to the
    # console, both with the same line format.
    logger = logging.getLogger("ImageDownloader")
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler(str(args.output / "writer.log"))
    ch = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    for handler, level in ((fh, logging.INFO), (ch, logging.DEBUG)):
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # main() returns the process exit code.
    sys.exit(main())


Now, following the GitHub page, we need:
- item 1
- item 2
- item 3
- item 4 

The main repository (and paper) that we need to follow is [this](https://github.com/TIBHannover/GeoEstimation), and the pretrained models are available [here](https://github.com/TIBHannover/GeoEstimation/releases/).

Davide ha trovato questo che forse è meglio [kaggle](https://www.kaggle.com/code/habedi/inspect-the-dataset/data)

Qui ci sono dei links che potrebbero essere usati con colab col comando [!wget](https://qualinet.github.io/databases/image/world_wide_scale_geotagged_image_dataset_for_automatic_image_annotation_and_reverse_geotagging/)