In [None]:
%%capture
!pip install -U wandb
!pip install -U pandas # upgrade pandas
!pip install -U pandarallel
# !pip install -U dask["complete"]
# !pip install swifter # first time installation
# !pip install swifter[groupby]
# !pip install wrapt_timeout_decorator

In [None]:
import wandb
# Authenticate this kernel with Weights & Biases; the wandb.init / use_artifact
# cells later in the notebook need a logged-in session.
wandb.login()

In [None]:
# W&B run metadata.
project_name = "krea-open-prompts"
run_name = "download-open-prompts-sd-images"
tags = ["download", "stable_diffusion", "images"]
# Run config passed to wandb.init.
_config = {
    # option of "prompts" or "sample_prompts": selects which CSV in the
    # open-prompts-sd artifact is used; also used as the local image folder name.
    "dataset": "prompts"
}

In [None]:
# Start the W&B run, logging `_config` as the run config (read back via run.config).
run = wandb.init(project=project_name, name=run_name, tags=tags, config=_config)

In [None]:
# Read the dataset choice back from the run's config rather than from `_config`.
config = run.config
dataset_name = config['dataset']

In [None]:
# Use the latest version of the raw open-prompts-sd artifact and download only the
# CSV for the selected dataset; `dataset_path` is the local file read with pd.read_csv later.
art = run.use_artifact('open-prompts-sd:latest', type='raw_data')
dataset_path = art.get_path(f"{dataset_name}.csv").download()

In [None]:
# import ray
# ray.init(ignore_reinit_error=True, dashboard_host="0.0.0.0", include_dashboard=True)

In [None]:
import psutil
# Physical vs. logical core counts; pandarallel is initialized with NB_CORES
# (logical) further down, NB_PHYSICAL_CORES is kept for comparison.
# psutil.cpu_count() returns None when the count cannot be determined; fall back
# to 1 so the value is always a valid worker count.
NB_PHYSICAL_CORES = psutil.cpu_count(logical=False) or 1
NB_CORES = psutil.cpu_count() or 1

In [None]:
# Logical core count that will be used as pandarallel's worker count.
NB_CORES

In [None]:
from pandarallel import pandarallel
# Experiment: use every logical core (not only the physical ones) as a worker to
# see whether the extra hyper-threads speed things up.
pandarallel.initialize(progress_bar=True, nb_workers=NB_CORES)

In [None]:
import pandas as pd
# import swifter
# import modin.pandas as mpd
# from modin.config import ProgressBar
# ProgressBar.enable()

In [None]:
import json

In [None]:
from pandas import json_normalize

In [None]:
def load_and_flatten_json(record, sep="_"):
    """Parse one JSON string and flatten nested objects into a single flat dict.

    Nested keys are joined with ``sep`` (``{"a": {"b": 1}}`` -> ``{"a_b": 1}``),
    mirroring what ``pandas.json_normalize(obj, sep=sep)`` produced for a single
    record. Lists are kept as values, and empty nested objects contribute no keys.
    Top-level non-object values come before flattened nested keys. This is done
    in plain Python to avoid building a one-row DataFrame for every record inside
    a per-row ``apply``.

    Parameters
    ----------
    record : str
        A JSON document: an object, or a non-empty array whose first element is
        the object to flatten.
    sep : str, default "_"
        Separator placed between parent and child keys.

    Returns
    -------
    dict
        Flat mapping of joined key -> value.

    Raises
    ------
    ValueError
        If ``record`` is not valid JSON (``json.JSONDecodeError``), is an empty
        array, or is not an object / array starting with an object.
    """
    data = json.loads(record)
    if isinstance(data, list):
        if not data:
            raise ValueError("cannot flatten an empty JSON array")
        data = data[0]
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")

    def _flatten(obj, prefix, out):
        # Recurse into nested objects only; every other value becomes a leaf.
        for key, value in obj.items():
            joined = f"{prefix}{sep}{key}" if prefix else str(key)
            if isinstance(value, dict):
                _flatten(value, joined, out)
            else:
                out[joined] = value
        return out

    # Same key order as json_normalize: top-level leaves first, then nested keys.
    flat = {key: value for key, value in data.items() if not isinstance(value, dict)}
    nested = {key: value for key, value in data.items() if isinstance(value, dict)}
    flat.update(_flatten(nested, "", {}))
    return flat

