In [1]:
import io
import os
from typing import Literal, TypeAlias

import megfile
import numpy as np
import PIL.Image
import PIL.ImageOps
import requests
import torch
import functools
import torch.distributed as dist

# from .loguru import logger

"""
- pil: `PIL.Image.Image`, size (w, h), seamless conversion between `uint8`
- np: `np.ndarray`, shape (h, w, c), default `np.uint8`
- pt: `torch.Tensor`, shape (c, h, w), default `torch.uint8`
"""
ImageType: TypeAlias = PIL.Image.Image | np.ndarray | torch.Tensor
ImageTypeStr: TypeAlias = Literal["pil", "np", "pt"]
ImageFormat: TypeAlias = Literal["JPEG", "PNG"]
DataFormat: TypeAlias = Literal["255", "01", "11"]


IMAGE_EXT_LOWER = ["png", "jpeg", "jpg"]
IMAGE_EXT = IMAGE_EXT_LOWER + [_ext.upper() for _ext in IMAGE_EXT_LOWER]


def check_image_type(image: ImageType):
    if not (isinstance(image, PIL.Image.Image) or isinstance(image, np.ndarray) or isinstance(image, torch.Tensor)):
        raise TypeError(f"`image` should be PIL Image, ndarray or Tensor. Got `{type(image)}`.")


def load_image(
    image: str | os.PathLike | PIL.Image.Image,
    *,
    output_type: ImageTypeStr = "pil",
) -> ImageType:
    """
    Loads `image` to a PIL Image, NumPy array or PyTorch tensor.

    Args:
        image (str | PIL.Image.Image): The path to image or PIL Image.
        mode (ImageMode, optional): The mode to convert to. Defaults to None (no conversion).
            The current version supports all possible conversions between "L", "RGB", "RGBA".
        output_type (ImageTypeStr, optional): The type of the output image. Defaults to "pil".
            The current version supports "pil", "np", "pt".

    Returns:
        ImageType: The loaded image in the given type.
    """
    timeout = 10
    # Load the `image` into a PIL Image.
    if isinstance(image, str) or isinstance(image, os.PathLike):
        if image.startswith("http://") or image.startswith("https://"):
            try:
                image = PIL.Image.open(requests.get(image, stream=True, timeout=timeout).raw)
            except requests.exceptions.Timeout:
                raise ValueError(f"HTTP request timed out after {timeout} seconds")
        elif image.startswith("s3"):
            with megfile.smart_open(image, "rb") as f:
                bytes_data = f.read()
            image = PIL.Image.open(io.BytesIO(bytes_data), "r")
        elif os.path.isfile(image):
            image = PIL.Image.open(image)
        else:
            raise ValueError(f"Incorrect path or url, URLs must start with `http://`, `https://` or `s3+[profile]://`, and `{image}` is not a valid path.")
    elif isinstance(image, PIL.Image.Image):
        image = image
    else:
        raise ValueError(f"`image` must be a path or PIL Image, got `{type(image)}`")

    # Automatically adjust the orientation of the image to match the direction it was taken.
    image = PIL.ImageOps.exif_transpose(image)

    support_mode = ["L", "RGB", "RGBA", "CMYK"]
    if image.mode not in support_mode:
        raise ValueError(f"Only support mode in `{support_mode}`, got `{image.mode}`")

    # add white background for RGBA images, and convert to RGB
    if image.mode == "RGBA":
        background = PIL.Image.new("RGBA", image.size, "white")
        image = PIL.Image.alpha_composite(background, image).convert("RGB")

    image = image.convert("RGB")

    if output_type == "pil":
        image = image
    elif output_type == "np":
        image = to_np(image)
    elif output_type == "pt":
        image = to_pt(image)
    else:
        raise ValueError(f"`output_type` must be one of `{ImageTypeStr}`, got `{output_type}`")

    return image


@functools.lru_cache()
def _get_global_gloo_group():
    """
    Return a process group based on gloo backend, containing all the ranks
    The result is cached.
    """
    if dist.get_backend() == "nccl":
        return dist.new_group(backend="gloo")
    else:
        return dist.group.WORLD


