In [120]:
# Imports
import time
import math
import requests
import random
import dateutil.parser as dateparse
import datetime

# Parameters

JWT: Get it from tournesol.app

- open website, open dev tools, get any request to Tournesol api, see Request Headers, get `Authentication="Bearer ..."` value
- DO NOT SHARE THIS TOKEN TO ANYONE. NEVER. IN ANY CONDITIONS. Even support will never need it.
- This token expires after some time of inactivity. If tool fails, try to update the token first.

LNGS: Pick all the languages of the videos to keep

In [121]:
# PARAMETERS
JWT="Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
LNGS=['fr', 'en'] # Consolidation phase will only suggest videos in these languages


# Technical

Below is some technical stuff used for the suggestion mechanism.

Play all the next cells one by one, you do not need to change anything.

In [122]:
# Functions utils
def callTournesol(path: str):
	BASE_URL='https://api.tournesol.app/'
	time.sleep(1)
	response = requests.get(BASE_URL + path, headers={
		'Authorization': JWT
	})
	return response.json()

def callTournesolMulti(path: str, args:str=None):
	URL=path + '?limit=1000' + (('&' + args) if args else '')
	rs = callTournesol(URL)
	if not 'count' in rs or not 'results' in rs:
		print('##### ERROR #####')
		print(URL)
		print(rs)
		exit(-1)
		return []

	total=rs['count']
	allRes = rs['results']
	print(f'{len(allRes)}/{total}', end=' ')

	while len(allRes) < total:
		rs = callTournesol(URL + f'&offset={len(allRes)}')
		total=rs['count']
		allRes += rs['results']
		print(f'{len(allRes)}', end=' ')
	print()

	return allRes

def get(json, default, *fields):
	for f in fields:
		if f in json:
			json = json[f]
			if not json:
				return default
		else:
			return default
	return json

def rndAB():
	return ('A', 'B') if random.random() > 0.5 else ('B', 'A')


def get_individual_score(vdata):
	arr = [s for s in get(vdata, [], 'individual_rating', 'criteria_scores') if s['criteria'] == 'largely_recommended']
	return arr[0]['score'] if arr else None


In [123]:
# Get Already Compared list (COMPARED)
def get_already_compared():
	# First call
	print('Extracting compared videos...', end=' ')
	allRes = callTournesolMulti('users/me/contributor_ratings/videos', 'order_by=-last_compared_at')

	# Exclude videos compared by less than 3 contributors & language not in LNGS
	allRes = [v for v in allRes if get(v, 0, 'collective_rating', 'n_contributors') >= 3 and get(v, LNGS[0], 'entity', 'metadata', 'language') in LNGS ]

	# Exclude videos compared 1 time or less by me
	# Exclude videos compared by less than 3 contributors or more than 10
	lowcomps = [v for v in allRes if
			     get(v, 0, 'collective_rating', 'n_contributors') < 10 
			 and get(v, 0, 'individual_rating', 'n_comparisons') > 1 
			 and get(v, 0, 'individual_rating', 'n_comparisons') < 4
	]

	return (allRes, lowcomps)

In [124]:
# Get Rate Later list (RATE_LATER)
def get_rate_later(exclude):
	# First call
	print('Extracting rate_later list...', end=' ')
	allRes = callTournesolMulti('users/me/rate_later/videos')

	exclude_ids = {get(v, '?', 'entity', 'uid') for v in exclude}
	allRes = [v for v in allRes if get(v, '?', 'entity', 'uid') not in exclude_ids]

	allRes.sort(key=lambda v:(
		# Tournesol score / contributors (0/2=0 if no contributors; score/2 if 1 contrib; score/contributors if >1 contrib), negative to sort from lowest to highest
		# => Will make videos with high number of contributors AFTER videos well noted from few contributors
		- get(v, 0, 'collective_rating', 'tournesol_score') / (get(v, None, 'collective_rating', 'n_contributors') if get(v, 0, 'collective_rating', 'n_contributors') > 1 else 2),
		# Publication date: older first
		get(v, 'unknown', 'entity', 'metadata', 'publication_date'),
	))
	return allRes


