In [None]:
import os
import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt

import PIL
from PIL import Image
from io import BytesIO

import numpy as np
import torch
from torchvision import transforms

import os
import numpy as np
from pathlib import Path

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import nn
import torch.nn.functional as F

### Downloading and Processing Data

In [None]:
def load_html_sample(url, timeout=30):
    """Download the image at ``url`` and return it as a NumPy array.

    Parameters
    ----------
    url : str
        Address of the image to fetch.
    timeout : float or tuple, optional
        Forwarded to ``requests.get``. Without a timeout a stalled server
        would block the call indefinitely.

    Returns
    -------
    numpy.ndarray
        ``np.array`` of the decoded PIL image.

    Raises
    ------
    requests.exceptions.RequestException
        On connection problems, timeouts, or a 4xx/5xx HTTP status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail on an HTTP error status instead of handing an error page to PIL.
    response.raise_for_status()
    with Image.open(BytesIO(response.content)) as img:
        data_img = np.array(img)
    return data_img

def img_cut(data, top_left_vertical=60, top_left_horizontal=100, height=480, width=575):
    """Crop a rectangular window out of an image array.

    Parameters
    ----------
    data : numpy.ndarray
        Array indexed as ``data[row, column, ...]``.
    top_left_vertical, top_left_horizontal : int, optional
        Row and column of the window's top-left corner.
    height, width : int, optional
        Number of rows and columns to keep.

    The defaults reproduce the window that used to be hard-coded here.

    Returns
    -------
    numpy.ndarray
        ``data[top:top + height, left:left + width]``. NumPy slicing silently
        truncates the window if ``data`` is smaller than requested.
    """
    row_end = top_left_vertical + height
    col_end = top_left_horizontal + width
    return data[top_left_vertical:row_end, top_left_horizontal:col_end]

def gray_url(url):
    """Download the image at ``url``, crop it, and return its grayscale version.

    The image is fetched with ``load_html_sample``, cropped with ``img_cut``
    (default window) and converted with ``gray_scale``, so the channel weights
    live in one place instead of being repeated here.

    Parameters
    ----------
    url : str
        Address of the image to fetch.

    Returns
    -------
    numpy.ndarray
        Two-dimensional weighted sum of the first three channels (not rounded).
    """
    cropped = img_cut(load_html_sample(url))
    return gray_scale(cropped)

def gray_scale(img):
    """Collapse the first three channels of ``img`` into one weighted plane.

    Channels 0, 1 and 2 of the third axis are treated as red, green and blue
    and combined as ``0.2989 * R + 0.5870 * G + 0.1140 * B``.

    Parameters
    ----------
    img : numpy.ndarray
        Array with at least three channels on its third axis.

    Returns
    -------
    numpy.ndarray
        The weighted sum, with the channel axis removed.
    """
    red_term = 0.2989 * img[:, :, 0]
    green_term = 0.5870 * img[:, :, 1]
    blue_term = 0.1140 * img[:, :, 2]
    # Summed left to right, same order as a single-expression form.
    return red_term + green_term + blue_term

In [None]:
def concat_csv_in_folder(folder_path):
    """Read every ``.csv`` file directly inside ``folder_path`` into one DataFrame.

    Files are read in sorted name order. ``os.listdir`` gives no ordering
    guarantee, and the row index produced here is reused further down as a
    folder name, so the order has to be reproducible between runs.

    Parameters
    ----------
    folder_path : str
        Folder to scan (not recursive).

    Returns
    -------
    pandas.DataFrame
        All rows of all files, with a fresh ``0..n-1`` index.

    Raises
    ------
    ValueError
        If the folder contains no ``.csv`` file.
    """
    csv_files = sorted(
        os.path.join(folder_path, f)
        for f in os.listdir(folder_path)
        if f.endswith('.csv')
    )
    if not csv_files:
        # pd.concat would also raise ValueError, but without naming the folder.
        raise ValueError(f"No .csv files found in {folder_path!r}")
    return pd.concat((pd.read_csv(f) for f in csv_files), ignore_index=True)

## directory
data = concat_csv_in_folder("/Users/jianggh/Desktop/Gravity Spy Project/Gravity Spy Dataset/Data")

# Keep the label, the confidence and the four image URLs, and drop incomplete rows.
# dropna() returns a new frame, so its result has to be assigned to take effect.
# Chaining also yields an independent frame, so the dtype change does not write
# into a slice of the raw table.
data = (
    data[['ml_label','ml_confidence','url1','url2','url3','url4']]
    .dropna()
    .astype({'ml_confidence': float})
)

# Distinct class labels, kept for a later one-hot encoding of ml_label
# (no encoding is performed in this cell).
unique_values = data['ml_label'].unique()

# info() prints its report itself and returns None, so it is not wrapped in print().
data.info()
print(data['ml_label'].value_counts())

In [None]:
# Class whose confidence distribution is plotted. The title is built from this
# value so the filter and the caption cannot disagree.
plot_label = 'Wandering_Line'

# NOTE(review): the variable name is kept for any later cell that may use it;
# it holds the rows of `plot_label`, whatever that is set to.
scattered_light_data = data[data['ml_label'] == plot_label]

confidence = scattered_light_data['ml_confidence']
# One bin per 0.01 of observed range; at least one bin, so a class whose
# confidences are all equal (range 0) can still be drawn.
n_bins = max(1, int((confidence.max() - confidence.min()) / 0.01))

plt.figure(figsize=(10, 6))
plt.hist(confidence, bins=n_bins, edgecolor='black')
plt.title(f'Distribution of ml_confidence for {plot_label}')
plt.xlabel('ml_confidence')
plt.ylabel('count')
plt.show()

In [None]:
# Classes with at most this many rows are kept in full.
MAX_SMALL_CLASS_SIZE = 250
# Larger classes keep only rows whose ml_confidence exceeds this value.
MIN_CONFIDENCE = 0.90

grouped = data.groupby('ml_label')

# Collect the per-class pieces and concatenate once: calling pd.concat inside
# the loop copies the accumulated frame on every iteration.
selected_groups = []
for name, group in grouped:
    if len(group) <= MAX_SMALL_CLASS_SIZE:
        selected_groups.append(group)
    else:
        selected_groups.append(group[group['ml_confidence'] > MIN_CONFIDENCE])

# Pieces stay in groupby order and keep their original row index.
# pd.concat rejects an empty list, hence the fallback to an empty frame.
samples = pd.concat(selected_groups) if selected_groups else pd.DataFrame()

print(f"Total sampled rows: {len(samples)}")

In [None]:
# Move each row's original index into an 'index' column and renumber rows 0..n-1;
# the new row number is what the download cells use as the sample folder name.
# Guarded so that re-running this cell does not add another copy of the old
# index ('level_0') or raise on a third run.
if 'index' not in samples.columns:
    samples = samples.reset_index(drop=False)

In [None]:
samples_csv_path = "/Volumes/姜家大备份/GS Testing Dataset/0.samples.csv"
# to_csv does not create missing parent folders, and this folder is only
# created by a later cell, so make sure it exists before writing.
os.makedirs(os.path.dirname(samples_csv_path), exist_ok=True)
samples.to_csv(samples_csv_path)

In [None]:
# NOTE(review): folders are created under "GS Testing Dataset", while the
# download cells write under "GS Testing Data" — confirm which root is intended.
base_path = "/Volumes/姜家大备份/GS Testing Dataset/"

# One folder per class label.
folder_names = [
    "Whistle", "Tomte", "Low_Frequency_Burst", "Fast_Scattering", "Scattered_Light",
    "Low_Frequency_Lines", "Scratchy", "1080Lines", "Blip_Low_Frequency", "Power_Line",
    "Repeating_Blips", "Blip", "Extremely_Loud", "Koi_Fish", "Light_Modulation",
    "Violin_Mode", "Helix", "No_Glitch", "1400Ripples", "Chirp", "Wandering_Line",
    "Air_Compressor", "Paired_Doves", "None_of_the_Above"
]

for folder_name in folder_names:
    folder_path = os.path.join(base_path, folder_name)
    # exist_ok=True makes the call a no-op for folders that are already there,
    # without a separate existence check.
    os.makedirs(folder_path, exist_ok=True)

In [None]:
def gray_img(arr):
    """Crop an image array and return it as a rounded ``uint8`` grayscale plane.

    The crop uses ``img_cut`` with its default window and the channel weighting
    comes from ``gray_scale``, so the formula is not repeated here.

    Parameters
    ----------
    arr : numpy.ndarray
        Image array with at least three channels on its third axis.

    Returns
    -------
    numpy.ndarray
        Two-dimensional ``uint8`` array.
    """
    gray = gray_scale(img_cut(arr))
    # Round before the cast: astype alone would truncate toward zero.
    return np.round(gray).astype(np.uint8)

# Seconds to wait for a server; without a timeout one stalled request would
# block the whole loop.
REQUEST_TIMEOUT_S = 30

# (row index, url number) pairs whose download or processing failed.
error_list = []
for index, row in samples.iterrows():
    print(index)
    ml_label = row['ml_label']
    base_dir = f"/Volumes/姜家大备份/GS Testing Data/{ml_label}"
    index_dir = f"{base_dir}/{index}"

    # Creates base_dir too when it is missing; no-op when both already exist.
    os.makedirs(index_dir, exist_ok=True)

    for i, url in enumerate([row['url1'], row['url2'], row['url3'], row['url4']], start=1):
        try:
            response = requests.get(url, timeout=REQUEST_TIMEOUT_S)
            response.raise_for_status()
            with Image.open(BytesIO(response.content)) as picture:
                pixels = np.array(picture)
            np.save(f"{index_dir}/url{i}.npy", gray_img(pixels))

        # HTTPError is a subclass of RequestException, so it has to be listed
        # first or its handler can never run.
        except requests.exceptions.HTTPError as http_err:
            print(f"HTTP error occurred at Index: {index} URL: url{i} ({http_err})")
            error_list.append((index, i))
        except requests.exceptions.RequestException as e:
            print(f"Request Error at Index: {index} URL: url{i} ({e})")
            error_list.append((index, i))
        except Exception as e:
            # Decoding, cropping or saving failed; keep going and retry later.
            print(f"Error processing image at Index: {index} URL: url{i} ({e})")
            error_list.append((index, i))

### Retrying failed downloads, then removing samples that still fail

In [None]:
# Seconds to wait for a server; without a timeout one stalled request would
# block the whole loop.
REQUEST_TIMEOUT_S = 30

# (row index, url number) pairs that failed again on this retry.
second_error_list = []
for (i, j) in error_list:
    print(i, j)
    ml_label = samples.loc[i,'ml_label']
    base_dir = f"/Volumes/姜家大备份/GS Testing Data/{ml_label}"
    index_dir = f"{base_dir}/{i}"

    file_path = f"{index_dir}/url{j}.npy"
    if os.path.exists(file_path):
        print(f"File already exists")
        continue

    # Creates base_dir too when it is missing; no-op when both already exist.
    os.makedirs(index_dir, exist_ok=True)

    url = samples.loc[i,f'url{j}']
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT_S)
        response.raise_for_status()
        with Image.open(BytesIO(response.content)) as picture:
            pixels = np.array(picture)
        np.save(file_path, gray_img(pixels))

    # HTTPError is a subclass of RequestException, so it has to be listed
    # first or its handler can never run.
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred at Index: {i} URL: url{j} ({http_err})")
        second_error_list.append((i, j))
    except requests.exceptions.RequestException as e:
        print(f"Request Error at Index: {i} URL: url{j} ({e})")
        second_error_list.append((i, j))
    except Exception as e:
        # Decoding, cropping or saving failed.
        print(f"Error processing image at Index: {i} URL: url{j} ({e})")
        second_error_list.append((i, j))


In [None]:
import shutil

# Remove the whole sample folder for every download that failed twice.
# Entries that could not be removed are collected as (index, url number).
deletion_error_list = []
for i, j in second_error_list:
    ml_label = samples.loc[i, 'ml_label']
    base_dir = f"/Volumes/姜家大备份/GS Testing Data/{ml_label}"
    index_dir = f"{base_dir}/{i}"

    # Nothing to delete, e.g. the folder was removed for an earlier entry
    # with the same index.
    if not os.path.exists(index_dir):
        print(f"Directory does not exist: {index_dir}")
        continue

    try:
        shutil.rmtree(index_dir)
    except Exception as e:
        deletion_error_list.append((i,j))
        print(f"Error deleting directory {index_dir}: {e}")
    else:
        print(f"Deleted directory: {index_dir}")

### Checking that all downloaded data is complete and well-formed

In [None]:
def process_csv_files(Class_Folder, max_index=6000):
    """Re-save the ``.npy`` arrays of one class folder as rounded ``uint8``.

    Despite its name, this function handles ``.npy`` files, not CSV files.

    Parameters
    ----------
    Class_Folder : str
        Folder whose immediate sub-folders are named after sample indices.
    max_index : int, optional
        Sub-folders whose numeric name is ``>= max_index`` are left untouched.

    Notes
    -----
    * Only immediate sub-folders are treated as index folders; files are still
      collected recursively inside each of them.
    * Sub-folders whose name is not an integer are reported and skipped.
    * Hidden files and files without the ``.npy`` suffix (e.g. ``.DS_Store``)
      are ignored instead of being passed to ``np.load``.
    * Arrays that are already ``uint8`` are not rewritten.
    """
    # First os.walk entry only; a missing folder yields no entry at all.
    Class_Root, Index_Level_Folders, _ = next(os.walk(Class_Folder), (Class_Folder, [], []))
    for index in Index_Level_Folders:
        print(index)
        try:
            index_number = int(index)
        except ValueError:
            print(f"Skipping non-numeric folder: {index}")
            continue
        if index_number >= max_index:
            continue

        Index_Folder = os.path.join(Class_Root, index)
        for Index_Root, folders, NPY_Files in os.walk(Index_Folder):
            for file in NPY_Files:
                if file.startswith('.') or not file.endswith('.npy'):
                    continue
                file_path = os.path.join(Index_Root, file)
                arr = np.load(file_path)
                if arr.dtype == np.uint8:
                    # Rounding and casting would reproduce the same bytes.
                    continue
                np.save(file_path, np.round(arr).astype(np.uint8))

def traverse_and_process(GS_Selected_Data):
    """Run ``process_csv_files`` on each class folder directly under the root.

    Only the immediate sub-folders of ``GS_Selected_Data`` are visited. A full
    recursive walk would also hand every index folder to ``process_csv_files``
    as if it were a class folder, which is redundant and floods the output.

    Parameters
    ----------
    GS_Selected_Data : str
        Root folder containing one sub-folder per class.
    """
    # First os.walk entry only; a missing root yields no entry at all.
    _, Class_Level_Folders, _ = next(os.walk(GS_Selected_Data), (GS_Selected_Data, [], []))
    for Class_Folder in sorted(Class_Level_Folders):
        print(Class_Folder)
        process_csv_files(os.path.join(GS_Selected_Data, Class_Folder))

# Re-save the stored arrays under the selected-data root; process_csv_files
# decides which index folders are touched.
base_dir = "/Volumes/姜家大备份/GS Selected Data"
traverse_and_process(base_dir)

In [None]:
# Check that every sample folder contains exactly the four expected files,
# correctly named.
base_dir = '/Volumes/姜家大备份/GS Selected Data'
expected_files = {"url1.npy", "url2.npy", "url3.npy", "url4.npy"}
problem_indices = []

# Only the first os.walk entry (the class folders directly under base_dir) is
# used; walking deeper would list every index folder again as if it were a class.
dataroot, class_folders, _ = next(os.walk(base_dir), (base_dir, [], []))
for class_ in class_folders:  # every class folder
    class_dir = os.path.join(dataroot, class_)
    for index in os.listdir(class_dir):  # entries of one class (sample indices)
        item_dir = os.path.join(class_dir, index)
        if not os.path.isdir(item_dir):  # only folders are samples
            continue
        urls = os.listdir(item_dir)
        # Equal sets <=> exactly four entries and all expected names present.
        if set(urls) != expected_files:
            problem_indices.append(index)
            print(index)

problem_indices = [int(item) for item in problem_indices]
print(problem_indices)

In [None]:
# First delete every folder listed in problem_indices; afterwards re-run the
# download step above for those indices.
import shutil

# Indices whose folder could not be removed.
deletion_error_list = []
for i in problem_indices:
    # NOTE(review): `selected_data` is not defined in the cells visible here —
    # confirm it is the table whose row index matches the folder names.
    ml_label = selected_data.loc[i, 'ml_label']
    base_dir = f"/Volumes/姜家大备份/GS Selected Data/{ml_label}"
    index_dir = f"{base_dir}/{i}"

    if os.path.exists(index_dir):
        try:
            shutil.rmtree(index_dir)
            print(f"Deleted directory: {index_dir}")
        except Exception as e:
            # Record the index only: this loop has no `j`, so (i, j) would raise
            # NameError on a fresh kernel or reuse a stale value from another cell.
            deletion_error_list.append(i)
            print(f"Error deleting directory {index_dir}: {e}")
    else:
        print(f"Directory does not exist: {index_dir}")