Notes after some trial and error:
If you want high-performance `apply` calls and DataFrame processing with minimal effort, start with pandarallel (https://nalepae.github.io/pandarallel/user_guide/).
Plain pandas `apply` runs on a single CPU core; pandarallel gets around this limitation by using all cores of your computer. In return, pandarallel needs twice the memory that the standard pandas operation would normally use.

==> pandarallel should NOT be used if your data does not fit into memory with plain pandas.
Next, try swifter. It tries to automatically figure out whether your function can be vectorized or run as a Dask `apply`, and otherwise falls back to plain pandas.
It was hard to get the function into exactly the shape swifter wants. `force_parallel` is nice: it felt faster, but not as fast as throwing all cores at the problem.
If that is still not enough, use Modin or Dask directly.
You need to make sure your function applies cleanly to each partition, and you may need to think about data transfer between workers, but honestly it is not that bad. You also have to load the DataFrame differently than with pandas, so it is less automatic.
Beyond that you probably need Spark, which I did not try because I had no desire to set it up for a quick-and-dirty cleanup.

In [None]:
import gc

In [None]:
from PIL import Image
import requests

In [None]:
from pathlib import Path
import os

In [None]:
# dataset_name = "test_modin"

In [None]:
# Images are stored under ./<dataset_name>/; create it up front (no-op if it exists).
image_folder = Path(".", dataset_name)
image_folder.mkdir(parents=True, exist_ok=True)

In [None]:
# Per-request timeout for image downloads, in seconds (passed to requests.get).
default_timeout = 1
# Back-off after a download times out, in seconds (passed to time.sleep).
long_sleep = 5

In [None]:
import time
from requests.exceptions import Timeout

In [None]:
def generate_local_file_path(image_url):
    """Map an image URL to its local file name: the text after the last "/".

    A URL containing no "/" is returned unchanged; one ending in "/" maps to "".
    """
    _, _, file_name = image_url.rpartition('/')
    return file_name

In [None]:
def download_image_files(row):
    """Make sure one image exists in ``image_folder`` and passes verification.

    Files that are already present are only re-verified, so the function can be
    rerun over the same rows without re-downloading. Failures are printed and
    reported through the return value instead of raised, so one bad URL does not
    abort the whole ``parallel_apply`` over a chunk.

    Parameters
    ----------
    row : pandas.Series
        Needs ``raw_discord_data_image_uri`` (source URL) and
        ``local_image_location`` (file name to use inside ``image_folder``).

    Returns
    -------
    bool
        True if a verified image is on disk afterwards, False otherwise.
    """
    image_url = row["raw_discord_data_image_uri"]
    file_name = row["local_image_location"]
    # A missing (NaN) URI, or one ending in "/", leaves no usable file name.
    # Building a Path from it would raise outside the try block and take the
    # whole chunk down with it; report this row as failed instead.
    if not isinstance(file_name, str) or not file_name:
        print(f"skipping row without an image file name: {image_url!r}")
        return False

    file_path = Path(image_folder, file_name)
    # Download into a sibling ".part" file and rename only after it verifies,
    # so an interrupted run never leaves a truncated image under the final name.
    tmp_path = file_path.with_name(file_name + ".part")
    try:
        # Allows for retries without re-downloads.
        if file_path.exists():
            # Image.open alone only reads the header; verify() additionally
            # checks the file for corruption (e.g. truncated PNGs).
            with Image.open(file_path) as img:
                img.verify()
            return True
        # NOTE: no throttling between requests; add a sleep here if the host
        # starts rate limiting.
        response = requests.get(image_url, timeout=default_timeout)
        # Otherwise an HTTP error page (403/404/...) would be saved as the image.
        response.raise_for_status()
        tmp_path.write_bytes(response.content)
        with Image.open(tmp_path) as img:
            img.verify()
        tmp_path.replace(file_path)
        return True
    except Timeout:
        # Back off so the workers do not keep hammering a slow host.
        print("sleeping zzz")
        time.sleep(long_sleep)
        return False
    except Exception as e:
        print(f"{image_url}: {e}")
        # Remove traces of failed files so broken images are not kept around
        # (including a corrupt pre-existing file, so a retry re-downloads it).
        tmp_path.unlink(missing_ok=True)
        file_path.unlink(missing_ok=True)
        return False

In [None]:
# from tqdm import tqdm
# tqdm.pandas()

In [None]:
#BUG: check if modin actually helps here
# df = mpd.DataFrame(df)

In [None]:
import numpy as np

In [None]:
import os
#TODO: store images in a bucket and read/write to there and make the artifact reference that. Will allow for multiple pcs and processes to write to it
#also not restricted to local filestore
# Names of the files already in the image folder; rows whose local image name
# appears here are skipped by the download loop.
downloaded_images = {entry.name for entry in image_folder.iterdir()}

In [None]:
#TODO: Add distributed machine setup with prechunked dataframes where the flag is which chunk to process. Write to S3 and read from S3. Spin up a bunch of machines andrun this workflow.

In [None]:
# Number of files already present in the image folder before this run.
len(downloaded_images)

In [None]:
%%time
chunksize = 50000
responses = []
for raw_df in pd.read_csv(dataset_path, chunksize=chunksize):
    df = pd.DataFrame(raw_df["raw_data"].parallel_apply(load_and_flatten_json).to_list())
    # df = pd.DataFrame(raw_df["raw_data"].swifter.force_parallel().apply(load_and_flatten_json).to_list())

    df["local_image_location"] = df["raw_discord_data_image_uri"].str.rsplit("/", 1).str[-1]
    df["prompt"] = raw_df["prompt"]
    del raw_df
    gc.collect()
    # for chunk in np.array_split(df, 10):
        # responses = chunk["raw_discord_data_image_uri"].parallel_apply(download_image_files)
    df_to_download = df[~df["local_image_location"].isin(downloaded_images)][["raw_discord_data_image_uri", "local_image_location"]]
    if df_to_download.shape[0] == 0:
        continue
    try:
        response = df_to_download[["raw_discord_data_image_uri", "local_image_location"]].parallel_apply(download_image_files, axis=1)
        responses.append(response)
    except:
        continue
    # responses = df["raw_discord_data_image_uri"].swifter.force_parallel().apply(download_image_files)

In [None]:
# Mark the W&B run started by wandb.init as finished.
run.finish()