Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed automatic tags being used and tidied up code #13

Merged
merged 3 commits into from May 31, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
103 changes: 57 additions & 46 deletions cptv-download.py
Expand Up @@ -114,70 +114,65 @@ def _downloader(self, q, api, out_base):
finally:
q.task_done()

def _get_out_dir(self, filebase_name, r):
out_dir = ""
def _get_manual_tags(self, r):
if self.recording_tags:
out_dir = get_tag_directory(r["Tags"])
tags = get_manual_recording_tags(r)
else:
out_dir = get_distributed_folder(filebase_name)
return out_dir
tags = get_manual_track_tags(r)
return tags

def _get_distinct_tags(self, r):
tags = []
def _get_tags_descriptor_and_out_dir(self, r, filebase_name):
tags = self._get_manual_tags(r)
description = get_tags_descriptor(tags)
if self.recording_tags:
tags = [tag["animal"] for tag in r["Tags"]]
out_dir = description
else:
for track in r["tracks"]:
if track.get("data"):
tags.append(track["data"].get("tag"))
return tags
out_dir = get_distributed_folder(filebase_name)

return description, out_dir

def _download(self, r, api, out_base):
dt = parse(r["recordingDateTime"])
file_base = dt.strftime("%Y%m%d-%H%M%S") + "-" + r["Device"]["devicename"]

out_dir = self._get_out_dir(file_base, r)
r["Tracks"] = api.get_tracks(r["id"]).get("tracks")

tags_desc, out_dir = self._get_tags_descriptor_and_out_dir(r, file_base)
if out_dir is None:
print('No valid out directory for file "%s"' % file_base)
return
out_dir = os.path.join(out_base, out_dir)

tracks = api.get_tracks(r["id"]).get("tracks")
r["tracks"] = tracks

tags = list(self._get_distinct_tags(r))
tag = tags[0] if len(tags) == 1 else None
if self.recording_tags:
tag = out_dir
out_dir = out_base / out_dir

if tag and tag in self.ignore_tags:
print('Ignored file "%s" - tag "%s" ignored' % (file_base, tag))
if tags_desc in self.ignore_tags:
print('Ignored file "%s" - tag "%s" ignored' % (file_base, tags_desc))
self.ignored += 1
return

if tag and self.only_tags and tag not in self.only_tags:
print('Ignored file "%s" - tag "%s" is not selected' % (file_base, tag))
if self.only_tags and tags_desc not in self.only_tags:
print(f'Ignored file "{file_base}" - tag "{tags_desc}" is not selected')
self.not_selected += 1
return

fullpath = os.path.join(out_dir, file_base)
fullpath = out_dir / file_base
if self.auto_delete:
self._delete_existing(file_base, out_dir)

os.makedirs(out_dir, exist_ok=True)
print("Processing ", file_base)

if iter_to_file(fullpath + ".cptv", api.download_raw(r["id"])):
if iter_to_file(fullpath.with_suffix(".cptv"), api.download_raw(r["id"])):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

print(format_row(r) + ".cptv" + " [{}]".format(out_dir))

if self.include_mp4:
if iter_to_file(fullpath + ".mp4", api.download(r["id"])):
if iter_to_file(fullpath.with_suffix(".mp4"), api.download(r["id"])):
print(format_row(r) + ".mp4" + " [{}]".format(out_dir))

if self.include_metadata:
meta_file = fullpath.with_suffix(".txt")
r["additionalMetadata"] = ""
if not os.path.exists(fullpath + ".txt"):
json.dump(r, open(fullpath + ".txt", "w"), indent=4)
if not os.path.exists(meta_file):
json.dump(r, open(meta_file, "w"), indent=4)

def _delete_existing(self, file_base, new_dir):
for path in self.file_list.get(file_base + ".cptv", []):
Expand Down Expand Up @@ -212,30 +207,46 @@ def get_distributed_folder(name, num_folders=256, seed=31):
return "{:02x}".format(hash_code % num_folders)


def get_tag_directory(tags):
"""Determine the directory store videos in based on tags. """
if tags is None:
return "untagged"

# get a unique list of tags.
# note, tags can have event set to 'false positive' in which case we use this as the 'animal' type.
clip_tags = set()

def get_manual_recording_tags(r):
""" Gets all distinct recording based tags """
manual_tags = set()
tags = r["Tags"]
for tag in tags:
if tag["automatic"]:
continue
tag_name = (
tag["animal"] if tag["event"] != "false positive" else "false-positive"
)
clip_tags.add(tag_name)
event = tag["event"]
if event == "false positive":
tag_name = "false-positive"
else:
tag_name = tag["animal"]
manual_tags.add(tag_name)

return manual_tags


if len(clip_tags) >= 2:
def get_manual_track_tags(r):
""" Gets all distinct track based tags """
manual_tags = set()

for track in r["Tracks"]:
track_tags = track.get("TrackTags", [])
for track_tag in track_tags:
if not track_tag["automatic"]:
manual_tags.add(track_tag["what"])
return manual_tags


def get_tags_descriptor(manual_tags):
""" Returns a string describing all tags """
if not manual_tags:
return "untagged"
if len(manual_tags) >= 2:
return "multi"

if not clip_tags:
if not manual_tags:
return "untagged-by-humans"

tag = list(clip_tags)[0]
tag = list(manual_tags)[0]
tag = tag.replace("/", "-") if tag else None
return tag

Expand Down