In [125]:
# Cache DistComparisonChecker
class DistComparisonChecker:
	def __init__(self):
		self.cache: dict[str,set[str]] = dict()
		self.needrefetch: set[str] = set()
		pass

	def _get_data_cached(self, vid):
		if vid in self.needrefetch or vid not in self.cache:
			print(f'Obtaining comparisons with {vid}...', end=' ')
			allRes = callTournesolMulti(f"users/me/comparisons/videos/{vid}/")
			self.cache[vid] = set()
			for g in allRes:
				vid2 = g['entity_b' if g['entity_a']['uid'] == vid else 'entity_a']['uid']
				self.cache[vid].add(vid2)
				if not vid2 in self.cache:
					self.cache[vid2] = set()
					self.needrefetch.add(vid2)
				self.cache[vid2].add(vid)
			if vid in self.needrefetch:
				self.needrefetch.remove(vid)
		return self.cache[vid]

	def check(self, vdata1, vdata2):
		vid1 = vdata1['entity']['uid']
		vid2 = vdata2['entity']['uid']
		c1 = self._get_data_cached(vid1)
		c2 = self._get_data_cached(vid2)
		return vid1 not in c2 and vid2 not in c1 and c1.isdisjoint(c2)
	
	def checkfast(self, vdata1, vdata2):
		vid1 = vdata1['entity']['uid']
		vid2 = vdata2['entity']['uid']
		c1:set[str] = self.cache.get(vid1,set())
		c2:set[str] = self.cache.get(vid2, set())
		return vid1 not in c2 and vid2 not in c1 and c1.isdisjoint(c2)

	def addAsCompared(self, vdata1, vdata2):
		vid1 = vdata1['entity']['uid']
		vid2 = vdata2['entity']['uid']
		c1 = self._get_data_cached(vid1)
		c2 = self._get_data_cached(vid2)
		c1.add(vid2)
		c2.add(vid1)

		if not vdata1.get('individual_rating', None):
			vdata1['individual_rating'] = dict()
		vdata1['individual_rating']['last_compared_at'] = datetime.datetime.utcnow().isoformat()
		vdata1['individual_rating']['n_comparisons'] = vdata1['individual_rating'].get('n_comparisons',0) + 1
		self.needrefetch.add(vid1)

		if not vdata2.get('individual_rating', None):
			vdata2['individual_rating'] = dict()
		vdata2['individual_rating']['last_compared_at'] = datetime.datetime.utcnow().isoformat()
		vdata2['individual_rating']['n_comparisons'] = vdata2['individual_rating'].get('n_comparisons',0) + 1
		self.needrefetch.add(vid2)

	def clear(self):
		self.cache.clear()

