In [None]:
import os

# Hide all GPUs *before* importing clip_retrieval, so that any torch/CUDA
# initialisation triggered at import time already sees an empty device list.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

from clip_retrieval.clip_inference.load_clip import load_clip
from clip_retrieval.clip_inference.reader import FilesReader, WebdatasetReader
from clip_retrieval.clip_inference.runner import Sampler

# Local test fixtures: a folder of loose images and a folder of webdataset tars.
images = "test_images"
tars = "test_tars"
folder = images

# Small settings suited to the local test fixtures.
batch_size = 2
num_prepro_workers = 2
# Only the preprocessing transform is used by the readers below; the first
# return value (presumably the model) is discarded.
_, preprocess = load_clip()

In [None]:
# Read the local image folder split across 3 output partitions and show the
# filenames and tensor shape of every batch each partition yields.
output_partition_count = 3
for output_partition_id in range(output_partition_count):
    print("output_partition_id", output_partition_id)
    sampler = Sampler(output_partition_id=output_partition_id, output_partition_count=output_partition_count)
    reader = FilesReader(
        sampler,
        preprocess,
        folder,
        batch_size,
        num_prepro_workers,
        enable_text=False,
        enable_image=True,
        enable_metadata=False,
    )
    for batch in reader:
        print(batch["image_filename"])
        print(batch["image_tensor"].shape)

In [None]:
# turn this into a test

In [None]:
# Same inspection as for the image folder, but reading the two local webdataset
# tars split across 2 output partitions.
output_partition_count = 2
tars = ["test_tars/image1.tar", "test_tars/image2.tar"]
for output_partition_id in range(output_partition_count):
    print("output_partition_id", output_partition_id)
    sampler = Sampler(output_partition_id=output_partition_id, output_partition_count=output_partition_count)
    reader = WebdatasetReader(
        sampler,
        preprocess,
        tars,
        batch_size,
        num_prepro_workers,
        enable_text=False,
        enable_image=True,
        enable_metadata=False,
    )
    for batch in reader:
        print(batch["image_filename"])
        print(batch["image_tensor"].shape)

In [None]:
# Throughput benchmark: stream the full laion2B-data shard list through the
# webdataset reader and let tqdm report the iteration rate. Each shard URL is a
# `pipe:aws s3 cp ...` command, so the `aws` CLI must be installed and have read
# access to the bucket.
#
# The benchmark settings use their own names so that they do not overwrite the
# small test-fixture `batch_size` / `num_prepro_workers` globals that other
# cells read.
from braceexpand import braceexpand
from tqdm import tqdm

output_partition_count = 2
benchmark_batch_size = 256
benchmark_num_prepro_workers = 8

tars = list(braceexpand("pipe:aws s3 cp s3://laion-us-east-1/laion-data/laion2B-data/{000000..231348}.tar -"))
for output_partition_id in range(output_partition_count):
    print("output_partition_id", output_partition_id)
    sampler = Sampler(output_partition_id=output_partition_id, output_partition_count=output_partition_count)
    reader = WebdatasetReader(
        sampler,
        preprocess,
        tars,
        benchmark_batch_size,
        benchmark_num_prepro_workers,
        enable_text=False,
        enable_image=True,
        enable_metadata=False,
    )
    # Only iteration speed matters here; the batches themselves are discarded.
    for _ in tqdm(reader):
        pass

In [None]:
# turn this into a test as well

In [None]:
# next save the tensors, and test the mapper

In [None]:
import os
import pickle

# Dump the preprocessed batches from the two test tars to disk, one pickle per
# batch, so that the mapper can be exercised on them without re-running the reader.
output_partition_count = 1
batch_size = 2
# Set explicitly: otherwise the value silently depends on which earlier cells ran.
num_prepro_workers = 2
tensor_folder = "test_tensors"
os.makedirs(tensor_folder, exist_ok=True)

