In [None]:
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from datetime import datetime
from PIL import Image
import requests
from io import BytesIO
import aiohttp
import asyncio
import nest_asyncio
nest_asyncio.apply()
from time import time

In [None]:
# Path to the camalien CSV file read below. The default is machine-specific;
# set the CAMALIEN_CSV environment variable to point the notebook at a copy
# elsewhere without editing this cell.
csv_pth = Path(
    os.environ.get(
        "CAMALIEN_CSV",
        r"C:\Users\au761367\Datasets\classif\camalien\camalien_images_and_responsefiles_2024.csv",
    )
)

In [None]:
# Load the semicolon-separated CSV into a DataFrame.
df = pd.read_csv(csv_pth, sep=";")

In [None]:
# Preview the first rows of the freshly loaded frame.
df.head()

In [None]:
# Peek at the two columns that drive the grouping and time filtering below.
df[['country','timestamp']].head()

In [None]:
# Number of non-null `path` entries per country. Selecting the column before
# aggregating avoids counting every column only to keep a single one.
df.groupby('country')['path'].count()

In [None]:
# Total number of rows in the full dataset.
len(df)

In [None]:
# Parse the ISO 8601 timestamp strings into pandas datetimes (overwrites the column).
# NOTE(review): the single-day filter further down compares against a UTC-aware
# Timestamp, which presumably requires these values to be tz-aware — confirm.
df['timestamp'] = pd.to_datetime(df['timestamp'], format="ISO8601")

In [None]:
# Distinct country values, in order of first appearance in the frame.
countries = df['country'].unique()

In [None]:

def plot_dist(df, countries=None, bins=30, max_countries=1):
    """Plot one histogram of `timestamp` values per country.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a `country` column and a `timestamp` column.
    countries : iterable, optional
        Countries to plot. Defaults to the distinct values of
        ``df['country']`` so the function does not rely on a notebook global.
    bins : int, default 30
        Number of histogram bins.
    max_countries : int or None, default 1
        Maximum number of figures to draw. The default of 1 keeps the
        earlier behaviour of stopping after the first country; pass
        ``None`` to plot every country.
    """
    if countries is None:
        countries = df['country'].unique()

    for country in list(countries)[:max_countries]:
        subset = df[df['country'] == country]

        fig, ax = plt.subplots(figsize=(10, 4))
        ax.hist(subset['timestamp'], bins=bins)
        # \u2013 is an en dash; the previous literal was a mis-encoded copy of it.
        ax.set_title(f"Timestamp Histogram \u2013 {country}")
        ax.set_xlabel("Timestamp")
        ax.set_ylabel("Count")
        ax.tick_params(axis="x", labelrotation=45)
        fig.tight_layout()
        plt.show()


# `sampled` is only created further down, so plotting it here fails on a fresh
# kernel. Plot the full frame here; call plot_dist(sampled) after the sampling
# cell to inspect the subsample.
plot_dist(df)

In [None]:
# Check the first parsed timestamps and the resulting dtype.
df['timestamp'].head()

In [None]:
# Keep only the rows whose timestamp lies in the half-open interval
# [start, end), i.e. on one UTC calendar day.
date = "2024-06-11"

start = pd.to_datetime(date, utc=True).normalize()
end = start + pd.Timedelta(days=1)

on_that_day = df['timestamp'].between(start, end, inclusive="left")
filtered = df[on_that_day]

In [None]:
# List every calendar date that occurs in the data, in ascending order.
unique_days = df['timestamp'].dt.date.unique()
print(sorted(unique_days))

In [None]:
# Day bucket for each row: the timestamp with its time-of-day set to midnight.
df['day'] = df['timestamp'].dt.normalize()

In [None]:
# Order rows by country, then day, then time, so the every-N-th-row sampling
# below steps through each (country, day) group in chronological order.
df = df.sort_values(['country', 'day', 'timestamp'])

In [None]:
# URL of the last row. Indexing from the end directly also works when the
# frame has fewer than five rows, where tail().iloc[4] would raise.
df['imageurl'].iloc[-1]

In [None]:
# Inspect the last 30 rows of the sorted frame.
df.tail(30)

