/
fetcher.py
151 lines (123 loc) · 5.67 KB
/
fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from kaggle_data.downloader import KaggleDataDownloader
class DatasetFetcher:
    def __init__(self):
        """
        A tool used to automatically download, check, split and get
        relevant information on the Carvana dataset.
        """
        # Paths to the extracted dataset folders (filled in by download_dataset)
        self.train_data = None
        self.test_data = None
        self.train_masks_data = None
        # Sorted filename listings of those folders (filled in by download_dataset)
        self.train_files = None
        self.test_files = None
        self.train_masks_files = None

    def download_dataset(self, hq_files=True):
        """
        Downloads (if needed) and extracts the dataset, then records the input paths.

        Args:
            hq_files (bool): Whether to download the high-quality ("_hq") files or not.

        Returns:
            list: [train_data, test_data, metadata_csv, train_masks_csv, train_masks_data]
        """
        competition_name = "carvana-image-masking-challenge"
        script_dir = os.path.dirname(os.path.abspath(__file__))
        destination_path = os.path.join(script_dir, '../../input/')
        prefix = "_hq" if hq_files else ""
        files = ["train" + prefix + ".zip", "test" + prefix + ".zip", "metadata.csv.zip",
                 "train_masks.csv.zip", "train_masks.zip"]
        datasets_path = [destination_path + "train" + prefix, destination_path + "test" + prefix,
                         destination_path + "metadata.csv", destination_path + "train_masks.csv",
                         destination_path + "train_masks"]

        # If the folders already exist then the files may already be extracted.
        # This is a bit hacky but it's sufficient for our needs.
        is_datasets_present = all(os.path.exists(dir_path) for dir_path in datasets_path)

        if not is_datasets_present:
            # Put your Kaggle user name and password in $KAGGLE_USER and $KAGGLE_PASSWD env vars respectively
            downloader = KaggleDataDownloader(os.getenv("KAGGLE_USER"), os.getenv("KAGGLE_PASSWD"), competition_name)
            for file_name in files:
                output_path = downloader.download_dataset(file_name, destination_path)
                downloader.decompress(output_path, destination_path)
                # Free disk space: drop the archive once it has been extracted
                os.remove(output_path)
        else:
            print("All datasets are present.")

        self.train_data = datasets_path[0]
        self.test_data = datasets_path[1]
        self.train_masks_data = datasets_path[4]
        # Sorted so the listings are deterministic across runs/filesystems
        self.train_files = sorted(os.listdir(self.train_data))
        self.test_files = sorted(os.listdir(self.test_data))
        self.train_masks_files = sorted(os.listdir(self.train_masks_data))
        return datasets_path

    def get_car_image_files(self, car_image_id, test_file=False, get_mask=False):
        """
        Resolves a car image id to the full path of its image (or mask) file.

        Args:
            car_image_id (str): Image id without extension, e.g. "0cdf5b5d0ce1_01".
            test_file (bool): Look in the test set instead of the train set.
            get_mask (bool): Return the mask file for this id instead of the image.

        Returns:
            str: Path to the requested file.

        Raises:
            Exception: If no file with this id exists in the relevant listing.
        """
        if get_mask:
            # Masks may be stored either as GIFs ("<id>_mask.gif") or PNGs ("<id>.png")
            if car_image_id + "_mask.gif" in self.train_masks_files:
                return self.train_masks_data + "/" + car_image_id + "_mask.gif"
            if car_image_id + ".png" in self.train_masks_files:
                return self.train_masks_data + "/" + car_image_id + ".png"
            raise Exception("No mask with this ID found")
        if test_file:
            if car_image_id + ".jpg" in self.test_files:
                return self.test_data + "/" + car_image_id + ".jpg"
        else:
            if car_image_id + ".jpg" in self.train_files:
                return self.train_data + "/" + car_image_id + ".jpg"
        # Reached when the id is absent from the chosen listing (train or test)
        raise Exception("No image with this ID found")

    def get_image_matrix(self, image_path):
        """
        Loads an image file into a uint8 numpy array.

        Args:
            image_path (str): Path to the image file.

        Returns:
            np.ndarray: Pixel data as uint8.
        """
        # Context manager ensures the underlying file handle is closed;
        # np.asarray forces the pixel data to be read before the file closes.
        with Image.open(image_path) as img:
            return np.asarray(img, dtype=np.uint8)

    def get_image_size(self, image):
        """
        Returns the (width, height) of an image file without keeping it open.

        Args:
            image (str): Path to the image file.

        Returns:
            tuple: (width, height) in pixels.
        """
        with Image.open(image) as img:
            return img.size

    def get_train_files(self, validation_size=0.2, sample_size=None):
        """
        Splits the training images (and their masks) into train/validation sets.

        Args:
            validation_size (float):
                Value between 0 and 1; fraction of the data held out for validation.
                Pass 0/None to skip the split.
            sample_size (float, None):
                Value between 0 and 1 or None.
                Whether you want to have a sample of your dataset.

        Returns:
            list :
                Returns the dataset in the form:
                [train_data, train_masks_data, valid_data, valid_masks_data]
        """
        train_ids = [img.split(".")[0] for img in self.train_files]
        # Each id has 16 images but well...
        if sample_size:
            # replace=False: sample WITHOUT replacement so no image is picked twice
            train_ids = np.random.choice(train_ids, int(len(train_ids) * sample_size),
                                         replace=False).ravel()

        if validation_size:
            ids_train_split, ids_valid_split = train_test_split(train_ids, test_size=validation_size)
        else:
            ids_train_split = train_ids
            ids_valid_split = []

        train_ret = [self.get_car_image_files(img_id) for img_id in ids_train_split]
        train_masks_ret = [self.get_car_image_files(img_id, get_mask=True) for img_id in ids_train_split]
        valid_ret = [self.get_car_image_files(img_id) for img_id in ids_valid_split]
        valid_masks_ret = [self.get_car_image_files(img_id, get_mask=True) for img_id in ids_valid_split]

        return [np.array(train_ret).ravel(), np.array(train_masks_ret).ravel(),
                np.array(valid_ret).ravel(), np.array(valid_masks_ret).ravel()]

    def get_test_files(self, sample_size=None):
        """
        Returns the full paths of the test images, optionally subsampled.

        Args:
            sample_size (float, None):
                Value between 0 and 1 or None; fraction of the test set to sample.

        Returns:
            np.ndarray: Array of test image paths.
        """
        test_files = self.test_files
        if sample_size:
            # replace=False: sample WITHOUT replacement so no image is picked twice
            test_files = np.random.choice(self.test_files, int(len(self.test_files) * sample_size),
                                          replace=False).ravel()
        return np.array([self.test_data + "/" + file_name for file_name in test_files])