tars = ["test_tars/image1.tar", "test_tars/image2.tar"]
for output_partition_id in range(output_partition_count):
    print("output_partition_id", output_partition_id)
    sampler = Sampler(output_partition_id=output_partition_id, output_partition_count=output_partition_count)
    reader = WebdatasetReader(
        sampler,
        preprocess,
        tars,
        batch_size,
        num_prepro_workers,
        enable_text=False,
        enable_image=True,
        enable_metadata=False,
    )
    # Files are named only by batch index, which is unique because there is a
    # single output partition; include the partition id if that count grows.
    for batch_index, sample in enumerate(reader):
        with open(os.path.join(tensor_folder, "{}.pkl".format(batch_index)), "wb") as f:
            pickle.dump(sample, f)

In [None]:
import os
import pickle

from clip_retrieval.clip_inference.mapper import ClipMapper

# Run CLIP (ViT-B/32, on CPU) over every pickled batch, check that one image
# embedding comes out per input image tensor, and save the results for the
# writer test.
mapper = ClipMapper(
    enable_image=True,
    enable_text=False,
    enable_metadata=False,
    use_mclip=False,
    device="cpu",
    clip_model="ViT-B/32",
    use_jit=True,
    mclip_model="",
)
tensor_folder = "test_tensors"
embedding_folder = "test_embeddings"
os.makedirs(embedding_folder, exist_ok=True)

# Only pick up pickled batches (ignore stray entries such as .ipynb_checkpoints),
# in a stable order.
tensor_files = sorted(name for name in os.listdir(tensor_folder) if name.endswith(".pkl"))
for tensor_file in tensor_files:
    # NOTE: pickle.load can execute arbitrary code; only use it on files this notebook wrote.
    with open(os.path.join(tensor_folder, tensor_file), "rb") as tensor_f:
        tensor = pickle.load(tensor_f)
    sample = mapper(tensor)
    assert sample["image_embs"].shape[0] == tensor["image_tensor"].shape[0]
    with open(os.path.join(embedding_folder, tensor_file), "wb") as embedding_f:
        pickle.dump(sample, embedding_f)

In [None]:
# next save the predictions, and test the writer

In [None]:
import os
import pickle
import tempfile

import numpy as np

from clip_retrieval.clip_inference.writer import NumpyWriter

# Feed the saved embeddings to a NumpyWriter in a throwaway folder and check that
# the written img_emb_0.npy has as many entries (first dimension) as were fed in.
# Assumes all test embeddings end up in that single file — confirm if the
# fixtures grow past write_batch_size.
with tempfile.TemporaryDirectory() as tmpdir:
    writer = NumpyWriter(
        partition_id=0,
        output_folder=tmpdir,
        enable_text=False,
        enable_image=True,
        enable_metadata=False,
        write_batch_size=10,
    )
    embedding_folder = "test_embeddings"
    embedding_files = sorted(name for name in os.listdir(embedding_folder) if name.endswith(".pkl"))
    # Without inputs the check below is meaningless; fail with a clear message instead.
    assert embedding_files, "no embeddings found in {}; run the mapper cell first".format(embedding_folder)
    expected_count = 0
    for embedding_file in embedding_files:
        # NOTE: pickle.load can execute arbitrary code; only use it on files this notebook wrote.
        with open(os.path.join(embedding_folder, embedding_file), "rb") as f:
            embedding = pickle.load(f)
        expected_count += embedding["image_embs"].shape[0]
        writer(embedding)
    writer.flush()

    with open(os.path.join(tmpdir, "img_emb", "img_emb_0.npy"), "rb") as f:
        image_embs = np.load(f)
    assert image_embs.shape[0] == expected_count

In [None]:
# next do a runner test

In [None]:
import os

# Hide all GPUs *before* importing clip_retrieval, so that any torch/CUDA
# initialisation triggered at import time already sees an empty device list.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import tempfile

import numpy as np

from clip_retrieval.clip_inference.load_clip import load_clip
from clip_retrieval.clip_inference.mapper import ClipMapper
from clip_retrieval.clip_inference.reader import FilesReader, WebdatasetReader
from clip_retrieval.clip_inference.runner import Runner, Sampler
from clip_retrieval.clip_inference.writer import NumpyWriter

output_partition_count = 2
num_prepro_workers = 8
batch_size = 2
folder = "test_images"