In [None]:
N = 15  # keep every N-th row within each (country, day) group

# 0-based position of each row inside its (country, day) group. Keeping the
# positions divisible by N selects rows 0, N, 2N, ... of every group, exactly
# what `g.iloc[::N]` selected, but without GroupBy.apply: pandas 2.2 deprecates
# apply functions that operate on the grouping columns, and later versions
# exclude those columns, which would silently drop `country` and `day` here.
# Row order follows `df`, which an earlier cell already sorted by country and day.
position_in_group = df.groupby(['country', 'day']).cumcount()
sampled = df[position_in_group % N == 0].reset_index(drop=True)

In [None]:
# Number of rows kept by the sampling step.
len(sampled)

In [None]:
# Preview the first rows of the subsample.
sampled.head()

In [None]:
async def fetch_image(session, url, semaphore):
    """Download one image and return it as a decoded PIL image.

    Parameters
    ----------
    session : object with an aiohttp-style ``get(url)`` async context manager
    url : address of the image to download
    semaphore : asyncio.Semaphore bounding the number of concurrent downloads

    Returns
    -------
    The decoded ``PIL.Image.Image``, or ``None`` if the request or the
    decoding failed (the failure is printed, not raised).
    """
    async with semaphore:
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                data = await resp.read()
            image = Image.open(BytesIO(data))
            # Image.open only reads the header; force the decode now so a
            # truncated or corrupt payload is reported here instead of
            # blowing up later when the image is drawn.
            image.load()
            return image
        except Exception as e:
            # Include the exception type: timeouts stringify to an empty
            # message, which previously left the cause blank.
            print(f"Error fetching {url}: {type(e).__name__}: {e}")
            return None

async def fetch_many_images(urls, max_concurrency=16, timeout_seconds=10):
    """Fetch every URL in `urls` concurrently through one shared session.

    `max_concurrency` caps both the connector's connection limit and the
    semaphore handed to each `fetch_image` call; `timeout_seconds` is the
    total timeout of the session. Returns the list of `fetch_image`
    results, in the same order as `urls`.
    """
    gate = asyncio.Semaphore(max_concurrency)
    session_options = {
        "timeout": aiohttp.ClientTimeout(total=timeout_seconds),
        "connector": aiohttp.TCPConnector(limit=max_concurrency),
    }

    async with aiohttp.ClientSession(**session_options) as session:
        pending = (fetch_image(session, url, gate) for url in urls)
        return await asyncio.gather(*pending)


def plot_sample_grid(df, start_idx=0, grid_size=4, url_col="imageurl"):
    """Download and plot a grid_size x grid_size grid of images.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the image URLs.
    start_idx : int, default 0
        Positional index of the first row to show.
    grid_size : int, default 4
        Number of rows and columns of the grid (4 -> 16 images).
    url_col : str, default "imageurl"
        Name of the column containing the image URLs.

    Images that fail to download are shown as a "Load error" cell. If fewer
    than grid_size**2 rows are available, the remaining cells stay blank.
    """
    n_images = grid_size * grid_size
    urls = df[url_col].iloc[start_idx : start_idx + n_images]

    # Run the async loader to completion from synchronous code.
    images = asyncio.run(fetch_many_images(urls, max_concurrency=16))

    # squeeze=False keeps `axes` a 2-D array even when grid_size == 1, where
    # plt.subplots would otherwise return a single Axes object.
    fig, axes = plt.subplots(grid_size, grid_size, figsize=(16, 16), squeeze=False)

    for position, ax in enumerate(axes.flat):
        if position < len(images):
            img = images[position]
            if img is None:
                ax.text(0.5, 0.5, "Load error", ha="center", va="center")
            else:
                ax.imshow(img)
        # Hide ticks and frame on every cell, including unused ones.
        ax.axis("off")

    fig.tight_layout()
    plt.show()

In [None]:
# Show the first 16 images (default 4x4 grid) of the subsample.
plot_sample_grid(sampled)

In [None]:
# URL stored at positional index 10 (the 11th row).
df['imageurl'].iloc[10]

In [None]:
# Number of rows that the download step below will process.
len(sampled)

