## Parámetros

In [1]:
# Directorio de los resultados de la query
BASE_DIR = '/Users/efraflores/Desktop/EF/Corner/Catalog/PNG_to_JPG'
# Nombre base, es decir, puede haber varios archivos con "nombre_X", "nombre_Y", etc y todos comienzan con "nombre"
FILE_BASE_NAME = 'HEB'

## Código

### Entorno

In [2]:
# Control de datos
import requests
from PIL import Image
from io import BytesIO
from time import sleep
from pathlib import Path
from cloudscraper import create_scraper
from IPython.display import clear_output

# Ingeniería de variables
from numpy import array
from re import search as re_search
from pandas import DataFrame, read_csv

### Clases

In [3]:
class ImageFromUrl:
    '''
    Obtiene las imágenes indicadas en cierta columna de un archivo csv y si son PNG las convierte a JPEG para exportar todas en el mismo formato
    '''
    def __init__(self, base_dir: str, file_base_name: str, anti_bot: bool=True) -> None:
        # Convierte el texto a objeto tipo Path para unir directorios, buscar archivos, etc
        self.base_dir = Path(base_dir)
        # Asigna el nombre base como atributo 
        self.file_name = file_base_name
        # Encuentra todos los archivos que comiencen con el nombre base en el directorio
        self.files_list = [x for x in self.base_dir.glob('*') if re_search(f'{self.file_name}_(?!found).+\.csv', str(x))]
        # Se asigna el objeto para hacer requests
        if anti_bot: self.scraper_obj = create_scraper(browser='firefox')
        else: self.scraper_obj = requests

    def __len__(self) -> int:
        '''
        Devuelve la cantidad de archivos encontrados
        '''
        return len(self.files_list)
    
    def __str__(self) -> str:
        return f'Directorio: {self.base_dir}\nCon {self.__len__()} archivo(s)'

    def cool_print(self, text: str, sleep_time: float=0.01, by_word: bool=False) -> None: 
        '''
        Imprimir como si se fuera escribiendo
        '''
        acum = ''
        for x in (text.split() if by_word else text): 
            # Acumular texto
            acum += x+' ' if by_word else x
            # Limpiar pantalla
            clear_output(wait=True)
            # Esperar un poco para emular efecto de escritura
            sleep(sleep_time*(9 if by_word else 1))
            # Imprimir texto acumulado
            print(acum)
        # Mantener el texto en pantalla
        sleep(1.7)

    def user_exit(self) -> bool:
        # Esperar respuesta del usuario para continuar
        user_response = ''
        while user_response not in ['y','n','Y','N']:
            user_response = input('Presiona "y" para continuar o "n" para salir\n')
        else: 
            return user_response in ('n','N')

    def read_files(self) -> DataFrame:
        '''
        Une todos los archivos que comienzan con el nombre base
        '''
        # Tabla vacía para ir depositando los csv
        df = DataFrame()
        # Obtiene el número de archivos, sólo es informativo
        total_files = len(self.files_list)

        for i,file_chunk in enumerate(self.files_list):
            # Obtener sólo el nombre del archivo, no su ubicación completa
            sub_name = str(file_chunk).split('/')[-1]
            # Une la tabla anterior con el nuevo archivo
            df = df.append(read_csv(file_chunk), ignore_index=True)
            
            # Informa al usuario del avance
            print(f'Archivo {i+1}/{total_files} con nombre: {sub_name} es importado exitosamente')
        return df
    
    def export_csv(self, df: DataFrame, name_suffix=None, **kwargs) -> None: 
        '''
        Exportar un archivo en formato csv
        '''
        # Si no hay sufijo, guardar el archivo según el atributo "self.file_name" de otro modo agregar el sufijo
        export_name = f'{self.file_name}.csv' if name_suffix==None else f'{self.file_name}_{name_suffix}.csv'
        # Exportar en el directorio base, con los argumentos del método "to_csv()" que se indiquen. Ej: sep='\t', encoding='utf-16'
        df.to_csv(self.base_dir.joinpath(export_name), **kwargs)
        # Informa al usuario
        print(f'Archivo: {export_name} fue exportado exitosamente en:\n{self.base_dir}')

    def check_url(self, url: str, n_try: int=10):
        if not isinstance(url, str): return 'URL inválido'
        url_error = True
        i = 0
        while url_error and i < n_try:
            try: 
                # Obtener la respuesta del servidor, evitando el posible anti-bot
                response = self.scraper_obj.get(url)
                url_error = False
            # Si no es posible conectar con la URL, intentarlo de nuevo hasta "n_try" veces
            except:
                sleep(2)
                print(f'Error en el intento #{i+1} de obtener la imagen desde:')
                print(url)
                i+=1
        if url_error and i > n_try: return 'URL inválido'
        else: return response

    def get_image(self, response) -> Image:
        '''
        Obtiene la imagen desde url o el estatus del request si es que falla
        '''
        # Convierte la información de bytes a un objeto Imagen para poderlo trabajar con la librería PIL
        try:
            img = Image.open(BytesIO(response.content))
            return img
        except: return str(response.status_code) # En caso de no lograrlo, devuelve el estatus del request

    def to_jpeg(self, png_img: Image, rgb_background: tuple=(255, 255, 255)) -> Image:
        '''
        Convierte una imagen RGBA --> RGB, es decir, le quita la transparencia
        '''
        # Asegura que tenga los cuatro canales de color
        png_img = png_img.convert('RGBA')
        png_img.load()

        # Crea un "lienzo" RGB del mismo tamaño que la imagen RGBA
        jpg_img = Image.new("RGB", png_img.size, rgb_background)
        # Pega la imagen en el lienzo RBG la imagen RBGA
        jpg_img.paste(png_img, mask=png_img.split()[3])

        return jpg_img

    def get_convert(self, url: str, file_name: str, to_path: str, save_just_png: bool=True) -> None:
        '''
        Obtiene la imagen y revisa si necesita convertirse a JPEG
        '''
        # Llama el método para importar la imagen via URL
        resp = self.check_url(str(url).replace(' ',''))
        if resp == 'URL inválido': return resp, None

        img = self.get_image(resp)
        # Si no es posible obtener la imagen, devolver el estatus del request y terminar el método
        if isinstance(img, str): return None, img
        # Definir el directorio en el que se va a guardar la imagen
        file_path = str(to_path.joinpath(file_name+'.jpeg'))
        # Convierte la imagen a una matriz y revisa si tiene 4 canales RGBA, es decir, si tiene transparencia
        if array(img).shape[-1] == 4:
            # De ser así, llama al método para convertirla a RGB
            img = self.to_jpeg(img)        
            # Guarda un objeto para informar al usuario que se ha convertido desde PNG
            img_type = 'PNG'
        # De otro modo, informará que desde origen es JPEG
        else: img_type = 'JPEG'

        # Guarda la imagen RBG en formato JPEG
        if save_just_png:
            if img_type=='PNG':
                img.save(file_path, "JPEG")
                print(f'Archivo: {file_name} previamente PNG ahora es exportado en formato JPEG en\n{self.base_dir}')
                sleep(0.3)
                clear_output()
                return img_type, file_path
        else:
            img.save(file_path, "JPEG")
            print(f'Archivo: {file_name} {"previamente PNG ahora es exportado en formato JPEG" if img_type=="PNG" else "que de origen es JPEG es exportado"} en\n{self.base_dir}')
            sleep(0.3)
            clear_output()

        return img_type, file_path

    def full_pipeline(self, just_png: bool, name_cols: list, url_col='img_url') -> DataFrame:
        dest_path = self.base_dir.joinpath(self.file_name+'_images')
        try: dest_path.mkdir()
        except FileExistsError:
            print(f'Ya existe un folder llamado "{self.file_name}_images" en\n{self.base_dir}\n\n¿Quieres sobreescribirlo?')
            if self.user_exit():
                print('Proceso terminado. Buen día!')
                return None
            dest_path.mkdir(exist_ok=True)
        df = self.read_files()
        df['img_name'] = df[name_cols].fillna('').astype(str).apply(lambda row: '__'.join([f'{col.upper()}_{x.replace("/","").replace(".","")}' for col,x in zip(name_cols,row)]), axis=1)
        
        df['result'] = df[[url_col,'img_name']].apply(lambda x: self.get_convert(x[0], file_name=x[-1], to_path=dest_path, save_just_png=just_png), axis=1)
        
        df[['img_type', 'img_path']] = DataFrame(df['result'].tolist(), index=df.index)

        omit_names = [None,'JPEG'] if just_png else [None]
        for col in ['img_name','img_path']: df[col] = df[['img_type',col]].apply(lambda x: None if x[0] in omit_names else x[-1], axis=1)
        df.drop('result', axis=1, inplace=True)

        try: dest_path.rmdir()
        except: pass
        
        return df