# Wire reader -> mapper -> writer through a Runner and run it for output
# partition 0 only, writing into a throwaway folder.
with tempfile.TemporaryDirectory() as tmpdir:

    def reader_builder(sampler):
        """Build a FilesReader over the test image folder for the given sampler."""
        _, preprocess = load_clip()
        return FilesReader(
            sampler,
            preprocess,
            folder,
            batch_size,
            num_prepro_workers,
            enable_text=False,
            enable_image=True,
            enable_metadata=False,
        )

    def mapper_builder():
        """Build a CPU ViT-B/32 ClipMapper producing image embeddings only."""
        return ClipMapper(
            enable_image=True,
            enable_text=False,
            enable_metadata=False,
            use_mclip=False,
            device="cpu",
            clip_model="ViT-B/32",
            use_jit=True,
            mclip_model="",
        )

    def writer_builder(i):
        """Build a NumpyWriter for partition `i` writing into the temp folder."""
        return NumpyWriter(
            partition_id=i,
            output_folder=tmpdir,
            enable_text=False,
            enable_image=True,
            enable_metadata=False,
            write_batch_size=10,
        )

    runner = Runner(
        reader_builder=reader_builder,
        mapper_builder=mapper_builder,
        writer_builder=writer_builder,
        output_partition_count=output_partition_count,
    )

    runner(0)

    # Expected count assumes test_images splits so that partition 0 holds 4 images.
    with open(os.path.join(tmpdir, "img_emb", "img_emb_0.npy"), "rb") as f:
        image_embs = np.load(f)
        assert image_embs.shape[0] == 4

In [None]:
# next do a standalone distributor test

In [None]:
import os

# Hide all GPUs *before* importing clip_retrieval, so that any torch/CUDA
# initialisation triggered at import time already sees an empty device list.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import tempfile

import numpy as np

from clip_retrieval.clip_inference.distributor import PysparkDistributor, SequentialDistributor
from clip_retrieval.clip_inference.load_clip import load_clip
from clip_retrieval.clip_inference.mapper import ClipMapper
from clip_retrieval.clip_inference.reader import FilesReader, WebdatasetReader
from clip_retrieval.clip_inference.runner import Runner, Sampler
from clip_retrieval.clip_inference.writer import NumpyWriter

output_partition_count = 2
num_prepro_workers = 8
batch_size = 2
folder = "test_images"
distributor_kind = "pyspark"  # "sequential" or "pyspark"


# Same pipeline as the runner test, but every output partition is processed
# through a distributor, and both output files are checked.
with tempfile.TemporaryDirectory() as tmpdir:

    def reader_builder(sampler):
        """Build a FilesReader over the test image folder for the given sampler."""
        _, preprocess = load_clip()
        return FilesReader(
            sampler,
            preprocess,
            folder,
            batch_size,
            num_prepro_workers,
            enable_text=False,
            enable_image=True,
            enable_metadata=False,
        )

    def mapper_builder():
        """Build a CPU ViT-B/32 ClipMapper producing image embeddings only."""
        return ClipMapper(
            enable_image=True,
            enable_text=False,
            enable_metadata=False,
            use_mclip=False,
            device="cpu",
            clip_model="ViT-B/32",
            use_jit=True,
            mclip_model="",
        )

    def writer_builder(i):
        """Build a NumpyWriter for partition `i` writing into the temp folder."""
        return NumpyWriter(
            partition_id=i,
            output_folder=tmpdir,
            enable_text=False,
            enable_image=True,
            enable_metadata=False,
            write_batch_size=10,
        )

    runner = Runner(
        reader_builder=reader_builder,
        mapper_builder=mapper_builder,
        writer_builder=writer_builder,
        output_partition_count=output_partition_count,
    )

    if distributor_kind == "sequential":
        distributor = SequentialDistributor(runner, output_partition_count)
    elif distributor_kind == "pyspark":
        from pyspark.sql import SparkSession  # pylint: disable=import-outside-toplevel

        # The session is not passed to the distributor. It is created here so that a
        # local 2-core Spark session exists; presumably PysparkDistributor picks it
        # up via getOrCreate — confirm.
        spark = (
            SparkSession.builder.config("spark.driver.memory", "16G")
            .master("local[2]")
            .appName("spark-stats")
            .getOrCreate()
        )
        distributor = PysparkDistributor(runner, output_partition_count)
    else:
        # Without this branch an unknown kind surfaces as a confusing NameError below.
        raise ValueError("unknown distributor_kind: {}".format(distributor_kind))
    distributor()

    # Expected counts assume test_images splits 4 / 3 across the two partitions.
    with open(os.path.join(tmpdir, "img_emb", "img_emb_0.npy"), "rb") as f:
        image_embs = np.load(f)
        assert image_embs.shape[0] == 4
    with open(os.path.join(tmpdir, "img_emb", "img_emb_1.npy"), "rb") as f:
        image_embs = np.load(f)
        assert image_embs.shape[0] == 3

