Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
artiomn committed Sep 9, 2020
1 parent be9c2cd commit 9555439
Showing 1 changed file with 57 additions and 42 deletions.
99 changes: 57 additions & 42 deletions images_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
import os
import re
import requests
import sys
import unicodedata

from typing import Optional, List

from pkg.transformers.md.transformer import ArticleTransformer


__version__ = '0.0.2'


del types_map['.jpe']


Expand All @@ -27,8 +29,8 @@ def slugify(value):
"""

value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
value = re.sub('[^\w\s-]', '', value.decode()).strip().lower()
value = re.sub('[-\s]+', '-', value)
value = re.sub(r'[^\w\s-]', '', value.decode()).strip().lower()
value = re.sub(r'[-\s]+', '-', value)

return value

Expand All @@ -38,7 +40,6 @@ def get_filename_from_url(req: requests.Response) -> Optional[str]:
Get filename from url and, if not found, try to get from content-disposition.
"""

result = None
if req.url.find('/'):
result = req.url.rsplit('/', 1)[1]
else:
Expand All @@ -47,43 +48,44 @@ def get_filename_from_url(req: requests.Response) -> Optional[str]:
if cd is None:
return None

fname = re.findall('filename=(.+)', cd)
file_name = re.findall('filename=(.+)', cd)

if len(fname) == 0:
if len(file_name) == 0:
return None

result = fname[0]
result = file_name[0]

f_name, f_ext = os.path.splitext(result)

if not f_ext:
result = f'{slugify(f_name)}{guess_extension(req.headers["content-type"].partition(";")[0].strip())}'
else:
result = f'{slugify(f_name)}.{slugify(f_ext)}'
result = f'{slugify(f_name)}{guess_extension(req.headers["content-type"].partition(";")[0].strip())}' if not f_ext\
else f'{slugify(f_name)}.{slugify(f_ext)}'

return result


class ImageDownloader:
def __init__(self, article_path: str, skip_list: Optional[List[str]] = None, skip_all: bool = False, img_dirname: str = 'images', img_publicpath: str = ''):
self.img_dirname = img_dirname
self.img_publicpath = img_publicpath
allowed_url_prefixes = {'http', 'ftp'}

def __init__(self, article_path: str, skip_list: Optional[List[str]] = None, skip_all: bool = False,
img_dir_name: str = 'images', img_public_path: str = ''):
self.img_dir_name = img_dir_name
self.img_public_path = img_public_path
self._article_file_path = article_path
self._skip_list = sorted(skip_list) if skip_list is not None else []
self._imgs_dir = os.path.join(os.path.dirname(self._article_file_path), self.img_dirname)
self._images_dir = os.path.join(os.path.dirname(self._article_file_path), self.img_dir_name)
self._skip_all = skip_all

def download_images(self, images: List[str]) -> dict:
path_join = os.path.join
img_dirname = self.img_dirname
img_publicpath = self.img_publicpath
imgs_dir = self._imgs_dir
replacement_mapping = {}
skip_list = self._skip_list
img_count = len(images)
path_join = os.path.join
img_dir_name = self.img_dir_name
img_public_path = self.img_public_path
images_dir = self._images_dir

try:
os.makedirs(self._imgs_dir)
os.makedirs(self._images_dir)
except FileExistsError:
# An existing directory is not an error.
pass
Expand All @@ -95,21 +97,14 @@ def download_images(self, images: List[str]) -> dict:
print(f'Image {img_num + 1} ["{img_url}"] was skipped, because it\'s in the skip list...')
continue

if not img_url.startswith('http'):
if not self._is_allowed_url_prefix(img_url):
print(f'Image {img_num + 1} ["{img_url}"] was skipped, because it has incorrect URL...')
continue

print(f'Downloading image {img_num + 1} of {img_count} from "{img_url}"...')

try:
try:
img_response = requests.get(img_url, allow_redirects=True)
except requests.exceptions.SSLError:
print('Incorrect SSL certificate, trying to download without verifying...')
img_response = requests.get(img_url, allow_redirects=True, verify=False)

if img_response.status_code != 200:
raise OSError(str(img_response))
img_response = ImageDownloader._download_image(img_url)
except Exception as e:
if self._skip_all:
print(f'Warning: can\'t download image {img_num + 1}, error: [{str(e)}], '
Expand All @@ -118,25 +113,45 @@ def download_images(self, images: List[str]) -> dict:
raise

img_filename = get_filename_from_url(img_response)
img_path = path_join(imgs_dir, img_filename)
print(f'Image will be written to the file "{img_path}"...')
replacement_mapping.setdefault(img_url, path_join(img_publicpath or img_dirname, img_filename))
img_path = path_join(images_dir, img_filename)
replacement_mapping.setdefault(img_url, path_join(img_public_path or img_dir_name, img_filename))

with open(img_path, 'wb') as img_file:
img_file.write(img_response.content)
img_file.close()
ImageDownloader._write_image(img_path, img_response.content)

return replacement_mapping

# Fetch a single image and return the response object produced by requests.get.
#
# The request follows redirects. If the first attempt raises
# requests.exceptions.SSLError, a message is printed and the request is
# retried once with certificate verification disabled (verify=False).
# Raises OSError (carrying str(response)) when the final status code is
# not 200; otherwise the response is returned to the caller.
@staticmethod
def _download_image(image_url):
try:
img_response = requests.get(image_url, allow_redirects=True)
except requests.exceptions.SSLError:
# Fall back to an unverified request instead of failing outright.
print('Incorrect SSL certificate, trying to download without verifying...')
img_response = requests.get(image_url, allow_redirects=True, verify=False)

# NOTE(review): the next line looks like a removed line from the old side of
# the diff (a `def main(arguments):` signature appears later in the file); it
# is presumably not part of this method - confirm against the real file.
def main(args):
# Any status other than 200 is treated as a failed download.
if img_response.status_code != 200:
raise OSError(str(img_response))

return img_response

@staticmethod
def _write_image(img_path: str, data: bytes) -> None:
    """
    Write downloaded image content to a file.

    :param img_path: destination file path; it is opened in binary write mode,
        so an existing file at that path is overwritten.
    :param data: raw image content to store.
    :raises OSError: if the file cannot be opened or written.
    """

    print(f'Image will be written to the file "{img_path}"...')

    # The context manager closes the file on exit (also when write() raises),
    # so the explicit close() call the block used to make was redundant.
    with open(img_path, 'wb') as img_file:
        img_file.write(data)

def _is_allowed_url_prefix(self, url):
return url in self.allowed_url_prefixes


def main(arguments):
"""
Entrypoint.
"""

article_file = os.path.expanduser(args.article_file_path)
skip_list = args.skip_list
skip_all = args.skip_all_incorrect
article_file = os.path.expanduser(arguments.article_file_path)
skip_list = arguments.skip_list
skip_all = arguments.skip_all_incorrect

print('Processing started...')

Expand All @@ -154,8 +169,8 @@ def main(args):
article_file,
skip_list,
skip_all,
args.images_dirname,
args.images_publicpath
arguments.images_dirname,
arguments.images_publicpath
)
).run()

Expand All @@ -174,8 +189,8 @@ def main(args):
help='Public path to the folder of downloaded images')
parser.add_argument('-a', '--skip-all-incorrect', default=False, action='store_true',
help='skip all incorrect images')
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}', help='return version number')

args = parser.parse_args()

main(args)

0 comments on commit 9555439

Please sign in to comment.