In [126]:
# Phase Consolidate
def phase_consolidate(all_compared:list, DCC: DistComparisonChecker, ignore:set[str]):
	"""
	From all compared videos, take only the ones that has:
		- been compared at least 3 time by me
		- been compared by at least 2 different contributors
		- not been compared in the last 6 days
	From these videos, get pairs such as videos in a pair has:
		- Same language
		- Exact same number of (individual) comparison made
		- No comparison in common (DCC)
	From all pairs, take the one having the smallest score, score being calculated by adding:
		+ Difference of individual score
		+ Difference of collective score
		+ Sqrt(Difference of duration)
		+ Sqrt(Difference of time between both video aired)
	
	Compare them (Do not remove them from the list)

	If none found, return None
	"""
	# Compute minmax & fast access to some data in vdata
	mins = dict()
	maxs = dict()
	vdata = []
	for v in all_compared:
		if v['entity']['uid'] in ignore:
			continue
		lng:str = get(v, None, 'entity', 'metadata', 'language')
		cmp:int = get(v, 0, 'individual_rating', 'n_comparisons')
		cnt:int = get(v, 0, 'collective_rating', 'n_contributors')
		indiv_score:float = get_individual_score(v)
		coll_score:float = get(v, 0, 'collective_rating', 'tournesol_score')
		duration:int = math.sqrt(get(v, 0, 'entity', 'metadata', 'duration'))
		aired = math.sqrt( (datetime.datetime.utcnow() - dateparse.parse(get(v, None, 'entity', 'metadata', 'publication_date'), ignoretz=True)).days )
		last_cmp = (datetime.datetime.utcnow() - dateparse.parse(get(v, None, 'individual_rating', 'last_compared_at'), ignoretz=True)).days
		
		if indiv_score is not None and cnt >= 2 and cmp >= 3 and last_cmp > 6 and lng in LNGS:
			vdata.append({
				'lng': lng,
				'cmp': cmp,
				'ind': indiv_score,
				'col': coll_score,
				'dur': duration,
				'air': aired,
				'full': v
			})
			for (key,val) in (('ind', indiv_score), ('col', coll_score), ('dur', duration), ('air', aired)):
				if not key in mins:
					mins[key] = val
					maxs[key] = val
				elif val < mins[key]:
					mins[key] = val
				elif val > maxs[key]:
					maxs[key] = val

	# Find best pair
	bestpair:tuple[any,any] = None
	while not bestpair:
		bestfitness:float = 999999
		for i1 in range(1,len(vdata)):
			v1 = vdata[i1]

			for v2 in vdata[0:i1]:
				if (v1['lng'] != v2['lng']
					or v1['cmp'] != v2['cmp']
					or (not DCC.checkfast(v1['full'], v2['full']))
				):
					continue

				# Get pair score
				fitness = (
					( (v1['ind'] - v2['ind'])/(maxs['ind']-mins['ind']) )**2 * 2
					+ ( (v1['col'] - v2['col'])/(maxs['col']-mins['col']) )**2 * 1
					+ ( (v1['dur'] - v2['dur'])/(maxs['dur']-mins['dur']) )**2 * 1
					+ ( (v1['air'] - v2['air'])/(maxs['air']-mins['air']) )**2 * 0.5
				)

				if fitness < bestfitness:
					bestfitness = fitness
					bestpair = (v1['full'], v2['full'])

		if not bestpair:
			return None

		if not DCC.check(bestpair[0], bestpair[1]):
			bestpair = None

	DCC.addAsCompared(bestpair[0], bestpair[1])
	ab = rndAB()
	return f'[*] https://tournesol.app/comparison?uid{ab[0]}=' + bestpair[0]['entity']['uid'] + f"&uid{ab[1]}=" + bestpair[1]['entity']['uid']


In [127]:
# Comparisons Generator

def phase_init(compared:list, all_compared:list, rate_later:list, DCC: DistComparisonChecker, ignore: set[str]):
	"""
	Print half comparison URL with first video from RATE_LATER (user will pick the last compared one from last session and compare it with this one)
	Then go to phase 1
	"""
	vid_new = rate_later.pop(0)
	while vid_new['entity']['uid'] in ignore:
		vid_new = rate_later.pop(0)

	vid_old = max([v for v in all_compared if v['entity']['uid'] not in ignore], key=lambda vdata:(
		# Priority to videos compared less than 3 time (under 2 comparisons => 1 || over 2 comparisons => -cmps)
		1 if get(vdata, 0, 'individual_rating', 'n_comparisons') <= 2 else -get(vdata, 0, 'individual_rating', 'n_comparisons'), 
		# Compared the more recently
		get(vdata, '0000-00-00 00:00', 'individual_rating', 'last_compared_at')
	))

	DCC.addAsCompared(vid_new, vid_old)
	# compared.append(vid_new)
	# all_compared.append(vid_new)
	ab = rndAB()
	return (vid_new, f'[+] https://tournesol.app/comparison?uid{ab[0]}=' + vid_new['entity']['uid'] + f"&uid{ab[1]}=" + vid_old['entity']['uid'])