In [None]:
# next do an end-to-end main test

In [None]:
import os

import numpy as np

os.environ["CUDA_VISIBLE_DEVICES"] = ""
import tempfile

from clip_retrieval.clip_inference.main import main

# End-to-end check of the `main` entry point on the local image folder.
# These settings are passed to main() below, so they describe what actually runs.
num_prepro_workers = 8
batch_size = 256
input_dataset = "test_images"
distributor_kind = "pyspark"


with tempfile.TemporaryDirectory() as tmpdir:
    from pyspark.sql import SparkSession  # pylint: disable=import-outside-toplevel

    # Not passed to main(); created so that a local 2-core Spark session exists for
    # the "pyspark" distribution strategy (presumably reused via getOrCreate — confirm).
    spark = (
        SparkSession.builder.config("spark.driver.memory", "16G")
        .master("local[2]")
        .appName("spark-stats")
        .getOrCreate()
    )

    main(
        input_dataset,
        output_folder=tmpdir,
        input_format="files",
        cache_path=None,
        batch_size=batch_size,
        num_prepro_workers=num_prepro_workers,
        enable_text=False,
        enable_image=True,
        enable_metadata=False,
        write_batch_size=4,
        wds_image_key="jpg",
        wds_caption_key="txt",
        clip_model="ViT-B/32",
        mclip_model="sentence-transformers/clip-ViT-B-32-multilingual-v1",
        use_mclip=False,
        use_jit=True,
        distribution_strategy=distributor_kind,
        wds_number_file_per_input_file=10000,
        output_partition_count=None,
    )

    # Expected counts assume the test images are split 4 / 3 across two output files.
    with open(os.path.join(tmpdir, "img_emb", "img_emb_0.npy"), "rb") as f:
        image_embs = np.load(f)
        assert image_embs.shape[0] == 4
    with open(os.path.join(tmpdir, "img_emb", "img_emb_1.npy"), "rb") as f:
        image_embs = np.load(f)
        assert image_embs.shape[0] == 3

In [None]:
# next test with real data

In [None]:
# done

In [None]:
# test the logger writer and the logger reader here, then turn them into a test

In [None]:
from clip_retrieval.clip_inference.logger import LoggerWriter

# Stats logger for partition 0, pointed at a scratch folder under /tmp.
logger = LoggerWriter(stats_folder="/tmp/my_stats", partition_id=0)
logger.start()

In [None]:
# Push 10,000 identical fake timing records through the stats logger
# (a fresh dict per call, with the same keys and values each time).
for _ in range(10000):
    logger(
        dict(
            read_duration=0.5,
            inference_duration=10,
            write_duration=2,
            total_duration=13,
            sample_count=1024,
        )
    )

In [None]:
# Close the LoggerWriter session opened with logger.start().
logger.end()

In [None]:
from clip_retrieval.clip_inference.logger import LoggerReader

In [None]:
# Read back stats from the folder the LoggerWriter used. enable_wandb=True
# presumably also reports them to Weights & Biases (needs a wandb login) — confirm.
logger_reader = LoggerReader(enable_wandb=True, stats_folder="/tmp/my_stats")
logger_reader.start()

In [None]:
import pandas as pd

# Inspect the stats parquet file (presumably written by the logger reader above).
stats_df = pd.read_parquet("/tmp/my_stats/stats.parquet")
stats_df

In [None]:
# then benchmark it

In [None]:
# locally done

In [None]:
# then do the doc for pyspark with gpu setup

In [None]:
# then do the real benchmark

In [None]:
# then done