"""
This module implements all the functions to read a video or a picture
using ffmpeg. It is quite ugly, as there are many pitfalls to avoid.
"""
from __future__ import division

import logging
import os
import re
import subprocess as sp
import warnings

import numpy as np

from moviepy.compat import PY3, DEVNULL
from moviepy.config import get_setting  # ffmpeg, ffmpeg.exe, etc...
from moviepy.tools import cvsecs
class FFMPEG_VideoReader:
    """Read raw video frames from a file through an ffmpeg subprocess.

    ffmpeg decodes ``filename`` and writes raw frames on its stdout; this
    class reads them from the pipe, one frame at a time, as numpy arrays
    of shape ``(height, width, depth)`` and dtype ``uint8``.

    Parameters
    ----------
    filename
      Path of the video (or image) file to read.
    print_infos
      Forwarded to ``ffmpeg_parse_infos``.
    bufsize
      Buffer size of the pipe. Defaults to the size of one frame plus 100
      bytes.
    pix_fmt
      Pixel format requested from ffmpeg. ``'rgba'`` gives 4 channels per
      pixel, anything else is treated as 3 channels.
    check_duration
      Forwarded to ``ffmpeg_parse_infos``.
    target_resolution
      Optional ``(height, width)`` to rescale the frames to. If one of the
      two is ``None`` it is computed so that the aspect ratio is preserved.
    resize_algo
      Value passed to ffmpeg's ``-sws_flags`` option.
    fps_source
      Forwarded to ``ffmpeg_parse_infos`` (``'tbr'`` or ``'fps'``).
    """

    def __init__(self, filename, print_infos=False, bufsize=None,
                 pix_fmt="rgb24", check_duration=True,
                 target_resolution=None, resize_algo='bicubic',
                 fps_source='tbr'):

        self.filename = filename
        # Set before anything can fail so that close() is always safe.
        self.proc = None
        infos = ffmpeg_parse_infos(filename, print_infos, check_duration,
                                   fps_source)
        self.fps = infos['video_fps']
        self.size = infos['video_size']
        self.rotation = infos['video_rotation']

        if target_resolution:
            # revert the order, as ffmpeg used (width, height)
            target_resolution = target_resolution[1], target_resolution[0]

            if None in target_resolution:
                # Only one dimension was given: scale the other one by the
                # same ratio to keep the aspect ratio.
                ratio = 1
                for idx, target in enumerate(target_resolution):
                    if target:
                        ratio = target / self.size[idx]
                self.size = (int(self.size[0] * ratio),
                             int(self.size[1] * ratio))
            else:
                self.size = target_resolution
        self.resize_algo = resize_algo

        self.duration = infos['video_duration']
        self.ffmpeg_duration = infos['duration']
        self.nframes = infos['video_nframes']

        self.infos = infos

        self.pix_fmt = pix_fmt
        # Number of bytes per pixel in the raw stream.
        self.depth = 4 if pix_fmt == 'rgba' else 3

        if bufsize is None:
            w, h = self.size
            bufsize = self.depth * w * h + 100

        self.bufsize = bufsize
        self.initialize()

        # Frame positions are 1-based: `pos` is the index of `lastread`.
        self.pos = 1
        self.lastread = self.read_frame()

    def initialize(self, starttime=0):
        """Opens the file, creates the pipe.

        Any previously opened ffmpeg process is closed first. When
        ``starttime`` is non-zero, ffmpeg is asked to seek to
        ``starttime`` seconds.
        """
        self.close()  # if any

        if starttime != 0:
            # Seek in two steps: a '-ss' before '-i' up to (at most) one
            # second before the target, then a '-ss' after '-i' for the
            # remaining offset.
            offset = min(1, starttime)
            i_arg = ['-ss', "%.06f" % (starttime - offset),
                     '-i', self.filename,
                     '-ss', "%.06f" % offset]
        else:
            i_arg = ['-i', self.filename]

        cmd = ([get_setting("FFMPEG_BINARY")] + i_arg +
               ['-loglevel', 'error',
                '-f', 'image2pipe',
                '-vf', 'scale=%d:%d' % tuple(self.size),
                '-sws_flags', self.resize_algo,
                "-pix_fmt", self.pix_fmt,
                '-vcodec', 'rawvideo', '-'])
        popen_params = {"bufsize": self.bufsize,
                        "stdout": sp.PIPE,
                        "stderr": sp.PIPE,
                        "stdin": DEVNULL}

        if os.name == "nt":
            # CREATE_NO_WINDOW: do not pop up a console window on Windows.
            popen_params["creationflags"] = 0x08000000

        self.proc = sp.Popen(cmd, **popen_params)

    def skip_frames(self, n=1):
        """Reads and throws away n frames """
        w, h = self.size
        frame_bytes = self.depth * w * h
        for _ in range(n):
            self.proc.stdout.read(frame_bytes)
        self.pos += n

    def read_frame(self):
        """Read the next frame from the pipe and return it as an array.

        If fewer bytes than one full frame could be read, a warning is
        issued and the last valid frame is returned instead. If there is
        no previous frame either, an ``IOError`` is raised.
        """
        w, h = self.size
        nbytes = self.depth * w * h

        s = self.proc.stdout.read(nbytes)
        if len(s) != nbytes:
            # The duration is None when the reader was created with
            # check_duration=False; '%.02f' cannot format None.
            duration = self.duration
            if duration is None:
                duration = float('nan')

            warnings.warn("Warning: in file %s, " % (self.filename) +
                          "%d bytes wanted but %d bytes read," % (nbytes, len(s)) +
                          "at frame %d/%d, at time %.02f/%.02f sec. " % (
                              self.pos, self.nframes,
                              1.0 * self.pos / self.fps,
                              duration) +
                          "Using the last valid frame instead.",
                          UserWarning)

            if not hasattr(self, 'lastread'):
                raise IOError(("MoviePy error: failed to read the first frame of "
                               "video file %s. That might mean that the file is "
                               "corrupted. That may also mean that you are using "
                               "a deprecated version of FFMPEG. On Ubuntu/Debian "
                               "for instance the version in the repos is deprecated. "
                               "Please update to a recent version from the website.") % (
                                   self.filename))

            result = self.lastread

        else:
            # np.fromstring is deprecated for binary data. frombuffer gives
            # a read-only view on the bytes, so copy it to keep returning a
            # writable array as before.
            result = np.frombuffer(s, dtype='uint8').copy()
            result.shape = (h, w, len(s) // (w * h))
            self.lastread = result

        return result

    def get_frame(self, t):
        """ Read a file video frame at time t.

        Note for coders: getting an arbitrary frame in the video with
        ffmpeg can be painfully slow if some decoding has to be done.
        This function tries to avoid fetching arbitrary frames
        whenever possible, by moving between adjacent frames.
        """
        # these definitely need to be rechecked sometime. Seems to work.

        # I use that horrible '+0.00001' hack because sometimes due to numerical
        # imprecisions a 3.0 can become a 2.99999999... which makes the int()
        # go to the previous integer. This makes the fetching more robust in the
        # case where you get the nth frame by writing get_frame(n/fps).
        pos = int(self.fps * t + 0.00001) + 1

        # Initialize proc if it is not open
        if not self.proc:
            self.initialize(t)
            self.pos = pos
            self.lastread = self.read_frame()

        if pos == self.pos:
            return self.lastread
        elif (pos < self.pos) or (pos > self.pos + 100):
            # Going backwards or jumping far ahead: reopen ffmpeg at t
            # rather than decoding every frame in between.
            self.initialize(t)
            self.pos = pos
        else:
            # Close enough: just discard the frames in between.
            self.skip_frames(pos - self.pos - 1)
        result = self.read_frame()
        self.pos = pos
        return result

    def close(self):
        """Terminate the ffmpeg process (if any) and drop the last frame."""
        proc = getattr(self, 'proc', None)
        if proc:
            proc.terminate()
            proc.stdout.close()
            proc.stderr.close()
            proc.wait()
            self.proc = None
        if hasattr(self, 'lastread'):
            del self.lastread

    def __del__(self):
        # Make sure the ffmpeg subprocess does not outlive the reader.
        self.close()
def ffmpeg_read_image(filename, with_mask=True):
    """ Read an image file (PNG, BMP, JPEG...).

    Wraps FFMPEG_VideoReader to read just one image.
    Returns the first frame read by the reader (its ``lastread``
    attribute).

    This function is not meant to be used directly in MoviePy,
    use ImageClip instead to make clips out of image files.

    Parameters
    -----------

    filename
      Name of the image file. Can be of any format supported by ffmpeg.

    with_mask
      If true, the image is read with the ``'rgba'`` pixel format, so that
      a transparency layer (if any) is part of the returned frame.
      Otherwise the ``"rgb24"`` pixel format is used.
    """
    pix_fmt = 'rgba' if with_mask else "rgb24"
    reader = FFMPEG_VideoReader(filename, pix_fmt=pix_fmt, check_duration=False)
    im = reader.lastread
    # Release the reader explicitly instead of relying on garbage
    # collection; `im` keeps its own reference to the frame.
    reader.close()
    return im
def ffmpeg_parse_infos(filename, print_infos=False, check_duration=True,
                       fps_source='tbr'):
    """Get file infos using ffmpeg.

    Runs ``ffmpeg -i filename`` and parses the text that ffmpeg writes on
    its standard error.

    Returns a dictionary with the fields "duration", "video_found" and
    "audio_found". When a video stream is found it also contains
    "video_size", "video_fps", "video_nframes", "video_duration" and
    "video_rotation". When an audio stream is found it also contains
    "audio_fps" (an int, or the string 'unknown').

    Parameters
    ----------
    filename
      Path of the file to inspect.
    print_infos
      If true, print the whole text returned by ffmpeg.
    check_duration
      If true, read the duration from ffmpeg's output. If false,
      "duration" and "video_duration" are None and "video_nframes" is 1.
    fps_source
      'tbr' to trust the 'tbr' value first and fall back on 'fps', or
      'fps' for the opposite order.

    Raises
    ------
    IOError
      If the file is not found, or if the duration, the video dimensions
      or the video rotation cannot be read.
    ValueError
      If a video stream is found and ``fps_source`` is neither 'tbr' nor
      'fps'.
    """
    # open the file in a pipe, provoke an error, read output
    is_GIF = filename.endswith('.gif')
    cmd = [get_setting("FFMPEG_BINARY"), "-i", filename]
    if is_GIF:
        cmd += ["-f", "null", "/dev/null"]

    popen_params = {"bufsize": 10**5,
                    "stdout": sp.PIPE,
                    "stderr": sp.PIPE,
                    "stdin": DEVNULL}

    if os.name == "nt":
        # CREATE_NO_WINDOW: do not pop up a console window on Windows.
        popen_params["creationflags"] = 0x08000000

    proc = sp.Popen(cmd, **popen_params)
    # communicate() drains both pipes and waits for ffmpeg to exit, so the
    # process can neither block on a full pipe nor be left behind.
    _, error = proc.communicate()
    # Metadata is not guaranteed to be valid UTF-8; drop undecodable bytes
    # rather than failing on them.
    infos = error.decode('utf8', 'ignore')
    del proc

    if print_infos:
        # print the whole info text returned by FFMPEG
        print(infos)

    lines = infos.splitlines()
    if lines and "No such file or directory" in lines[-1]:
        raise IOError(("MoviePy error: the file %s could not be found!\n"
                       "Please check that you entered the correct "
                       "path.") % filename)

    result = dict()

    # get duration (in seconds)
    result['duration'] = None

    if check_duration:
        try:
            keyword = ('frame=' if is_GIF else 'Duration: ')
            # for large GIFS the "full" duration is presented as the last element in the list.
            index = -1 if is_GIF else 0
            line = [ln for ln in lines if keyword in ln][index]
            match = re.findall("([0-9][0-9]:[0-9][0-9]:[0-9][0-9].[0-9][0-9])", line)[0]
            result['duration'] = cvsecs(match)
        except Exception:
            raise IOError(("MoviePy error: failed to read the duration of file %s.\n"
                           "Here are the file infos returned by ffmpeg:\n\n%s") % (
                               filename, infos))

    # get the output line that speaks about video
    lines_video = [ln for ln in lines
                   if ' Video: ' in ln and re.search(r'\d+x\d+', ln)]

    result['video_found'] = (lines_video != [])

    if result['video_found']:
        try:
            line = lines_video[0]

            # get the size, of the form 460x320 (w x h)
            match = re.search(" [0-9]*x[0-9]*(,| )", line)
            s = list(map(int, line[match.start():match.end() - 1].split('x')))
            result['video_size'] = s
        except Exception:
            raise IOError(("MoviePy error: failed to read video dimensions in file %s.\n"
                           "Here are the file infos returned by ffmpeg:\n\n%s") % (
                               filename, infos))

        # Get the frame rate. Sometimes it's 'tbr', sometimes 'fps', sometimes
        # tbc, and sometimes tbc/2...
        # Current policy: Trust tbr first, then fps unless fps_source is
        # specified as 'fps' in which case try fps then tbr

        # If result is near from x*1000/1001 where x is 23,24,25,30,50,
        # replace by x*1000/1001 (very common case for the fps).

        def get_tbr():
            # The '.' in the pattern is deliberately unescaped: it also has
            # to match the 'k' of values such as '12k'.
            match = re.search("( [0-9]*.| )[0-9]* tbr", line)

            # Sometimes comes as e.g. 12k. We need to replace that with 12000.
            s_tbr = line[match.start():match.end()].split(' ')[1]
            if "k" in s_tbr:
                return float(s_tbr.replace("k", "")) * 1000
            return float(s_tbr)

        def get_fps():
            match = re.search("( [0-9]*.| )[0-9]* fps", line)
            return float(line[match.start():match.end()].split(' ')[1])

        if fps_source == 'tbr':
            first_choice, fallback = get_tbr, get_fps
        elif fps_source == 'fps':
            first_choice, fallback = get_fps, get_tbr
        else:
            raise ValueError("fps_source must be 'tbr' or 'fps', got %r"
                             % (fps_source,))

        try:
            result['video_fps'] = first_choice()
        except Exception:
            result['video_fps'] = fallback()

        # It is known that a fps of 24 is often written as 24000/1001
        # but then ffmpeg nicely rounds it to 23.98, which we hate.
        coef = 1000.0 / 1001.0
        fps = result['video_fps']
        for x in [23, 24, 25, 30, 50]:
            if (fps != x) and abs(fps - x * coef) < .01:
                result['video_fps'] = x * coef

        if check_duration:
            result['video_nframes'] = int(result['duration'] * result['video_fps']) + 1
            result['video_duration'] = result['duration']
        else:
            result['video_nframes'] = 1
            result['video_duration'] = None
        # We could have also recomputed the duration from the number
        # of frames, as follows:
        # >>> result['video_duration'] = result['video_nframes'] / result['video_fps']

        # get the video rotation info.
        try:
            # Match 'rotate' followed by any amount of whitespace and a
            # colon, on a line that ends with the rotation value.
            rotation_lines = [ln for ln in lines
                              if re.search(r'rotate\s*:', ln)
                              and re.search(r'\d+$', ln)]
            if rotation_lines:
                rotation_line = rotation_lines[0]
                match = re.search(r'\d+$', rotation_line)
                result['video_rotation'] = int(rotation_line[match.start():match.end()])
            else:
                result['video_rotation'] = 0
        except Exception:
            raise IOError(("MoviePy error: failed to read video rotation in file %s.\n"
                           "Here are the file infos returned by ffmpeg:\n\n%s") % (
                               filename, infos))

    lines_audio = [ln for ln in lines if ' Audio: ' in ln]

    result['audio_found'] = lines_audio != []

    if result['audio_found']:
        line = lines_audio[0]
        try:
            match = re.search(" [0-9]* Hz", line)
            # Strip the leading space and the trailing ' Hz'.
            hz_string = line[match.start() + 1:match.end() - 3]
            result['audio_fps'] = int(hz_string)
        except Exception:
            result['audio_fps'] = 'unknown'

    return result