def all_gather(data, group=None):
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.

    Returns:
        list[data]: list of data gathered from each rank
    """
    if group is None:
        group = _get_global_gloo_group()  # use CPU group by default, to reduce GPU RAM usage.
    world_size = dist.get_world_size(group)
    if world_size == 1:
        return [data]

    device = data.device
    output = [None for _ in range(world_size)]
    dist.all_gather_object(output, data, group=group)
    output = [o.to(device) for o in output]
    return output


def initialize_distributed_backend(backend="nccl", init_method="env://"):
    """
    Initializes the default distributed process group.
    Args:
        backend (str): Backend to use (nccl, gloo).
        init_method (str): URL specifying how to initialize the process group.
    """
    dist.init_process_group(backend=backend, init_method=init_method)

In [1]:
# 测试样本

import math
import os
from typing import Literal, List
# import fire
# import megfile
import torch
import PIL.Image
import numpy as np
from tqdm.auto import tqdm
from transformers import CLIPModel, CLIPProcessor
import glob
from typing import Literal, TypeAlias
import json

# 定义常量
_DEFAULT_MODEL = "/mnt/workspace/ziwei/checkpoints/clip-vit-base-patch32"
_DEFAULT_TORCH_DTYPE: torch.dtype = torch.float32
IMAGE_EXT_LOWER = ["png", "jpeg", "jpg"]
IMAGE_EXT = IMAGE_EXT_LOWER + [_ext.upper() for _ext in IMAGE_EXT_LOWER]

"""
- pil: `PIL.Image.Image`, size (w, h), seamless conversion between `uint8`
- np: `np.ndarray`, shape (h, w, c), default `np.uint8`
- pt: `torch.Tensor`, shape (c, h, w), default `torch.uint8`
"""
ImageType: TypeAlias = PIL.Image.Image | np.ndarray | torch.Tensor
ImageTypeStr: TypeAlias = Literal["pil", "np", "pt"]
ImageFormat: TypeAlias = Literal["JPEG", "PNG"]
DataFormat: TypeAlias = Literal["255", "01", "11"]


def load_image(image_path: str, output_type: ImageTypeStr = "pil") -> ImageType:
    image = PIL.Image.open(image_path)
    if output_type == "pil":
        return image
    elif output_type == "np":
        return np.array(image)
    elif output_type == "pt":
        return torch.tensor(np.array(image).transpose(2, 0, 1), dtype=torch.float32)
    else:
        raise ValueError(f"Unsupported output_type: {output_type}")


class CLIPScore:
    def __init__(
        self,
        model_or_name_path: str = _DEFAULT_MODEL,
        torch_dtype: torch.dtype = _DEFAULT_TORCH_DTYPE,
        local_files_only: bool = False,
        device: str | torch.device = "cuda" if torch.cuda.is_available() else "cpu",
    ):
        super().__init__()
        self.device = device
        self.dtype = torch_dtype
        self.model = CLIPModel.from_pretrained(model_or_name_path, torch_dtype=torch_dtype, local_files_only=False).to(device)
        self.model.eval()
        self.processor = CLIPProcessor.from_pretrained(model_or_name_path, local_files_only=False)

    @torch.no_grad()
    def get_text_features(self, text: str | List[str], *, norm: bool = False) -> torch.Tensor:
        if not isinstance(text, list):
            text = [text]
        inputs = self.processor(text=text, padding=True, return_tensors="pt")
        text_features = self.model.get_text_features(
            inputs["input_ids"].to(self.device),
            inputs["attention_mask"].to(self.device),
        )
        if norm:
            text_features = text_features / text_features.norm(p=2, dim=-1, keepdim=True)
        return text_features

    @torch.no_grad()
    def get_image_features(self, image: ImageType | List[ImageType], *, norm: bool = False) -> torch.Tensor:
        if not isinstance(image, list):
            image = [image]
        inputs = self.processor(images=image, return_tensors="pt")
        image_features = self.model.get_image_features(inputs["pixel_values"].to(self.device, dtype=self.dtype))
        if norm:
            image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
        return image_features

    @torch.no_grad()
    def clipi_score(self, images1: ImageType | List[ImageType], images2: ImageType | List[ImageType]) -> tuple[float, int]:
        if not isinstance(images1, list):
            images1 = [images1]
        if not isinstance(images2, list):
            images2 = [images2]
        # print("image1_num", len(images1))
        assert len(images1) == len(images2) or len(images2) == 1, f"Number of images1 ({len(images1)}) and images2 ({len(images2)}) should be same."
        images2_features = self.get_image_features(images2, norm=True)
        score = 0
        if len(images1) > 1:
            for img in images1:
                images1_features = self.get_image_features(img, norm=True)
                # cosine similarity between feature vectors
                score += 100 * (images1_features * images2_features).sum(axis=-1)
                # print(score)
                # print("score:", score.sum(0))
            return score.sum(0).float() / len(images1), len(images1)
        else:
            images1_features = self.get_image_features(images1, norm=True)
            # cosine similarity between feature vectors
            score = 100 * (images1_features * images2_features).sum(axis=-1)
            return score.sum(0).float(), len(images1)

    @torch.no_grad()
    def clipt_score(self, texts: str | List[str], images: ImageType | List[ImageType]) -> tuple[float, int]:
        if not isinstance(texts, list):
            texts = [texts]
        if not isinstance(images, list):
            images = [images]
        assert len(texts) == len(images), f"Number of texts ({len(texts)}) and images ({len(images)}) should be same."
        texts_features = self.get_text_features(texts, norm=True)
        images_features = self.get_image_features(images, norm=True)
        # cosine similarity between feature vectors
        score = 100 * (texts_features * images_features).sum(axis=-1)
        return score.sum(0).float(), len(texts)


def single_clipi_score(image1_paths: List[str], image2_paths: List[str], clip_score: CLIPScore) -> float:
    #print("attention:",len(image1_paths))
    # print(len(image2_paths))
    assert len(image1_paths) == len(image2_paths), f"Number of image1 files {len(image1_paths)} != number of image2 files {len(image2_paths)}."

    total_score = 0.0
    pbar = tqdm(total=len(image1_paths), desc="Evaluating CLIP-I Score")

    for image1_path_list, image2_path in zip(image1_paths, image2_paths):
        image2 = load_image(image2_path)
        image1 = []
        for image1_path in image1_path_list:
            # print("image1_path", image1_path)
            image1.append(load_image(image1_path))

        score, _ = clip_score.clipi_score(image1, image2)
        # print(score)
        total_score += score.item()
        pbar.update(1)
        # print("total_score:", total_score)

    pbar.close()
    return total_score / len(image1_paths)


def single_clipt_score(texts: List[str], image_paths: List[str], clip_score: CLIPScore) -> float:
    assert len(texts) == len(image_paths), f"Number of texts ({len(texts)}) != number of image files {len(image_paths)}."
    total_score = 0.0
    pbar = tqdm(total=len(texts), desc="Evaluating CLIP-T Score")

    for text, image_path in zip(texts, image_paths):
        image = load_image(image_path)
        score, _ = clip_score.clipt_score(text, image)
        total_score += score.item()
        pbar.update(1)

    pbar.close()
    return total_score / len(texts)


def clip_eval(mode: Literal["clipi", "clipt"], dir1: str, dir2: str):
    clip_score = CLIPScore()
    if mode == "clipi":
        image1_paths = glob.glob(os.path.join(dir1, "*"))
        image2_paths = glob.glob(os.path.join(dir2, "*"))
        print(f"CLIP-I Score: {clipi_score(image1_paths, image2_paths, clip_score)}")
    elif mode == "clipt":
        text_files = glob.glob(os.path.join(dir1, "*.txt"))
        image_paths = glob.glob(os.path.join(dir2, "*"))
        texts = []
        for text_file in text_files:
            with open(text_file, "r") as f:
                texts.append(f.read().strip())
        print(f"CLIP-T Score: {clipt_score(texts, image_paths, clip_score)}")


if __name__ == "__main__":
    result_path = "/mnt/workspace/ziwei/KBR_results/msdiffusion/level_one"
    json_files = glob.glob(os.path.join(result_path, "*.json"))
    for json_file in json_files:
        with open(json_file, "r") as reader:
            data_all = json.load(reader)
            result_images = []
            reference_images = []
            texts = []
            for data in data_all:
                result_images.append(data["result_image"])
                reference_images.append(data["reference_image"])
                texts.append(data["text"])
            # print(reference_images)
            # print(result_images)
            clipt_score_value = single_clipt_score(texts, result_images, CLIPScore())
            clipi_score_value = single_clipi_score(reference_images, result_images, CLIPScore())
            print("clipt_score:", clipt_score_value)
            # print("Stop")
            print("clipi_score:", clipi_score_value)x

  from .autonotebook import tqdm as notebook_tqdm
Evaluating CLIP-T Score: 100%|██████████| 200/200 [00:17<00:00, 11.34it/s]
Evaluating CLIP-I Score: 100%|██████████| 200/200 [00:30<00:00,  6.54it/s]


clipt_score: 34.99002919197083
clipi_score: 73.49842475891113


Evaluating CLIP-T Score: 100%|██████████| 100/100 [00:06<00:00, 15.40it/s]
Evaluating CLIP-I Score: 100%|██████████| 100/100 [00:11<00:00,  8.75it/s]


clipt_score: 35.40941041946411
clipi_score: 73.49188335418701


Evaluating CLIP-T Score: 100%|██████████| 100/100 [00:06<00:00, 15.24it/s]
Evaluating CLIP-I Score: 100%|██████████| 100/100 [00:13<00:00,  7.33it/s]


clipt_score: 36.39902774810791
clipi_score: 79.83686378479004


Evaluating CLIP-T Score: 100%|██████████| 100/100 [00:06<00:00, 15.25it/s]
Evaluating CLIP-I Score: 100%|██████████| 100/100 [00:09<00:00, 10.99it/s]


clipt_score: 36.732144584655764
clipi_score: 79.03935253143311


Evaluating CLIP-T Score: 100%|██████████| 150/150 [00:10<00:00, 14.50it/s]
Evaluating CLIP-I Score: 100%|██████████| 150/150 [00:16<00:00,  9.31it/s]


clipt_score: 33.36951310475667
clipi_score: 72.28870056152344


Evaluating CLIP-T Score: 100%|██████████| 150/150 [00:09<00:00, 15.36it/s]
Evaluating CLIP-I Score: 100%|██████████| 150/150 [00:28<00:00,  5.18it/s]

clipt_score: 34.64983894348145
clipi_score: 73.21331253051758





In [14]:
import math
import os
from typing import Literal, List
import fire
import megfile
import torch
import PIL.Image
import numpy as np
from tqdm.auto import tqdm
from transformers import CLIPModel, CLIPProcessor
import glob
from typing import Literal, TypeAlias
import json

# 定义常量
_DEFAULT_MODEL = "/mnt/workspace/ziwei/checkpoints/clip-vit-base-patch32"
_DEFAULT_TORCH_DTYPE: torch.dtype = torch.float32
IMAGE_EXT_LOWER = ["png", "jpeg", "jpg"]
IMAGE_EXT = IMAGE_EXT_LOWER + [_ext.upper() for _ext in IMAGE_EXT_LOWER]

"""
- pil: `PIL.Image.Image`, size (w, h), seamless conversion between `uint8`
- np: `np.ndarray`, shape (h, w, c), default `np.uint8`
- pt: `torch.Tensor`, shape (c, h, w), default `torch.uint8`
"""
ImageType: TypeAlias = PIL.Image.Image | np.ndarray | torch.Tensor
ImageTypeStr: TypeAlias = Literal["pil", "np", "pt"]
ImageFormat: TypeAlias = Literal["JPEG", "PNG"]
DataFormat: TypeAlias = Literal["255", "01", "11"]


def load_image(image_path: str, output_type: ImageTypeStr = "pil") -> ImageType:
    image = PIL.Image.open(image_path)
    if output_type == "pil":
        return image
    elif output_type == "np":
        return np.array(image)
    elif output_type == "pt":
        return torch.tensor(np.array(image).transpose(2, 0, 1), dtype=torch.float32)
    else:
        raise ValueError(f"Unsupported output_type: {output_type}")


class CLIPScore:
    def __init__(
        self,
        model_or_name_path: str = _DEFAULT_MODEL,
        torch_dtype: torch.dtype = _DEFAULT_TORCH_DTYPE,
        local_files_only: bool = False,
        device: str | torch.device = "cuda" if torch.cuda.is_available() else "cpu",
    ):
        super().__init__()
        self.device = device
        self.dtype = torch_dtype
        self.model = CLIPModel.from_pretrained(model_or_name_path, torch_dtype=torch_dtype, local_files_only=False).to(device)
        self.model.eval()
        self.processor = CLIPProcessor.from_pretrained(model_or_name_path, local_files_only=False)

    @torch.no_grad()
    def get_text_features(self, text: str | List[str], *, norm: bool = False) -> torch.Tensor:
        if not isinstance(text, list):
            text = [text]
        inputs = self.processor(text=text, padding=True, return_tensors="pt")
        text_features = self.model.get_text_features(
            inputs["input_ids"].to(self.device),
            inputs["attention_mask"].to(self.device),
        )
        if norm:
            text_features = text_features / text_features.norm(p=2, dim=-1, keepdim=True)
        return text_features

    @torch.no_grad()
    def get_image_features(self, image: ImageType | List[ImageType], *, norm: bool = False) -> torch.Tensor:
        if not isinstance(image, list):
            image = [image]
        inputs = self.processor(images=image, return_tensors="pt")
        image_features = self.model.get_image_features(inputs["pixel_values"].to(self.device, dtype=self.dtype))
        if norm:
            image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
        return image_features

    @torch.no_grad()
    def clipi_score(self, images1: ImageType | List[ImageType], images2: ImageType | List[ImageType]) -> tuple[float, int]:
        if not isinstance(images1, list):
            images1 = [images1]
        if not isinstance(images2, list):
            images2 = [images2]
        assert len(images1) == len(images2), f"Number of images1 ({len(images1)}) and images2 ({len(images2)}) should be same."
        images1_features = self.get_image_features(images1, norm=True)
        images2_features = self.get_image_features(images2, norm=True)
        # cosine similarity between feature vectors
        score = 100 * (images1_features * images2_features).sum(axis=-1)
        return score.sum(0).float(), len(images1)

    @torch.no_grad()
    def clipt_score(self, texts: str | List[str], images: ImageType | List[ImageType]) -> tuple[float, int]:
        if not isinstance(texts, list):
            texts = [texts]
        if not isinstance(images, list):
            images = [images]
        assert len(texts) == len(images), f"Number of texts ({len(texts)}) and images ({len(images)}) should be same."
        texts_features = self.get_text_features(texts, norm=True)
        images_features = self.get_image_features(images, norm=True)
        # cosine similarity between feature vectors
        score = 100 * (texts_features * images_features).sum(axis=-1)
        return score.sum(0).float(), len(texts)

def clipi_score(image1_paths: List[str], image2_paths: List[str], clip_score: CLIPScore) -> float:
    assert len(image1_paths) == len(image2_paths), f"Number of image1 files {len(image1_paths)} != number of image2 files {len(image2_paths)}."
    total_score = 0.0
    pbar = tqdm(total=len(image1_paths), desc="Evaluating CLIP-I Score")

    for image1_path, image2_path in zip(image1_paths, image2_paths):
        image1 = load_image(image1_path)
        image2 = load_image(image2_path)
        score, _ = clip_score.clipi_score(image1, image2)
        total_score += score.item()
        pbar.update(1)

    pbar.close()
    return total_score / len(image1_paths)

def clipt_score(texts: List[str], image_paths: List[str], clip_score: CLIPScore) -> float:
    assert len(texts) == len(image_paths), f"Number of texts ({len(texts)}) != number of image files {len(image_paths)}."
    total_score = 0.0
    pbar = tqdm(total=len(texts), desc="Evaluating CLIP-T Score")

    for text, image_path in zip(texts, image_paths):
        image = load_image(image_path)
        score, _ = clip_score.clipt_score(text, image)
        total_score += score.item()
        pbar.update(1)

    pbar.close()
    return total_score / len(texts)

def clip_eval(mode: Literal["clipi", "clipt"], dir1: str, dir2: str):
    clip_score = CLIPScore()
    if mode == "clipi":
        image1_paths = glob.glob(os.path.join(dir1, '*'))
        image2_paths = glob.glob(os.path.join(dir2, '*'))
        print(f"CLIP-I Score: {clipi_score(image1_paths, image2_paths, clip_score)}")
    elif mode == "clipt":
        text_files = glob.glob(os.path.join(dir1, '*.txt'))
        image_paths = glob.glob(os.path.join(dir2, '*'))
        texts = []
        for text_file in text_files:
            with open(text_file, 'r') as f:
                texts.append(f.read().strip())
        print(f"CLIP-T Score: {clipt_score(texts, image_paths, clip_score)}")

if __name__ == "__main__":
    result_path = "/mnt/workspace/ziwei/SSR_ENcoder/SSR_Encoder/results/KBR_bench/level_one/"
    json_files = glob.glob(os.path.join(result_path, "*.json"))
    for json_file in json_files:
        with open(json_file, "r") as reader:
            data_all = json.load(reader)
            result_images = []
            reference_images = []
            texts = []
            for data in data_all:
                result_images.append(data["result_image"])
                reference_images.append(data["reference_image"])
                texts.append(data["text"])
            clipt_score_value = clipt_score(texts, result_images, CLIPScore())
            clipi_score_value = clipi_score(reference_images, result_images, CLIPScore())
            print("clipt_score:", clipt_score_value)
            print("clipi_score:", clipi_score_value)

Evaluating CLIP-T Score: 100%|██████████| 200/200 [00:09<00:00, 21.90it/s]


AttributeError: 'list' object has no attribute 'read'

In [4]:
pip install megfile

Looking in indexes: https://mirrors.aliyun.com/pypi/simple
Collecting megfile
  Downloading https://mirrors.aliyun.com/pypi/packages/86/09/2c831aea77c8a37c565813087b841d1f63aa776d09e84c27be6654234843/megfile-3.1.0-py3-none-any.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.6/135.6 kB[0m [31m232.2 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting boto3 (from megfile)
  Downloading https://mirrors.aliyun.com/pypi/packages/cd/65/7cf1fd8f8073b1884fe7e559acb3fdd9957ba0cecd46e8bd4b2844b0a48a/boto3-1.34.143-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m232.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting botocore>=1.13.0 (from megfile)
  Downloading https://mirrors.aliyun.com/pypi/packages/cf/15/359c52942418b5b9c094909174b9e97dd0364d1fc49fa588cab9361ae0fe/botocore-1.34.143-py3-none-any.whl (12.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12

In [4]:
import spacy
nlp=spacy.load('en_core_web_sm')

doc=nlp("In a city park on a cloudy morning, a white wheaten terrier dog is playfully nudging a white topper hat near the Cloud Gate.")
prompt_without_adj=' '.join([token.text for token in doc if token.pos_ != 'ADJ']) #remove adj
print(prompt_without_adj)

In a city park on a morning , a dog is playfully nudging a topper hat near the Cloud Gate .


Looking in indexes: https://mirrors.aliyun.com/pypi/simple
Processing ./en_core_web_sm-2.3.0.tar.gz
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting spacy<2.4.0,>=2.3.0 (from en_core_web_sm==2.3.0)
  Downloading https://mirrors.aliyun.com/pypi/packages/c8/7c/bb5b8683efd1d36a1d415e426aa95030a7c6f1b2aab492f8403b85560ec4/spacy-2.3.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.9 MB)
[2K     [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/4.9 MB[0m [31m222.7 kB/s[0m eta [36m0:00:12[0m^C
[2K     [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/4.9 MB[0m [31m222.7 kB/s[0m eta [36m0:00:12[0m
[?25h[31mERROR: Operation cancelled by user[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.