def phase_intricate(vid_new, compared:list, DCC: DistComparisonChecker, ignore: set[str]):
	"""
	Take first from rate later
	Take first from compared

	Check for both: https://api.tournesol.app/users/me/comparisons/videos/yt:<vid>/
	There should be NO vid in common in both lists of entities.

	If ko, take next one from Compared and retry. If no more next: END
	When ok, print comparison URL and pop the one from COMPARED (if still less than 4 cmps, push it to the end of COMPARED), then go Phase 2
	"""
	ok=False
	i=-1
	while not ok:
		i += 1
		if len(compared) <= i:
			return None
		ok = (compared[i]['entity']['uid'] not in ignore) and (DCC.check(vid_new, compared[i]))

	vid_old = compared.pop(i)
	DCC.addAsCompared(vid_new, vid_old)
	# if get(vid_old, 999, 'individual_rating', 'n_comparisons') <= 4:
	# 	compared.append(vid_old)
	ab = rndAB()
	return (vid_new, f'[x] https://tournesol.app/comparison?uid{ab[0]}=' + vid_old['entity']['uid'] + f"&uid{ab[1]}=" + vid_new['entity']['uid'])


def phase_expand(vid_old, all_compared:list, compared:list, rate_later:list, DCC: DistComparisonChecker, ignore: set[str]):
	"""
	Take first 2 in RATE_LATER

	If ko, change the 2nd one with the next and retry. If no more next: END
	When ok, print comparison URL, pop the first one & push it to the end of COMPARED, then go to Phase 1
	"""
	vid_new = rate_later.pop(0)
	while vid_new['entity']['uid'] in ignore:
		vid_new = rate_later.pop(0)

	DCC.addAsCompared(vid_new, vid_old)
	# compared.append(vid_old)
	# all_compared.append(vid_new)
	ab = rndAB()
	return (vid_new, f'[+] https://tournesol.app/comparison?uid{ab[0]}=' + vid_new['entity']['uid'] + f"&uid{ab[1]}=" + vid_old['entity']['uid'])


def getComparisons(all_compared:list, low_compared: list, rate_later: list, DCC: DistComparisonChecker, ignore: set[str]):
	# Copy input lists
	low_compared = list(low_compared)
	rate_later = list(rate_later)

	# Phase 0
	while True:
		(vid, cmp) = phase_init(all_compared, low_compared, rate_later, DCC, ignore)
		yield cmp
		if vid['entity']['uid'] not in ignore:
			break

	while True:
		consolidated = phase_consolidate(all_compared, DCC, ignore)
		if consolidated:
			yield consolidated

		if get(vid, 0, 'individual_rating', 'n_comparisons') >= 2:
			(newvid, cmp) = phase_expand(vid, all_compared, low_compared, rate_later, DCC, ignore)
		else:
			(newvid, cmp) = phase_intricate(vid, low_compared, DCC, ignore)

		if not newvid:
			break
		yield cmp
		if newvid['entity']['uid'] not in ignore:
			vid = newvid

	yield 'NO MORE'

# Main part

There are 2 cells bellow:

- "INIT/RESET": To run once to initialize the tool
- "CONTINUE": To run as many time as you want, to get more comparisons links generated

If at anytime you do comparisons not suggested by the tool, plase run again INIT/RESET to synchronize the tool with your current tournesol account

In [None]:
# INIT/RESET ORDO (Replay this cell everytime any comparison other than suggested by this notebook has been made)
(ALL_COMPARED, LOW_CMPS) = get_already_compared()
RATE_LATER = get_rate_later(ALL_COMPARED)
DCC = DistComparisonChecker()
ignore:set[str] = set()
comparison = getComparisons(ALL_COMPARED, LOW_CMPS, RATE_LATER, DCC, ignore)
comparisons = []
print('Initialized !\n')

In [None]:
# IGNORE: Add here videos to ignore if a suggestion told to compare a video you don't want to compare for now. Then continue ordo (will take in into account)

# ignore.add('yt:abcdefghijk')
print('Ingored videos:', len(ignore))
print(', '.join(sorted(ignore)))

In [None]:
# CONTINUE ORDO (Replay this cell everytime to get next comparison)
# Print previous
for (i,cmp) in enumerate(comparisons[-5:], start=max(1, len(comparisons)-4)):
	print(f'{i:4d}.', cmp)

# Print new
comparisons.append(next(comparison))
print(f'{len(comparisons):4d}.', comparisons[-1])