In [None]:
async def download_and_save_image(session, url, save_path, semaphore):
    """Download `url` and write the raw response bytes to `save_path`.

    Parameters
    ----------
    session : object with an aiohttp-style ``get(url)`` async context manager
    url : address of the image to download
    save_path : destination file, opened in binary write mode
    semaphore : asyncio.Semaphore bounding the number of concurrent downloads

    Failures (HTTP errors, timeouts, I/O errors) are printed and swallowed so
    that one bad URL does not abort a batch; nothing is returned.
    """
    async with semaphore:
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                data = await resp.read()
            # The body is fully read; write it after leaving the response context.
            with open(save_path, "wb") as f:
                f.write(data)
        except Exception as e:
            # Include the exception type: timeouts stringify to an empty
            # message, which previously left the cause blank.
            print(f"Failed to download image {url}: {type(e).__name__}: {e}")

async def download_and_save_json(session, url, save_path, semaphore):
    """Download `url` as text and write it to `save_path` encoded as UTF-8.

    Parameters
    ----------
    session : object with an aiohttp-style ``get(url)`` async context manager
    url : address of the JSON document to download
    save_path : destination file, opened in text write mode (UTF-8)
    semaphore : asyncio.Semaphore bounding the number of concurrent downloads

    The body is stored as received; it is not parsed or validated as JSON.
    Failures are printed and swallowed so that one bad URL does not abort a
    batch; nothing is returned.
    """
    async with semaphore:
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                data = await resp.text()
            # The body is fully read; write it after leaving the response context.
            with open(save_path, "w", encoding="utf-8") as f:
                f.write(data)
        except Exception as e:
            # Include the exception type: timeouts stringify to an empty
            # message, which previously left the cause blank.
            print(f"Failed to download JSON {url}: {type(e).__name__}: {e}")

async def save_images_and_json(df, img_dir:Path, json_dir:Path, max_concurrency:int=16):
    """Download the image and the JSON response of every row in `df`.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide the columns `imagedataid` (used as file stem),
        `imageurl` (image address) and `pn_response` (JSON address).
    img_dir : Path
        Folder receiving ``<imagedataid>.jpg`` files; created if missing.
    json_dir : Path
        Folder receiving ``<imagedataid>.json`` files; created if missing.
    max_concurrency : int, default 16
        Upper bound on simultaneous downloads (connector limit and semaphore).

    Individual download failures are reported by the download helpers and do
    not interrupt the batch.
    """
    # Without the target folders every write fails and is merely printed,
    # so make sure they exist before scheduling any download.
    img_dir.mkdir(parents=True, exist_ok=True)
    json_dir.mkdir(parents=True, exist_ok=True)

    semaphore = asyncio.Semaphore(max_concurrency)
    timeout = aiohttp.ClientTimeout(total=60)
    connector = aiohttp.TCPConnector(limit=max_concurrency)

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = []

        # Iterate the three needed columns directly; iterrows builds a Series
        # per row and does not preserve column dtypes across the row.
        rows = zip(df['imagedataid'], df['imageurl'], df['pn_response'])
        for image_id, image_url, json_url in rows:
            image_path = img_dir / f"{image_id}.jpg"
            tasks.append(download_and_save_image(session, image_url, image_path, semaphore))

            json_path = json_dir / f"{image_id}.json"
            tasks.append(download_and_save_json(session, json_url, json_path, semaphore))

        await asyncio.gather(*tasks)

In [None]:
# Output root on drive D. The trailing slash matters: on Windows, Path("D:")
# is the *current directory on drive D*, not the drive root, so the folders
# could end up wherever the kernel was last working on that drive.
out_dir = Path("D:/")
img_dir = out_dir / "images"
json_dir = out_dir / "json"
# parents=True so the cell also works when out_dir points at a nested folder
# that does not exist yet.
img_dir.mkdir(parents=True, exist_ok=True)
json_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Download the image and JSON response for every sampled row into the output folders.
# asyncio.run is called from inside the notebook here, which relies on the
# nest_asyncio.apply() call in the imports cell.
asyncio.run(save_images_and_json(sampled, img_dir, json_dir, max_concurrency=16))