Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

executable file 527 lines (375 sloc) 14.835 kb
#!/usr/bin/env python2
# Clipodder, a small "cron-able" utility to download video/audio podcasts
#
# Copyright (C) 2011 Afterburn, cdepillabout
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import urllib2
import ConfigParser
import json
import urllib
import pycurl
from feedparser import parse
class auto_podder():
    """
    Download the newest enclosures of every podcast feed listed in the
    [urls] section of the clipodder config file (~/.clipodder/config).

    Constructing an instance does all of the work: the config is validated
    and loaded, a curl handle is created and every configured feed is
    processed in turn.
    """

    # Feed-line arguments starting with one of these prefixes are per-feed
    # options; every other argument after the URL names a download type.
    OPTION_PREFIXES = ("path=", "name=", "delete_if_more_than=")

    def __init__(self):
        # debug_msg() reads this flag; it defaults to off.
        self.debug = False
        self.config_dict = {}
        self.config_dict["default_config_dir"] = os.path.expanduser("~/.clipodder")
        self.config_dict["default_config"] = "%s/%s" % (self.config_dict["default_config_dir"], "config")
        self.config = ConfigParser.ConfigParser()
        self.config_dict["show_progress"] = False
        self.config_dict["show_full_paths"] = True
        # color() reads this key; load_conf() overwrites it from the config.
        self.config_dict["use_colors"] = False
        self.curl = None
        self.spacer = " "
        if self.validate():
            self.load_conf()
            self.init_curl()
            for item in self.config.items("urls"):
                # Split it up, so we can specify more arguments.
                # split() without a separator drops the empty strings that
                # repeated spaces would otherwise produce.
                feed_args = str(item[1]).split()
                if not feed_args:
                    continue
                self.deal_with_url(feed_args[0], feed_args)

    def debug_msg(self, msg):
        """Print msg surrounded by blank lines when self.debug is set."""
        if getattr(self, "debug", False):
            print("\n\n %s \n\n" % (msg))

    def load_conf(self):
        """
        Read the config file into self.config and copy the [options]
        values this class uses into self.config_dict.
        """
        with open(self.config_dict["default_config"]) as config_file:
            self.config.readfp(config_file)
        self.config_dict["download_dir"] = self.config.get("options", "download_dir")
        self.config_dict["downloads_per_url"] = self.config.getint("options", "downloads_per_url")
        # delete_if_more_than can only be an int. if we don't want anything to be deleted,
        # then this should be set to 0 (a missing option is treated the same way).
        try:
            keep = self.config.getint("options", "delete_if_more_than")
        except (ValueError, ConfigParser.NoOptionError):
            keep = 0
        self.config_dict["delete_if_more_than"] = max(keep, 0)
        self.config_dict["use_colors"] = self.config.getboolean("options", "use_colors")
        # don't use colors if we are not printing to the console
        if not sys.stdout.isatty():
            self.config_dict["use_colors"] = False
        if not self.validate(self.config_dict["download_dir"], True):
            print("WARNING! I could not create the directory: %s" % (self.config_dict["download_dir"]))
        # Print a big warning if downloads_per_url is more than delete_if_more_than.
        # If downloads_per_url is larger, we will constantly download excess files
        # and then promptly delete them.
        keep = self.config_dict["delete_if_more_than"]
        if keep != 0 and self.config_dict["downloads_per_url"] > keep:
            warning = "WARNING! In config file "
            warning += "\"%s\", " % self.config_dict["default_config"]
            warning += "downloads_per_url is\n"
            warning += "greater than delete_if_more_than. "
            warning += "This will cause files to constantly\nget downloaded and then "
            warning += "subsequently deleted everytime clipodder is run."
            self.message("%s" % self.color(warning, bg="red"), True)
            # Add some space so its easier for the user to read.
            self.message("\n", True)
        # Optional settings go through try_config so a missing option falls
        # back to the default instead of raising.
        self.config_dict["show_progress"] = self.try_config("options",
                                                            "show_progress",
                                                            self.config_dict["show_progress"], True)
        self.config_dict["show_full_paths"] = self.try_config("options",
                                                              "show_full_paths",
                                                              self.config_dict["show_full_paths"], True)

    def try_config(self, c_section, c_value, value, boolean=False):
        """
        Try to load option c_value from section c_section of the config.

        Returns the configured value (parsed as a boolean when `boolean` is
        true); if it is missing or cannot be parsed, prints a notice and
        returns the given fallback `value`.
        """
        try:
            if boolean:
                return self.config.getboolean(c_section, c_value)
            return self.config.get(c_section, c_value)
        except (ConfigParser.Error, ValueError):
            print("Could not determine [%s] from section [%s], defaulting to %s" % (c_value,
                                                                                   c_section,
                                                                                   value))
            # Give the value back, because we could not retrieve the new one.
            return value

    def validate(self, path=None, create=False):
        """
        With a path: return True if it exists, or if `create` is true and
        the directory (including missing parents) could be created;
        otherwise return False.

        Without a path: make sure the config directory and config file
        exist, calling init_auto_podder() (which exits) when they do not,
        and return True.
        """
        if path:
            if os.path.exists(path):
                return True
            if not create:
                return False
            try:
                # makedirs so that <download_dir>/<feed title> works even
                # when download_dir itself has not been created yet.
                os.makedirs(path)
            except OSError:
                # Report failure to the caller instead of raising.
                return os.path.isdir(path)
            return True
        if not os.path.isdir(self.config_dict["default_config_dir"]):
            self.init_auto_podder()
        if not os.path.exists(self.config_dict["default_config"]):
            self.init_auto_podder()
        return True

    def init_auto_podder(self):
        """
        Create the config directory if needed, tell the user where to put
        the config file and exit (status 1 if the directory could not be
        created, 0 otherwise).
        """
        config_dir = self.config_dict["default_config_dir"]
        try:
            if not os.path.isdir(config_dir):
                os.mkdir(config_dir)
                print("Created default config directory (%s)" % (config_dir))
        except OSError as e:
            print("Error creating default config dir (%s)" % (config_dir))
            print("Reason: %s" % e)
            sys.exit(1)
        # Use the path we actually read from rather than $HOME, which may
        # be unset.
        print("Copy the sample_config as %s" % self.config_dict["default_config"])
        sys.exit()

    def color(self, string, fg=None, bg=None, bold=True):
        """
        Wrap string in ANSI escape sequences.

        fg and bg are color names ("black", "red", "green", "yellow",
        "blue", "magenta", "cyan", "white") or None; bold toggles the bold
        attribute. Returns string unchanged when colors are disabled in the
        config or when no attribute was requested. Raises ValueError for an
        unknown color name.
        """
        if not self.config_dict["use_colors"] or (not fg and not bg and not bold):
            return string
        color_list = [None, "black", "red", "green", "yellow", "blue",
                      "magenta", "cyan", "white"]
        if fg not in color_list:
            raise ValueError("color passed in %s is not one of %s" % (fg, color_list))
        if bg not in color_list:
            raise ValueError("color passed in %s is not one of %s" % (bg, color_list))
        escape = "\033["
        # foreground (text) colors
        fg_colors = {
            "black": '30;',
            "red": '31;',
            "green": '32;',
            "yellow": '33;',
            "blue": '34;',
            "magenta": '35;',
            "cyan": '36;',
            "white": '37;',
            None: '',       # This corresponds to no foreground color
            True: '01;',    # This corresponds to bold foreground
            False: '',      # This corresponds to not bold foreground
        }
        # background colors
        bg_colors = {
            "black": '40;',
            "red": '41;',
            "green": '42;',
            "yellow": '43;',
            "blue": '44;',
            "magenta": '45;',
            "cyan": '46;',
            "white": '47;',
            None: '',       # This corresponds to no background color
        }
        # normal text
        reset_seq = '%s00m' % escape
        color_escape_seq = "%s%s%s%s" % (escape, fg_colors[fg], bg_colors[bg], fg_colors[bold])
        # There will be a leftover ';' at the end. We need to get rid of this.
        color_escape_seq = color_escape_seq.rstrip(';')
        return "%sm%s%s" % (color_escape_seq, string, reset_seq)

    def delete_unneeded(self, download_dir, p_title, delete_if_more_than=0):
        """
        Delete extra files that we don't want to keep. Deletes oldest files
        based on the mtime.

        Only the newest `delete_if_more_than` files below
        <download_dir>/<p_title> are kept; a value <= 0 disables deletion.
        A file that cannot be removed is reported and skipped.
        """
        if delete_if_more_than <= 0:
            return
        # Normalize the finished path, not the format string.
        directory = os.path.normpath("%s/%s" % (download_dir, p_title))
        all_files = []
        for path, _, files in os.walk(directory):
            for f in files:
                full_path = os.path.join(path, f)
                all_files.append((os.path.getmtime(full_path), full_path))
        # we don't delete anything if there are less files than we want to keep
        if len(all_files) <= delete_if_more_than:
            return
        # sort all files by timestamps, oldest first
        all_files.sort(key=lambda info: info[0])
        # delete oldest ones up to the ones we want to keep
        for _, stale in all_files[:-delete_if_more_than]:
            self.message("%s%s [%s]" % (self.spacer, self.color("Deleteing", fg="red"),
                                        self.path_name(stale)), True)
            try:
                os.unlink(stale)
            except OSError as e:
                print("%s \"%s\": %s" %
                      (self.color("Could not delete", bg="red"), stale, str(e)))

    def message(self, message, newline=False):
        """Write message to stdout, followed by a newline if requested."""
        if newline:
            sys.stdout.write("%s\n" % (message))
        else:
            sys.stdout.write("%s" % (message))

    def path_name(self, path):
        """Return path as it should be shown: in full, or only its basename."""
        if self.config_dict["show_full_paths"]:
            return path
        return os.path.basename(path)

    def print_progress(self, current, total):
        """
        Print the percentage of `current` relative to `total`.

        Nothing is printed while total is 0 (size not known yet).
        """
        if total == 0.0:
            return
        # float() guards against integer division under Python 2.
        pct = int(float(current) / total * 100)
        if 1 <= len(str(pct)) <= 3:
            p_bar = "[%% %s] " % pct
        else:
            p_bar = "[ %s] " % pct
        # Written in one call; emits the same sequence as the former chain
        # of print statements.
        sys.stdout.write("  " + "\b" * len(p_bar) + " \r" + p_bar + "\n")

    def init_curl(self):
        """Create the shared curl handle: follow up to 5 redirects, 10 s connect timeout."""
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
        self.curl.setopt(pycurl.MAXREDIRS, 5)
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 10)

    def curl_progress(self, download_total, downloaded, upload_total, uploaded):
        """
        Progress callback handed to curl; prints the download progress
        when show_progress is enabled.

        NOTE(review): the original comment says having this callback is
        what lets Ctrl-C abort self.curl.perform() - confirm against the
        pycurl documentation.
        """
        if self.config_dict["show_progress"]:
            self.print_progress(downloaded, download_total)

    def download_file(self, name, path, url):
        """
        Download url to path.

        The data is written to "<path>.part" first and renamed once the
        transfer finished, so an interrupted download is never mistaken
        for a complete file. The target directory is created here (not
        earlier) so that no empty folders are left behind for feeds that
        have nothing to download. `name` is unused.
        """
        self.message("%s%s [%s]" % (self.spacer,
                                    self.color("Downloading", fg="green"),
                                    self.path_name(path)), True)
        download_dir = os.path.dirname(path)
        if not self.validate(download_dir, True):
            print("WARNING! I could not create the directory: %s" % (download_dir))
            return
        part_path = "%s.part" % path
        try:
            with open(part_path, "wb") as download_file:
                self.curl.setopt(pycurl.URL, str(url))
                self.curl.setopt(pycurl.WRITEDATA, download_file)
                self.curl.setopt(pycurl.NOPROGRESS, 0)
                self.curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)
                self.curl.perform()
        except pycurl.error as e:
            # One failed transfer should not abort the remaining feeds;
            # drop the partial file and carry on.
            print("%s \"%s\": %s" %
                  (self.color("Could not download", bg="red"), url, str(e)))
            try:
                os.unlink(part_path)
            except OSError:
                pass
            return
        os.rename(part_path, path)

    def get_link(self, data, download_types):
        """
        This gets the downloadable links from an entry in an RSS feed.
        - data is a single entry of a feed parsed with feedparser
          (i.e. one item of feedparser.parse(url)["entries"]).
        - download_types is a list of filetypes to download. "audio"
          and "video" are treated as MIME types (and additionally match
          the usual file extensions), while any other string is just
          treated as a file extension. An empty list means "audio" and
          "video" by MIME type.
        This function returns a list of urls to be downloaded.
        """
        # the list of links we return from this function
        links = []
        # These files will be downloaded based on mime type.
        # For now, this is only "video" or "audio".
        mime_types = []
        # These files will be downloaded based on file extension.
        # For example, this could be "pdf" or "html".
        file_types = []
        # (these probably need to be expanded...)
        video_extensions = ["avi", "flv", "mov", "mp4", "wmv"]
        audio_extensions = ["mp3", "ogg", "wav", "m4a", "aac", "wma"]
        if not download_types:
            mime_types = ["video", "audio"]
        else:
            for dtype in download_types:
                if dtype == "video":
                    mime_types.append("video")
                    file_types.extend(video_extensions)
                elif dtype == "audio":
                    mime_types.append("audio")
                    file_types.extend(audio_extensions)
                else:
                    file_types.append(dtype)
        for stuff in data.get("links", []):
            href = stuff.get("href")
            if not href:
                # Nothing to download without a target.
                continue
            # get the mime type
            try:
                type_f = stuff["type"].split("/")[0]
            except (KeyError, AttributeError):
                type_f = None
            # get the file extension: everything after the last "." up to
            # the first non-alphanumeric character (strips query strings)
            extension = ""
            for letter in href.split(".")[-1]:
                if not letter.isalnum():
                    break
                extension += letter
            # add it if the mime type matches, or the extension matches
            # This should help against accidental download of "shadow"
            # because "shadow" does not have a extension, unless the
            # user purposefully sets it to "shadow".
            if (type_f is not None and type_f in mime_types) or extension in file_types:
                links.append(href)
        return links

    def _download_types(self, args):
        """Return the download types in args: everything after the URL that is not a per-feed option."""
        return [arg for arg in args[1:] if not arg.startswith(self.OPTION_PREFIXES)]

    def handle_parsed(self, data, url, args):
        """
        Download the newest entries of one parsed feed and prune old files.

        - data is the result of feedparser.parse(url).
        - url is the feed url; its basename is the folder name when the
          feed has no title.
        - args is the split config line: args[0] is the url, followed by
          download types and the options "path=", "name=" and
          "delete_if_more_than=".
        """
        if "title" in data["feed"]:
            p_title = data["feed"]["title"]
        else:
            p_title = os.path.basename(url)
        print("Checking %s" % p_title.encode('utf8'))
        download_dir = self.config_dict["download_dir"]
        delete_if_more_than = self.config_dict["delete_if_more_than"]
        # Check to see if the user wants to download to
        # a custom location or use a custom name for the
        # download folder
        for arg in args[1:]:
            if arg.startswith("path="):
                c_folder_name = arg[len("path="):]
                if os.path.basename(c_folder_name) == c_folder_name:
                    # a bare name is taken relative to the download dir
                    download_dir = "%s/%s" % (self.config_dict["download_dir"], c_folder_name)
                else:
                    download_dir = c_folder_name
            elif arg.startswith("name="):
                p_title = arg[len("name="):]
            elif arg.startswith("delete_if_more_than="):
                try:
                    delete_if_more_than = int(arg[len("delete_if_more_than="):])
                except ValueError:
                    delete_if_more_than = 0
        # Options must not be mistaken for file extensions: a feed line
        # carrying only options still downloads audio and video.
        download_types = self._download_types(args)
        # We want to download entries from oldest to newest so the
        # files' mtimes are set correctly.
        entries = data["entries"][0:self.config_dict["downloads_per_url"]]
        entries.reverse()
        for entry in entries:
            urls = self.get_link(entry, download_types)
            # Check to see if anything needs to be downloaded
            if not urls:
                print("%s %s, nothing found..."
                      % (self.color("Skipping", fg="blue"),
                         [stuff.get("href") for stuff in entry.get("links", [])]))
                continue
            for file_url in urls:
                # this should never happen?
                if not file_url:
                    print("ERROR! url not valid from %s" % p_title)
                    continue
                file_name = os.path.basename(file_url)
                # Normalize the finished path, not the format string.
                file_path = os.path.normpath("%s/%s/%s" % (download_dir, p_title, file_name))
                # Check to see if the file exists, if not, download
                if not self.validate(file_path):
                    self.download_file(p_title, file_path, file_url)
        self.delete_unneeded(download_dir, p_title, delete_if_more_than)

    def deal_with_url(self, url, args):
        """
        Process one feed line from the config.

        A url starting with "#" is the sample config's "#url here"
        placeholder (the line is split on whitespace before it gets here,
        so only "#url" arrives): the user is asked to add feeds and the
        program exits. An empty url is ignored.
        """
        if url.startswith("#"):
            print("Add some podcasts! (%s)" % self.config_dict["default_config"])
            sys.exit()
        elif url:
            self.handle_parsed(parse(url), url, args)
# Refuse to run with root privileges; for any other user, constructing
# the podder performs the whole run.
if os.getuid() == 0:
    print("Do not run me as root")
else:
    clipodder = auto_podder()
Jump to Line
Something went wrong with that request. Please try again.