From 7f1b60619a52bb7b3d17f8a695075a423baa509f Mon Sep 17 00:00:00 2001 From: "Esteban J. G. Gabancho" Date: Tue, 24 Apr 2018 10:11:40 -0400 Subject: [PATCH] [ci skip] Refactoring for Invenio-Videos --- cds_sorenson/api.py | 580 ++++++++++++++++++++++--------------- cds_sorenson/config.py | 6 +- cds_sorenson/error.py | 21 +- cds_sorenson/legacy_api.py | 263 +++++++++++++++++ cds_sorenson/utils.py | 121 ++++++-- 5 files changed, 739 insertions(+), 252 deletions(-) create mode 100644 cds_sorenson/legacy_api.py diff --git a/cds_sorenson/api.py b/cds_sorenson/api.py index 7867b6f..0868b70 100644 --- a/cds_sorenson/api.py +++ b/cds_sorenson/api.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of CERN Document Server. -# Copyright (C) 2016, 2017 CERN. +# Copyright (C) 2018 CERN. # # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -21,239 +21,355 @@ # In applying this license, CERN does not # waive the privileges and immunities granted to it by virtue of its status # as an Intergovernmental Organization or submit itself to any jurisdiction. +"""Invenio-Videos API implementation for use Sorenson and FFMPG.""" -"""API to use Sorenson transcoding server.""" +import logging +import os.path +import tempfile +import time +from random import randint -from __future__ import absolute_import, print_function +from flask import current_app +from invenio_db import db +from invenio_files_rest.models import FileInstance, ObjectVersion, \ + ObjectVersionTag, as_object_version + +from .legacy_api import can_be_transcoded, get_all_distinct_qualities, \ + get_encoding_status, restart_encoding, start_encoding, stop_encoding +from .utils import eos_fuse_fail_safe, filepath_for_samba, \ + generate_json_for_encoding, get_status + + +class VideoMixing(object): + """Mixing for video processing.""" + + # TODO add needed properties, i.e. 
duration + + def __init__(self, video_object_version, *args, **kwargs): + """.""" + self.video = as_object_version(video_object_version) + self._extracted_metadta = kwargs.get('extracted_metadta', {}) + + @property + def presets(self): + """Get the details of all the presets that are available for the video. + + :return: Dictionary with the details of each of the reset grouped by + preset name. The content of dictionary depends on configuration of each + video class content, your consumer app should be aware of that.. + """ + raise NotImplemented() + + @property + def extracted_metadata(self): + """Get video metadata. + + If the metadata is not cached in the object if will call + `extract_metadata`. + """ + if not self._extracted_metadta: + self._extracted_metadta = self.extract_metadata() + return self._extracted_metadta + + def extract_metadata(self, + process_output_callback=None, + attach_to_video=False, + *args, + **kwargs): + """Extract all the video metadata from the output of ffprobe. + + :param process_output_callback: function to process the ffprobe output, + takes a dictionary and returns a dictionary. + :param attach_to_video: If set to True the extracted metadata will be + attached to the video `ObjectVersion` as `ObjectVersionTag` after + running the `process_output_callback`. The `ObjectVersionTag` storage + is key value base, where the value can be stringify into the database, + keep this in mind when setting this value to true. + :return: Dictionary with the extracted metadata. + """ + raise NotImplemented() + + def create_thumbnails(self, + start=5, + end=95, + step=10, + progress_callback=None, + *args, + **kwargs): + """Create thumbnail files. + + :param start: percentage to start extracting frames. Default 5. + :param end: percentage to finish extracting frames. Default 95. + :param step: percentage between frames. Default 10. 
+ :param progress_callback: function to report progress, takes an integer + showing percentage of progress, a string with a message and a + dictionary with more information. + :return: List of `ObjectVersion` with the thumbnails. + """ + raise NotImplemented() + + def create_gif_from_frames(self): + """Create a gif file with the extracted frames. + + `create_thumbnail` needs to be run first, if there are not frames it + will raise. + """ + raise NotImplemented() + + def encode(self, desired_quality, callback_function=None, *args, **kwargs): + """Encode the video using a preset. + + :param desired_quality: Name of the quality to encode the video with. + If the preset does not apply to the current video raises. + :param callback_function: function to report progress, takes and + integer showing the percentage of the progress, a string with a message + and a dictionary with mote information. + :return: Dictionary containing the information of the encoding job. + """ + raise NotImplemented() + + @staticmethod + def stop_encoding_job(job_id, *ars, **kwargs): + """Stop encoding. + + :param job_id: ID of the job to stop. + """ + raise NotImplemented() + + @staticmethod + def get_job_status(job_id): + """Get status of a give job. + + :param job_id: ID of the job to track the status. 
+ :return: Tuple with an integer showing the percentage of + the process, a string with a message and a dictionary with more + information (perhaps `None`) + """ + raise NotImplemented() + + +class SorensonStatus(Enum): + """Sorenson status mapping.""" + + PENDING = 0 + """Not yet running.""" + + STARTED = 1 + """Running.""" + + SUCCESS = 2 + """Done.""" + + FAILURE = 3 + """Error.""" + + REVOKED = 4 + """Canceled.""" + + +class CDSSorenson(VideoMixing): + """Soreson Video implementation.""" + + @property + def _sorenson_queue(self): + """Given the size of the file decide which queue to use.""" + # TODO + return current_app.config['CDS_SORENSON_DEFAULT_QUEUE'] + + @eos_fuse_fail_safe + def extract_metadata(self, + process_output_callback=None, + attach_to_video=True, + *args, + **kwargs): + """Use FFMPG to extract all video metadata.""" + cmd = ('ffprobe -v quiet -show_format -print_format json ' + '-show_streams -select_streams v:0 {input_file}'.format(kwargs)) + self._extracted_metadata = run_ffmpeg_command(cmd).decode('utf-8') + if not self._extracted_metadata: + # TODO: perhaps we want to try a different command, i.e. avi files + raise RuntimeError('No metadata extracted for {0}'.format( + self.video)) + + if process_output_callba: + self._extracted_metadata = process_output_callback( + self._extracted_metadata) + + if attach_to_video: + for key, value in self._extracted_metadata.iteritems(): + ObjectVersionTag.create_or_update(self.video, key, value) + + return self._extracted_metadata + + @eos_fuse_fail_safe + def create_thumbnails(self, + start=5, + end=95, + step=10, + progress_callback=None, + *args, + **kwargs): + """Use FFMPEG to create thumbnail files.""" + duration = float(self.duration) + step_time = duration * step / 100 + start_time = duration * start / 100 + end_time = (duration * end / 100) + 0.01 # FIXME WDF? 
+ + number_of_thumbnails = ((end - start) / step) + 1 + + assert all([ + 0 < start_time < duration, + 0 < end_time < duration, + 0 < step_time < duration, + start_time < end_time, + (end_time - start_time) % step_time < 0.05 # FIXME WDF? + ]) + + thumbnail_name = current_app.config.get( + 'VIDEO_THUMBNAIL_NAME_TEMPLATE', 'thumbnail-{0:d}.jpg') + # Iterate over requested timestamps + objs = [] + for i, timestamp in enumerate(range(start_time, end_time, step_time)): + with tempfile.TemporaryDirectory() as output: + output_file = os.path.join(output, thumbnail_name) + # Construct ffmpeg command + # input_file will be interpolated later, hence {{input_file}} + cmd = ( + 'ffmpeg -accurate_seek -ss {timestamp} -i {{input_file}}' + ' -vframes 1 {output_file} -qscale:v 1').format( + timestamp=timestamp, output_file=output_file, **kwargs) + + # Run ffmpeg command + run_ffmpeg_command(cmd) + + # Create ObjectVersion out of the tmp file + with db.session.being_nested(), open(output_file) as f: + obj = ObjectVersion.create( + bucket=self.video.bucket, + key=thumbnail_name, + stream=f, + size=os.path.getsize(output_file)) + ObjectVersionTag.create(obj, 'master', str( + video.version_id)) + ObjectVersionTag.create(obj, 'media_type', 'image') + ObjectVersionTag.create(obj, 'context_type', 'thumbnail') + ObjectVersionTag.create(obj, 'content_type', 'jpg') + ObjectVersionTag.create(obj, 'timestamp', + start_time + (i + 1) * step_time) + obj.appen(obj) + + # Report progress + if progress_callback: + progress_callback(number_of_thumbnails / i + 1) + + return objs + + def encode(self, desired_quality, callback_function=None, *args, **kwargs): + """Enconde a video.""" + preset_info = self.presets.get(desired_quality) + assert preset_info, 'Unknown preset' + + input_file = filepath_for_samba(self.video) + + with db.session.begin_nested(): + # Create FileInstance + file_instance = FileInstance.create() + + # Create ObjectVersion + obj_key = self._build_subformat_key(preset_info) + obj = 
ObjectVersion.create(bucket=self.video.bucket, key=obj_key) + + # Extract new location + bucket_location = self.video.bucket.location + storage = file_instance.storage(default_location=bucket_location) + directory, filename = storage._get_fs() + + # XRootDPyFS doesn't implement root_path + try: + # XRootD Safe + output_file = os.path.join( + directory.root_url, directory.base_path, filename) + except AttributeError: + output_file = os.path.join(directory.root_path, filename) + + # Build the request of the encoding job + json_params = generate_json_for_encoding( + input_file, output_file, preset_info, self._sorenson_queue) + proxies = current_app.config['CDS_SORENSON_PROXIES'] + + headers = {'Accept': 'application/json'} + logging.debug('Sending job to Sorenson {0}'.format(json_params)) + response = requests.post( + current_app.config['CDS_SORENSON_SUBMIT_URL'], + headers=headers, + json=json_params, + proxies=proxies) + + if response.status_code != requests.codes.ok: + # something is wrong - sorenson server is not responding or the + # configuration is wrong and we can't contact sorenson server + # TODO: cleanup completely the FileInstance + db.session.rollback() + raise SorensonError("{0}: {1}".format(response.status_code, + response.text)) + data = json.loads(response.text) + logging.debug('Encoding Sorenson response {0}'.format(data)) + job_id = data.get('JobId') + + # Continue here until the job is done + status = SorensonStatus.PENDING + with status != SorensonStatus.SUCCESS: + status, percentage, info = CDSSorenson.get_job_status() + + if callback_function: + callback_function(status, percentage, info) + + if status == SorensonStatus.FAILURE: + # TODO: clean up completely the DB + raise RuntimeError('Error encoding: {0}'.format(info)) + elif status == SorensonStatus.REVOKED: + # TODO: clean up completely the DB + raise + # FIXME: better way to put this? 
+ time.sleep(randint(1,10)) + + # Set file's location, if job has completed + self._clean_file_name(output_file) + + # TODO + with db.session.begin_nested(): + uri = output_file + with file_opener_xrootd(uri, 'rb') as transcoded_file: + digest = hashlib.md5(transcoded_file.read()).hexdigest() + size = file_size_xrootd(uri) + checksum = '{0}:{1}'.format('md5', digest) + file_instance.set_uri(uri, size, checksum) + as_object_version( + job_info['version_id']).set_file(file_instance) + + return {'job_id': job_id, 'preset': preset_info, 'object': obj} + + @staticmethod + def get_job_status(job_id): + """.""" + pass -import json -from collections import OrderedDict -from itertools import chain -import requests -from flask import current_app +def defautl_encoding_callback(percentage, status, info): + """.""" -from .error import InvalidAspectRatioError, InvalidResolutionError, \ - SorensonError, TooHighResolutionError -from .proxies import current_cds_sorenson -from .utils import _filepath_for_samba, generate_json_for_encoding, get_status - - -def start_encoding(input_file, output_file, desired_quality, - display_aspect_ratio, max_height=None, max_width=None, - **kwargs): - """Encode a video that is already in the input folder. - - :param input_file: string with the filename, something like - /eos/cds/test/sorenson/8f/m2/728-jsod98-8s9df2-89fg-lksdjf/data where - the last part "data" is the filename and the last directory is the - bucket id. - :param output_file: the file to output the transcoded file. - :param desired_quality: desired quality to transcode to. - :param display_aspect_ratio: the video's aspect ratio - :param max_height: maximum height we want to encode - :param max_width: maximum width we want to encode - :param kwargs: other technical metadata - :returns: job ID. 
- """ - input_file = _filepath_for_samba(input_file) - output_file = _filepath_for_samba(output_file) - - aspect_ratio, preset_config = _get_quality_preset(desired_quality, - display_aspect_ratio, - video_height=max_height, - video_width=max_width) - - current_app.logger.debug( - 'Transcoding {0} to quality {1} and aspect ratio {2}'.format( - input_file, desired_quality, aspect_ratio)) - - # Build the request of the encoding job - json_params = generate_json_for_encoding(input_file, output_file, - preset_config['preset_id']) - proxies = current_app.config['CDS_SORENSON_PROXIES'] - headers = {'Accept': 'application/json'} - response = requests.post(current_app.config['CDS_SORENSON_SUBMIT_URL'], - headers=headers, json=json_params, - proxies=proxies) - - data = json.loads(response.text) - - if response.status_code == requests.codes.ok: - job_id = data.get('JobId') - return job_id, aspect_ratio, preset_config - else: - # something is wrong - sorenson server is not responding or the - # configuration is wrong and we can't contact sorenson server - raise SorensonError("{0}: {1}".format(response.status_code, - response.text)) - - -def stop_encoding(job_id): - """Stop encoding job. - - :param job_id: string with the job ID. - :returns: None. - """ - delete_url = (current_app.config['CDS_SORENSON_DELETE_URL'] - .format(job_id=job_id)) - headers = {'Accept': 'application/json'} - proxies = current_app.config['CDS_SORENSON_PROXIES'] - - response = requests.delete(delete_url, headers=headers, proxies=proxies) - if response.status_code != requests.codes.ok: - raise SorensonError("{0}: {1}".format(response.status_code, - response.text)) - - -def get_encoding_status(job_id): - """Get status of a given job from the Sorenson server. - - If the job can't be found in the current queue, it's probably done, so we - check the archival queue. - - :param job_id: string with the job ID. - :returns: tuple with the status message and progress in %. 
- """ - status = get_status(job_id) - if status == '': - # encoding job was canceled - return "Canceled", 100 - status_json = json.loads(status) - # there are different ways to get the status of a job, depending if - # the job was successful, so we should check for the status code in - # different places - job_status = status_json.get('Status', {}).get('Status') - job_progress = status_json.get('Status', {}).get('Progress') - if job_status: - return current_app.config['CDS_SORENSON_STATUSES'].get(job_status), \ - job_progress - # status not found? check in different place - job_status = status_json.get('StatusStateId') - if job_status: - # job is probably either finished or failed, so the progress will - # always be 100% in this case - return current_app.config['CDS_SORENSON_STATUSES'].get(job_status), 100 - # No status was found (which shouldn't happen) - raise SorensonError('No status found for job: {0}'.format(job_id)) - - -def restart_encoding(job_id, input_file, output_file, desired_quality, - display_aspect_ratio, **kwargs): - """Try to stop the encoding job and start a new one. - - It's impossible to get the input_file and preset_quality from the - job_id, if the job has not yet finished, so we need to specify all - parameters for stopping and starting the encoding job. - """ - try: - stop_encoding(job_id) - except SorensonError: - # If we failed to stop the encoding job, ignore it - in the worst - # case the encoding will finish and we will overwrite the file. - pass - return start_encoding(input_file, output_file, desired_quality, - display_aspect_ratio, **kwargs) - - -def _get_available_aspect_ratios(pairs=False): - """Return all available aspect ratios. 
- - :param pairs: if True, will return aspect ratios as pairs of integers - """ - ratios = [key for key in current_app.config['CDS_SORENSON_PRESETS']] - if pairs: - ratios = [tuple(map(int, ratio.split(':', 1))) for ratio in ratios] - return ratios - - -def get_all_distinct_qualities(): - """Return all distinct available qualities, independently of presets. - - :returns all the qualities without duplications. For example, if presets A - has [240p, 360p, 480p] and presets B has [240p, 480p], the result will be - [240p, 360p, 480p]. - """ - # get all possible qualities - all_qualities = [ - outer_dict.keys() - for outer_dict in current_app.config['CDS_SORENSON_PRESETS'].values() - ] - # remove duplicates while preserving ordering - all_distinct_qualities = OrderedDict.fromkeys(chain(*all_qualities)) - return list(all_distinct_qualities) - - -def can_be_transcoded(subformat_desired_quality, video_aspect_ratio, - video_width=None, video_height=None): - """Return the details of the subformat that will be generated. - - :param subformat_desired_quality: the quality desired for the subformat - :param video_aspect_ratio: the original video aspect ratio - :param video_width: the original video width - :param video_height: the original video height - :returns a dict with aspect ratio, width and height if the subformat can - be generated, or False otherwise - """ - try: - ar, conf = _get_quality_preset(subformat_desired_quality, - video_aspect_ratio, - video_height=video_height, - video_width=video_width) - return dict(quality=subformat_desired_quality, aspect_ratio=ar, - width=conf['width'], height=conf['height']) - except (InvalidAspectRatioError, InvalidResolutionError, - TooHighResolutionError) as _: - return None - - -def _get_closest_aspect_ratio(width, height): - """Return the closest configured aspect ratio to the given height/width. 
- - :param height: video height - :param width: video width - """ - # calculate the aspect ratio fraction - unknown_ar_fraction = float(width) / height - - # find the closest aspect ratio fraction to the unknown - closest_fraction = min(current_cds_sorenson.aspect_ratio_fractions.keys(), - key=lambda x: abs(x - unknown_ar_fraction)) - return current_cds_sorenson.aspect_ratio_fractions[closest_fraction] - - -def _get_quality_preset(subformat_desired_quality, video_aspect_ratio, - video_height=None, video_width=None): - """Return the transcoding config for a given aspect ratio and quality. - - :param subformat_desired_quality: the desired quality for transcoding - :param video_aspect_ratio: the video's aspect ratio - :param video_height: maximum output height for transcoded video - :param video_width: maximum output width for transcoded video - :returns the transcoding config for a given inputs - """ - try: - ar_presets = current_app.config['CDS_SORENSON_PRESETS'][ - video_aspect_ratio] - except KeyError: - if video_height and video_width: - video_aspect_ratio = _get_closest_aspect_ratio(video_height, - video_width) - ar_presets = current_app.config['CDS_SORENSON_PRESETS'][ - video_aspect_ratio] - else: - raise InvalidAspectRatioError(video_aspect_ratio) - - try: - preset_config = ar_presets[subformat_desired_quality] - except KeyError: - raise InvalidResolutionError(video_aspect_ratio, - subformat_desired_quality) - - if (video_height and video_height < preset_config['height']) or \ - (video_width and video_width < preset_config['width']): - raise TooHighResolutionError(video_aspect_ratio, video_height, - video_width, preset_config['height'], - preset_config['width']) - - return video_aspect_ratio, preset_config + +def default_extract_metadata_callback(extracted_metadata): + """.""" + + +__all__ = ( + 'CDSSorenson', + 'start_encoding', + 'get_all_distinct_qualities', + 'get_encoding_status', + 'restart_encoding', + 'start_encoding', + 'stop_encoding', + 
'can_be_transcoded', +) diff --git a/cds_sorenson/config.py b/cds_sorenson/config.py index 176b05d..541be9b 100644 --- a/cds_sorenson/config.py +++ b/cds_sorenson/config.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of CERN Document Server. -# Copyright (C) 2016, 2017 CERN. +# Copyright (C) 2016, 2017, 2018 CERN. # # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -414,6 +414,10 @@ } """Statuses returned from Sorenson.""" +CDS_SORENSON_DONE_STATUSES = () +CDS_SORENSON_ERROR_STATUSES = () +CDS_SORENSON_RUNNING_STATUSES = () + CDS_SORENSON_SAMBA_DIRECTORY = 'file://media-smb.cern.ch/mediacds/' """Sorenson's EOS internal mounting point via samba.""" diff --git a/cds_sorenson/error.py b/cds_sorenson/error.py index 931d8e0..f6d521d 100644 --- a/cds_sorenson/error.py +++ b/cds_sorenson/error.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of CERN Document Server. -# Copyright (C) 2016 CERN. +# Copyright (C) 2016, 2018 CERN. 
# # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -82,3 +82,22 @@ def __str__(self): 'maximum resolution accepted {3}x{4}.').format( self._aspect_ratio, self._width, self._height, self._max_weight, self._max_height) + + +class FFmpegExecutionError(Exception): + """Raised when there is an execution error of an FFmpeg subprocess.""" + + def __init__(self, process_error): + self.internal_error = process_error + self.cmd = ' '.join(process_error.cmd) + self.error_code = process_error.returncode + self.error_message = process_error.output.decode('utf-8') + + def __repr__(self): + return ('COMMAND: {0}\n' + 'ERROR_CODE: {1}\n' + 'OUTPUT: {2}').format(self.cmd, self.error_code, + self.error_message) + + def __str__(self): + return self.__repr__() diff --git a/cds_sorenson/legacy_api.py b/cds_sorenson/legacy_api.py new file mode 100644 index 0000000..3d70379 --- /dev/null +++ b/cds_sorenson/legacy_api.py @@ -0,0 +1,263 @@ +# -*- coding: utf-8 -*- +# +# This file is part of CERN Document Server. +# Copyright (C) 2016, 2017, 2018 CERN. +# +# Invenio is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. 
+# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. + +"""API to use Sorenson transcoding server.""" + +from __future__ import absolute_import, print_function + +import json +import warnings +from collections import OrderedDict +from itertools import chain + +import requests +from flask import current_app + +from .error import InvalidAspectRatioError, InvalidResolutionError, \ + SorensonError, TooHighResolutionError +from .proxies import current_cds_sorenson +from .utils import filepath_for_samba, generate_json_for_encoding, get_status + +warnings.warn( + "This API is deprecated. Use it at your own risk!", + DeprecationWarning) + +def start_encoding(input_file, output_file, desired_quality, + display_aspect_ratio, max_height=None, max_width=None, + **kwargs): + """Encode a video that is already in the input folder. + + :param input_file: string with the filename, something like + /eos/cds/test/sorenson/8f/m2/728-jsod98-8s9df2-89fg-lksdjf/data where + the last part "data" is the filename and the last directory is the + bucket id. + :param output_file: the file to output the transcoded file. + :param desired_quality: desired quality to transcode to. + :param display_aspect_ratio: the video's aspect ratio + :param max_height: maximum height we want to encode + :param max_width: maximum width we want to encode + :param kwargs: other technical metadata + :returns: job ID. 
+ """ + input_file = filepath_for_samba(input_file) + output_file = filepath_for_samba(output_file) + + aspect_ratio, preset_config = _get_quality_preset(desired_quality, + display_aspect_ratio, + video_height=max_height, + video_width=max_width) + + current_app.logger.debug( + 'Transcoding {0} to quality {1} and aspect ratio {2}'.format( + input_file, desired_quality, aspect_ratio)) + + # Build the request of the encoding job + json_params = generate_json_for_encoding(input_file, output_file, + preset_config['preset_id']) + proxies = current_app.config['CDS_SORENSON_PROXIES'] + headers = {'Accept': 'application/json'} + response = requests.post(current_app.config['CDS_SORENSON_SUBMIT_URL'], + headers=headers, json=json_params, + proxies=proxies) + + data = json.loads(response.text) + + if response.status_code == requests.codes.ok: + job_id = data.get('JobId') + return job_id, aspect_ratio, preset_config + else: + # something is wrong - sorenson server is not responding or the + # configuration is wrong and we can't contact sorenson server + raise SorensonError("{0}: {1}".format(response.status_code, + response.text)) + + +def stop_encoding(job_id): + """Stop encoding job. + + :param job_id: string with the job ID. + :returns: None. + """ + delete_url = (current_app.config['CDS_SORENSON_DELETE_URL'] + .format(job_id=job_id)) + headers = {'Accept': 'application/json'} + proxies = current_app.config['CDS_SORENSON_PROXIES'] + + response = requests.delete(delete_url, headers=headers, proxies=proxies) + if response.status_code != requests.codes.ok: + raise SorensonError("{0}: {1}".format(response.status_code, + response.text)) + + +def get_encoding_status(job_id): + """Get status of a given job from the Sorenson server. + + If the job can't be found in the current queue, it's probably done, so we + check the archival queue. + + :param job_id: string with the job ID. + :returns: tuple with the status message and progress in %. 
+ """ + status = get_status(job_id) + if status == '': + # encoding job was canceled + return "Canceled", 100 + status_json = json.loads(status) + # there are different ways to get the status of a job, depending if + # the job was successful, so we should check for the status code in + # different places + job_status = status_json.get('Status', {}).get('Status') + job_progress = status_json.get('Status', {}).get('Progress') + if job_status: + return current_app.config['CDS_SORENSON_STATUSES'].get(job_status), \ + job_progress + # status not found? check in different place + job_status = status_json.get('StatusStateId') + if job_status: + # job is probably either finished or failed, so the progress will + # always be 100% in this case + return current_app.config['CDS_SORENSON_STATUSES'].get(job_status), 100 + # No status was found (which shouldn't happen) + raise SorensonError('No status found for job: {0}'.format(job_id)) + + +def restart_encoding(job_id, input_file, output_file, desired_quality, + display_aspect_ratio, **kwargs): + """Try to stop the encoding job and start a new one. + + It's impossible to get the input_file and preset_quality from the + job_id, if the job has not yet finished, so we need to specify all + parameters for stopping and starting the encoding job. + """ + try: + stop_encoding(job_id) + except SorensonError: + # If we failed to stop the encoding job, ignore it - in the worst + # case the encoding will finish and we will overwrite the file. + pass + return start_encoding(input_file, output_file, desired_quality, + display_aspect_ratio, **kwargs) + + +def _get_available_aspect_ratios(pairs=False): + """Return all available aspect ratios. 
+ + :param pairs: if True, will return aspect ratios as pairs of integers + """ + ratios = [key for key in current_app.config['CDS_SORENSON_PRESETS']] + if pairs: + ratios = [tuple(map(int, ratio.split(':', 1))) for ratio in ratios] + return ratios + + +def get_all_distinct_qualities(): + """Return all distinct available qualities, independently of presets. + + :returns all the qualities without duplications. For example, if presets A + has [240p, 360p, 480p] and presets B has [240p, 480p], the result will be + [240p, 360p, 480p]. + """ + # get all possible qualities + all_qualities = [ + outer_dict.keys() + for outer_dict in current_app.config['CDS_SORENSON_PRESETS'].values() + ] + # remove duplicates while preserving ordering + all_distinct_qualities = OrderedDict.fromkeys(chain(*all_qualities)) + return list(all_distinct_qualities) + + +def can_be_transcoded(subformat_desired_quality, video_aspect_ratio, + video_width=None, video_height=None): + """Return the details of the subformat that will be generated. + + :param subformat_desired_quality: the quality desired for the subformat + :param video_aspect_ratio: the original video aspect ratio + :param video_width: the original video width + :param video_height: the original video height + :returns a dict with aspect ratio, width and height if the subformat can + be generated, or False otherwise + """ + try: + ar, conf = _get_quality_preset(subformat_desired_quality, + video_aspect_ratio, + video_height=video_height, + video_width=video_width) + return dict(quality=subformat_desired_quality, aspect_ratio=ar, + width=conf['width'], height=conf['height']) + except (InvalidAspectRatioError, InvalidResolutionError, + TooHighResolutionError) as _: + return None + + +def _get_closest_aspect_ratio(width, height): + """Return the closest configured aspect ratio to the given height/width. 
+ + :param height: video height + :param width: video width + """ + # calculate the aspect ratio fraction + unknown_ar_fraction = float(width) / height + + # find the closest aspect ratio fraction to the unknown + closest_fraction = min(current_cds_sorenson.aspect_ratio_fractions.keys(), + key=lambda x: abs(x - unknown_ar_fraction)) + return current_cds_sorenson.aspect_ratio_fractions[closest_fraction] + + +def _get_quality_preset(subformat_desired_quality, video_aspect_ratio, + video_height=None, video_width=None): + """Return the transcoding config for a given aspect ratio and quality. + + :param subformat_desired_quality: the desired quality for transcoding + :param video_aspect_ratio: the video's aspect ratio + :param video_height: maximum output height for transcoded video + :param video_width: maximum output width for transcoded video + :returns the transcoding config for a given inputs + """ + try: + ar_presets = current_app.config['CDS_SORENSON_PRESETS'][ + video_aspect_ratio] + except KeyError: + if video_height and video_width: + video_aspect_ratio = _get_closest_aspect_ratio(video_height, + video_width) + ar_presets = current_app.config['CDS_SORENSON_PRESETS'][ + video_aspect_ratio] + else: + raise InvalidAspectRatioError(video_aspect_ratio) + + try: + preset_config = ar_presets[subformat_desired_quality] + except KeyError: + raise InvalidResolutionError(video_aspect_ratio, + subformat_desired_quality) + + if (video_height and video_height < preset_config['height']) or \ + (video_width and video_width < preset_config['width']): + raise TooHighResolutionError(video_aspect_ratio, video_height, + video_width, preset_config['height'], + preset_config['width']) + + return video_aspect_ratio, preset_config diff --git a/cds_sorenson/utils.py b/cds_sorenson/utils.py index de17f84..b64c3af 100644 --- a/cds_sorenson/utils.py +++ b/cds_sorenson/utils.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of CERN Document Server. 
-# Copyright (C) 2016, 2017 CERN. +# Copyright (C) 2016, 2017, 2018 CERN. # # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -21,12 +21,19 @@ # In applying this license, CERN does not # waive the privileges and immunities granted to it by virtue of its status # as an Intergovernmental Organization or submit itself to any jurisdiction. - """API to use Sorenson transcoding server.""" from __future__ import absolute_import, print_function +import logging +import os +import os.path +import shutil +import tempfile +from contextlib import contextmanager from itertools import chain +from subprocess import STDOUT, CalledProcessError, check_output +from time import sleep import requests from flask import current_app @@ -34,7 +41,10 @@ from .error import SorensonError -def generate_json_for_encoding(input_file, output_file, preset_id): +def generate_json_for_encoding(input_file, + output_file, + preset_id, + sorenson_queue=None): """Generate JSON that will be sent to Sorenson server to start encoding.""" current_preset = _get_preset_config(preset_id) # Make sure the preset config exists for a given preset_id @@ -43,13 +53,16 @@ def generate_json_for_encoding(input_file, output_file, preset_id): return dict( Name='CDS File:{0} Preset:{1}'.format(input_file, preset_id), - QueueId=current_app.config['CDS_SORENSON_DEFAULT_QUEUE'], + QueueId=sorenson_queue + or current_app.config['CDS_SORENSON_DEFAULT_QUEUE'], JobMediaInfo=dict( - SourceMediaList=[dict( - FileUri=input_file, - UserName=current_app.config['CDS_SORENSON_USERNAME'], - Password=current_app.config['CDS_SORENSON_PASSWORD'], - )], + SourceMediaList=[ + dict( + FileUri=input_file, + UserName=current_app.config['CDS_SORENSON_USERNAME'], + Password=current_app.config['CDS_SORENSON_PASSWORD'], + ) + ], DestinationList=[dict(FileUri='{}'.format(output_file))], CompressionPresetList=[dict(PresetId=preset_id)], ), @@ -64,8 +77,8 @@ def 
name_generator(master_name, preset):
     :returns: string with the slave name for this preset.
     """
     return ("{master_name}-{video_bitrate}-kbps-{width}x{height}-audio-"
-            "{audio_bitrate}-kbps-stereo.mp4".format(master_name='master_name',
-                                                     **preset))
+            "{audio_bitrate}-kbps-stereo.mp4".format(
+                master_name='master_name', **preset))
 
 
 def get_status(job_id):
@@ -78,12 +91,12 @@ def get_status(job_id):
     :param job_id: string with the job ID.
     :returns: JSON with the status or empty string if the job was not found.
     """
-    current_jobs_url = (current_app
-                        .config['CDS_SORENSON_CURRENT_JOBS_STATUS_URL']
-                        .format(job_id=job_id))
-    archive_jobs_url = (current_app
-                        .config['CDS_SORENSON_ARCHIVE_JOBS_STATUS_URL']
-                        .format(job_id=job_id))
+    current_jobs_url = (
+        current_app.config['CDS_SORENSON_CURRENT_JOBS_STATUS_URL']
+        .format(job_id=job_id))
+    archive_jobs_url = (
+        current_app.config['CDS_SORENSON_ARCHIVE_JOBS_STATUS_URL']
+        .format(job_id=job_id))
 
     headers = {'Accept': 'application/json'}
     proxies = current_app.config['CDS_SORENSON_PROXIES']
@@ -109,7 +122,7 @@ def _get_preset_config(preset_id):
                 return inner_dict
 
 
-def _filepath_for_samba(filepath):
+def filepath_for_samba(obj):
     """Adjust file path for Samba protocol.
 
     Sorenson has the eos directory mounted through samba, so the paths
@@ -118,3 +131,99 @@ def _filepath_for_samba(filepath):
     samba_dir = current_app.config['CDS_SORENSON_SAMBA_DIRECTORY']
     eos_dir = current_app.config['CDS_SORENSON_CDS_DIRECTORY']
     return filepath.replace(eos_dir, samba_dir)
+
+
+def run_ffmpeg_command(cmd, obj, **kwargs):
+    """Run ffmpeg command and capture errors.
+
+    :param cmd: command line as one string; it is split on whitespace.
+    :param obj: not used by this function.
+    :param kwargs: forwarded to ``check_output``; ``stderr`` defaults to
+        ``STDOUT`` so the error output is captured as well.
+    :returns: the output of the command.
+    :raises FFmpegExecutionError: if the command exits with an error.
+    """
+    kwargs.setdefault('stderr', STDOUT)
+    try:
+        return check_output(cmd.split(), **kwargs)
+    except CalledProcessError as e:
+        # NOTE(review): FFmpegExecutionError is not imported in the hunks
+        # visible here -- confirm it is in scope in this module.
+        raise FFmpegExecutionError(e)
+
+
+def eos_fuse_fail_safe(f):
+    """Try to run on FUSE and if not bring the file home.
+
+    Decorator for methods of objects that expose ``self.video.file.uri``.
+
+    Assumptions:
+    - The method will use `input_file` as key in the kwargs to get the real
+    path to the file.
+
+    :param f: the method to wrap, called as ``f(self, *args, **kwargs)``.
+    :returns: the wrapped method.
+    """
+    from functools import wraps
+
+    @wraps(f)
+    def wrapper(self, *args, **kwargs):
+        if 'input_file' in kwargs:
+            # Don't care, you know what you are doing
+            return f(self, *args, **kwargs)
+
+        # Try first to use EOS FUSE
+        kwargs['input_file'] = self.video.file.uri.replace(
+            current_app.config['VIDEOS_XROOTD_ENDPOINT'], '')
+        try:
+            # sometimes FUSE is slow to update, give it a moment first
+            sleep(2)
+            return f(self, *args, **kwargs)
+        except Exception:
+            # Deliberately broad: any failure falls back to a local copy
+            logging.error(
+                '#EOS_FUSE_ERROR: file not accesible via FUSE {0}'.format(
+                    self.video))
+
+        # Surprise fuse didn't work! Copy the file to tmp
+        logging.info('Copying file to local file system')
+        temp_folder = tempfile.mkdtemp()
+        try:
+            temp_location = os.path.join(temp_folder, 'data')
+            src = _file_opener_xrootd(self.video.file.uri, 'rb')
+            try:
+                with open(temp_location, 'wb') as dst:
+                    shutil.copyfileobj(src, dst)
+            finally:
+                # always release the source handle
+                src.close()
+            kwargs['input_file'] = temp_location
+            return f(self, *args, **kwargs)
+        finally:
+            # remove the temporary copy even if copying itself failed
+            shutil.rmtree(temp_folder)
+
+    return wrapper
+
+
+def _file_opener_xrootd(path, *args, **kwargs):
+    """File opener from XRootD path.
+
+    :param path: The file path for the opener.
+    :returns: an open file
+
+    .. note::
+
+        This will return an open file via ``XRootDPyFS`` if XRootD is
+        enabled.
+    """
+    if current_app.config['XROOTD_ENABLED'] and \
+            current_app.config['VIDEOS_XROOTD_ENDPOINT'] in path:
+        from xrootdpyfs import XRootDPyFS
+        # Cut the file name off at the last slash only; replacing the name
+        # everywhere would also mangle directories that contain it
+        directory = path.rpartition('/')[0] + '/'
+        fs = XRootDPyFS(directory)
+        return fs.open('data', *args, **kwargs)
+    # No XrootD return a normal file
+    return open(path, *args, **kwargs)