In [4]:
class ImageFromFolder(ImageFromUrl):
    def __init__(self, base_dir: str,folder_name: str) -> None:
        self.base_dir = Path(base_dir)
        self.folder_name = folder_name
        self.folder_path = self.base_dir.joinpath(self.folder_name)
        self.image_list = self.folder_path.glob('*.png')

    def convert_folder(self) -> None:
        self.jpeg_folder = self.base_dir.joinpath(f'{self.folder_name} - JPEG')
        self.jpeg_folder.mkdir(exist_ok=True)
        for img_file in self.image_list:
            img_path = ''.join(str(img_file).split('.')[:-1])
            img_name = img_path.split('/')[-1]
            img_ext = str(img_file).split('.')[-1]
            img = Image.open(img_file)
            if img_ext == 'png': img = self.to_jpeg(img)
            img.save(self.jpeg_folder.joinpath(f'{img_name}.jpeg'), "JPEG")

In [5]:
broken_url = 'http://www.superama.com.mx/superama/images/Products/img_large/0750103392317L.jpg'
non_broken = 'https://www.lacomer.com.mx/superc/img_art/7501033923173_3.jpg'

In [6]:
a = ImageFromUrl('','',False)
print(a.check_url(broken_url).reason)
print(a.check_url(non_broken).reason)

Forbidden
OK


In [7]:
df = read_csv('/Users/efraflores/Desktop/borrar.csv')
df.sample()

Unnamed: 0,catalog_product_id,same_url,img_url,large_img_url
12598,369577,False,https://www.superama.com.mx/Content/images/Pro...,https://www.superama.com.mx/Content/images/pro...


In [9]:
df = df.sample(frac=0.01)
df.shape

(265, 4)

In [10]:
df['url_status'] = df['img_url'].map(lambda x: a.check_url(x).reason)
df['url_status'].value_counts(1, dropna=False)

Forbidden    1.0
Name: url_status, dtype: float64

## Resultado

In [None]:
iff = ImageFromFolder(base_dir='/Users/efraflores/Desktop', folder_name='Revisión Corner FTP')
iff.convert_folder()

In [None]:
df = ImageFromUrl(BASE_DIR, FILE_BASE_NAME, anti_bot=False).full_pipeline(just_png=False, name_cols=['supply_product_id'])
df.sample()