# Preparando el proceso de ingesta de datos.

En este notebook vamos a realizar el proceso de ingesta de datos nuevos para la base de datos y poder así poder tener información continua para realizar un modelado de datos.

En este punto vamos a proceder a trabajar con código que hemos ido desarrollando en los anteriores notebooks.

    - Obtención de datos de la página web, pero aplicado a un determinado tiempo, que será periódico.
    - Ingeniería básica para preparar los .csv necesarios  para la carga de datos en la BBDD.
    - Carga de datos nuevos en la BBDD.

Vamos a intentar reducir el codigo aplicado en relación con los notebooks anteriores. Para ello vamos a recurrir a funciones que generaremos a partir de los codigos anteriores y que estarán en el fichero de [**functions.py**](..\Utils\functions.py)

Es por ello que aquí se verá solo el código sencillo de llamamiento a funciones donde generará primero unos ficheros .csv para después cargar los mismos en la base de datos.

## Descargando los datos para su ingesta

Aquí vamos a desarrollar las funciones necesarias para obtener la información de la página web. Así como las modificaciones que consideramos oportunas para adecuar la información a las tablas creadas en nuestra BBDD.

### Primeramente vamos a convertir en funciones lo que tenemos ahora.

In [1]:
import os
os.chdir(os.path.split(os.getcwd())[0])

In [2]:
from bs4 import BeautifulSoup as bs
import requests
import pandas as pd
import numpy as np
import os
import pickle
import re
from datetime import datetime
import spacy
from nltk.stem.snowball import SnowballStemmer
from Utils import functions as f

In [3]:
'''Variables'''

url_principal="https://www.amantis.net/"                        
url_secundaria=url_principal+'productos-amantis/'


date=str(datetime.today().strftime('%y%m%d'))
folder_ingest=r'.\Data'

ext=r'.csv'
scrape='_scrape'
file_product=r'\productos'
file_user=r'\usuarios'
file_comment=r'\comentarios'
file_price=r'\precios'
file_tag=r'\tags'
file_ingest_product=file_product+scrape
file_ingest_comment=file_comment+scrape

'''Mapeo de meses'''
dm_mapping={
    'enero':1, 
    'febrero':2, 
    'marzo':3, 
    'abril':4, 
    'mayo':5,
    'junio':6, 
    'julio':7,
    'agosto':8, 
    'septiembre':9, 
    'octubre':10, 
    'noviembre':11, 
    'diciembre':12,
} 

nombre_listas=['amenities','anal','BDSM','femenino','masculino','juguetes','lenceria','muebles']

product_ingest=folder_ingest+file_ingest_product+'_'+date+ext
comment_ingest=folder_ingest+file_ingest_comment+'_'+date+ext
product_engineer=folder_ingest+file_product+'_'+date+ext
tag_engineer=folder_ingest+file_tag+'_'+date+ext
price_engineer=folder_ingest+file_price+'_'+date+ext
comment_engineer=folder_ingest+file_comment+'_'+date+ext
user_engineer=folder_ingest+file_user+'_'+date+ext


In [4]:
def spider_amantis(url=url_secundaria,product_ingest=product_ingest,comment_ingest=comment_ingest):
    """
    Obtiene las URLs de los productos para luego extraer la información de cada producto.

    Input:
    - url (str): URL base para la extracción de productos.

    Return:
    - noduplicated_product (pd.DataFrame): DataFrame de productos sin duplicados.
    - noduplicated_comments (pd.DataFrame): DataFrame de comentarios sin duplicados.
    """
    
    pages= np.arange(1,25)  
    '''Listas a generar con la información de los productos'''
    lista_URLs = []
    name=[]
    regular_prices=[]
    new_price=[]
    info=[]
    id=[]
    comentarios=[]
    '''Generamos 2 diccionarios con los datos importantes para ingresar en una BBDD'''

    diccionario_datos_productos={"ID":id,"NAME":name,"INFO":info,"LISTA_URL":lista_URLs,"REGULAR_PRICE":regular_prices,"DISCOUNT_PRICE":new_price}

    diccionario_comentarios_productos={"ID":id,"COMENTARIOS":comentarios}

    '''Listas para generar la información de los comentarios'''
    id_comment=[]
    comments=[]
    date=[]
    ratio=[]
    users=[]
    comment=[]

    print("Empezando a recoger datos de las paginas")
    for page in pages:
        
        if page == 1:
            URL = url
            response = requests.get(url)
            soup = bs(response.text, 'lxml')
            productos = soup.find_all(class_='caption')
            for producto in productos[9:]:
                URL_producto = producto.find('a')['href']
                lista_URLs.append(URL_producto)
            
        else:
            URL = url+'page' + str(page)+'/'
            response = requests.get(URL)
            soup = bs(response.text, 'lxml')
            productos = soup.find_all(class_='caption')
            for producto in productos[9:]:
                URL_producto = producto.find('a')['href']
                lista_URLs.append(URL_producto)
    print("Terminando de recoger los datos de los links de las paginas\nEmpezando a generar las listas de los productos")

    for i in range(len(lista_URLs)):
        id.append(i)

        
    '''Extraemos la información de cada producto existente'''

    for URL in lista_URLs:
        url_product=URL
        response_product = requests.get(url_product)
        soup_product = bs(response_product.text, 'lxml')
        user_comments_product=[]
        date_comments_product=[]
        comments_product=[]
        rating=[]

        titulos=soup_product.find_all("h1",class_="h3")
        for titulo in titulos:
            nombre=titulo.get_text(strip=True)
            name.append(nombre)

        all_price = soup_product.find_all("div", class_="productoPrecio pull-right tdd_precio")                        
        for price_container in all_price:                                                                    
            try:
                special_price = price_container.find("span", class_="productSpecialPrice")
                if special_price:
                    item_price = float(special_price.get_text(strip=True).replace(",", ".").split('€')[0])
                    new_price.append(item_price)
                    regular_price = price_container.find("del").get_text(strip=True)
                    item_regular_price = float(regular_price.replace(",", ".").split('€')[0])
                    regular_prices.append(item_regular_price)
                else:
                    regular_price = price_container.find("span").get_text(strip=True)
                    item_regular_price = float(regular_price.replace(",", ".").split('€')[0])
                    new_price.append(item_regular_price)
                    regular_prices.append(None)
            except:
                new_price.append(None)
                regular_prices.append(None)

        description=soup_product.find("div", class_="description") 
        information=description.get_text().split('\n')[1:]
        documentation = ''.join(information)
        info.append(documentation)


        '''Vamos a obtener los datos de los comentarios de los usuarios'''
        # print("Cargando los datos de los comentarios")

        all_user_comments = soup_product.find_all("span", class_="name-user") 
        for user_comment in all_user_comments:
            user_comments_product.append(user_comment.get_text(strip=True))

        
        all_dates = soup_product.find_all("span", class_="date")  
        for dates in all_dates:
            dates_text=dates.get_text(strip=True)
            # dates=datetime.strftime(dates, '%dd/%mm/%Y')
            date_comments_product.append(dates_text)
            # date_object = datetime.strptime(date_comments_product)

        all_comments = soup_product.find_all("p")
        for formats in all_comments[-len(date_comments_product):]:
            comments_product.append(formats.get_text(strip=True))

        hearts = soup_product.find_all('div', class_= 'box-description')
        for heart in hearts:
            heart_rating = heart.find_all('span', class_= 'fas fa-heart')
            num_hearts = len(heart_rating)
            rating.append(num_hearts)

        datos = list(zip(date_comments_product,rating, user_comments_product,comments_product ))
        comentarios.append(datos)

    for i, regular_price in enumerate(regular_prices):
        if regular_price is None:
            regular_prices[i] = new_price[i]

    print("Realizando la ingenieria de los datos\nEliminando duplicados de productos")
    productos=pd.DataFrame(diccionario_datos_productos)
    noduplicated_product = productos.drop_duplicates(subset='NAME', keep='first')
    removed_id = productos[productos.duplicated(subset='NAME', keep='first')]['ID']

    print("Tratando los comentarios")

    comentarios_productos=pd.DataFrame(diccionario_comentarios_productos)
    comentarios=pd.DataFrame()
    diccionario={"id":id_comment,"comments":comments}

    for id_product,n_comments in enumerate (comentarios_productos['COMENTARIOS']):
        for i in n_comments:
            id_comment.append(id_product)
            comments.append(i)


    for j in range(len(diccionario['comments'])):
        date.append(diccionario['comments'][j][0])
        ratio.append(diccionario['comments'][j][1])
        users.append(diccionario['comments'][j][2])
        comment.append(diccionario['comments'][j][3])


    comentarios['ID']=pd.Series(id_comment)
    comentarios['DATE']=pd.Series(date)
    comentarios['RATIO']=pd.Series(ratio)
    comentarios['USERS']=pd.Series(users)
    comentarios['COMMENT']=pd.Series(comment)
    
    noduplicated_comments = comentarios[~comentarios['ID'].isin(productos[productos['ID'].isin(removed_id)]['ID'])]
    
    h=input("Quieres salvar los datos?").upper()
    if h=="SI":
        noduplicated_product.to_csv(product_ingest,header=True,index=False)           # Tengo que generar el path correcto
        noduplicated_comments.to_csv(comment_ingest,header=True,index=True)           # Tengo que generar el path correcto
    
    return noduplicated_product,noduplicated_comments



In [5]:
'''Generando nuevos dataframes'''
def product_engineer(df_product,product_engineer=product_engineer):
    """
    Realiza ingeniería de datos para la creación de la tabla de productos.

    Input:
    - df_product (pd.DataFrame): DataFrame de productos.

    Return:
    - df_product (pd.DataFrame): DataFrame de productos con ingeniería aplicada.
    """
    
    print("Generando el fichero de productos")
    
    # df_product=pd.read_csv(folder_ingest+file_ingest_product+'_'+date+ext)
    df_product['NAME'] = df_product['NAME'].str.replace(r'-(?=\w)', '_')
    df_product[['PRODUCT', 'SLOGAN']] = df_product['NAME'].str.split('[,-.]', 1, expand=True)
    df_product['PRODUCT'] = df_product['PRODUCT'].str.strip()
    df_product['SLOGAN'] = df_product['SLOGAN'].str.strip()
    df_product['CHARACTERISTICS'] = df_product['INFO'].str.split('Ver características y medidas|Características', 1).str[1]
    df_product['DESCRIPTION'] = df_product['INFO'].str.split('Ver características y medidas|Características', 1).str[0].str.strip()
    df_product['CHARACTERISTICS'] = df_product['CHARACTERISTICS'].str.replace('\r', ' ')
    df_product['DESCRIPTION'] = df_product['DESCRIPTION'].str.replace('\r', ' ')

    col_1 = df_product.pop('PRODUCT')
    col_2=df_product.pop('SLOGAN')
    col_3=df_product.pop('DESCRIPTION')
    col_4=df_product.pop('CHARACTERISTICS')

    df_product.drop(columns=['NAME'],inplace=True)
    df_product.drop(columns=['INFO'],inplace=True)

    df_product.insert(loc= 1 , column= 'PRODUCT', value= col_1)
    df_product.insert(loc= 2 , column= 'SLOGAN', value= col_2)
    df_product.insert(loc= 3 , column= 'DESCRIPTION', value= col_3)
    df_product.insert(loc= 4 , column= 'CHARACTERISTICS', value= col_4)
    df_product=df_product.iloc[:,:6]
    # df_product.to_csv(folder_ingest+file_product+'_'+date+ext,header=True,index=False)
    h=input("Quieres salvar los datos?").upper()
    if h=="SI":
        df_product.to_csv(product_engineer,header=True,index=False)           # Tengo que generar el path correcto
        
    return df_product

def tag_engineer(df_tag,nombre_listas=nombre_listas,tag_engineer=tag_engineer):
    """
    Realiza ingeniería de datos para la creación de la tabla de tags.

    Input:
    - df_tag (pd.DataFrame): DataFrame de productos.
    - nombre_lista (Lista): Lista de nombres utilizada para cargar listas desde pickles.

    Return:
    - df_tag (pd.DataFrame): DataFrame de tags de los diversos productos.
    """
    from Utils.functions import eliminacion_acentos,cargar_listas_desde_pickles,aplicar_funcion_a_columna
    print("Generando el fichero de tags")
    # df_tag=pd.read_csv(folder_ingest+file_product+'_'+date+ext)
    df_tag['SLOGAN'] = df_tag['SLOGAN'].str.lower()
    df_tag['DESCRIPTION'] = df_tag['DESCRIPTION'].str.lower()
    # df_tag['SLOGAN'] = df_tag['SLOGAN'].apply(eliminacion_acentos)              # Esto da error, debe de ser porque hay NaN en el campo
    df_tag['DESCRIPTION'] = df_tag['DESCRIPTION'].apply(eliminacion_acentos)
    listas = cargar_listas_desde_pickles(nombre_listas)
    for nombre_lista in listas:
        df_tag = aplicar_funcion_a_columna(df_tag, listas,nombre_lista)
    h=input("Quieres salvar los datos?").upper()
    if h=="SI":
        df_tag.to_csv(tag_engineer,header=True,index=False)           # Tengo que generar el path correcto

    return df_tag

def price_engineer (dataframe,price_engineer=price_engineer):
    """
    Realiza ingeniería de datos para la creación de la tabla de precios.

    Input:
    - dataframe (pd.DataFrame): DataFrame de productos.

    Return:
    - df_prices (pd.DataFrame): DataFrame de precios de los productos.
    """

    print("Generando el fichero de precios")
    # dataframe=pd.read_csv(folder_ingest+file_ingest_product+'_'+date+ext)
    col_1 = dataframe.pop('REGULAR_PRICE')
    col_2=dataframe.pop('DISCOUNT_PRICE')
    col_3=dataframe['ID']
    dataframe.insert(loc= 1 , column= 'ID_PRODUCT', value= col_3)
    dataframe.insert(loc= 2 , column= 'REGULAR_PRICE', value= col_1)
    dataframe.insert(loc= 3 , column= 'DISCOUNT_PRICE', value= col_2)
    date_price=datetime.today().strftime('%y/%m/%d')
    df_prices=dataframe.iloc[:,:4]
    df_prices['FECHA']=date_price
    h=input("Quieres salvar los datos?").upper()
    if h=="SI":
        df_prices.to_csv(price_engineer,header=True,index=False)           # Tengo que generar el path correcto
    return df_prices

def comments_engineer (dataframe,dm_mapping=dm_mapping,comment_engineer=comment_engineer,user_engineer=user_engineer):
    """
    Realiza ingeniería de datos para la creación de la tabla de comentarios.

    Input:
    - dataframe (pd.DataFrame): DataFrame de comentarios.
    - dm_mapping (dict): Mapeo de meses para el tratamiento de fechas.

    Return:
    - df_comments (pd.DataFrame): DataFrame de comentarios con ingeniería aplicada.
    - df_users (pd.DataFrame): DataFrame de usuarios que han realizado comentarios.

    """

    '''Tratando a los Usuarios'''
    print("Generando el fichero de usuarios")

    nombre_count = {}
    count = {}

    for i, row in dataframe.iterrows():
        id = row['ID']
        nombre = row['USERS']
        
        if id not in count:
            count[id] = {}
            
        if nombre in count[id]:
            count[id][nombre] += 1
            nueva_nombre = f"{nombre}_{count[id][nombre]}"
            dataframe.loc[i, 'USERS'] = nueva_nombre
        else:
            count[id][nombre] = 1
    lista_users=dataframe['USERS'].unique()
    mivalor = [ x for x in range(len(lista_users))]             
    lista_users=list(lista_users)                                
    lista_users_code = {k: v for k, v in zip(lista_users, mivalor)}   
    dataframe['ID_USERS']= dataframe['USERS'].map(lista_users_code)
    df_users=pd.DataFrame()
    df_users['ID_USERS']=dataframe['ID_USERS']
    df_users['USERS']=dataframe['USERS']
    df_users.drop_duplicates(subset='ID_USERS', keep='first',inplace=True)



    print("Generando el fichero de comentarios")
    '''Tratando a las Fechas'''

    dataframe['DAY']=dataframe['DATE'].str.split(' ').str.get(1).astype('Int64')
    dataframe['MONTH']=dataframe['DATE'].str.split(' ').str.get(2).str.split(',').str.get(0)
    dataframe['YEAR']=dataframe['DATE'].str.split(' ').str.get(-1).astype('Int64')
    dataframe['MONTH']=dataframe['MONTH'].map(dm_mapping)
    dataframe['DATE'] = pd.to_datetime(dataframe.iloc[:,-3:])
    df_comments=dataframe.iloc[:,:-3]

    col = df_comments.pop('ID_USERS')
    df_comments.drop(columns=['USERS'],inplace=True)
    df_comments.insert(loc= 4 , column= 'ID_USERS', value= col)
    h=input("Quieres salvar los datos?").upper()
    if h=="SI":
        df_users.to_csv(user_engineer,header=True,index=False)           # Tengo que generar el path correcto
        df_comments.to_csv(comment_engineer,header=True,index=False)           # Tengo que generar el path correcto

    return df_comments,df_users





Probando las funciones

In [6]:
noduplicated_product, noduplicated_comments = spider_amantis(url_secundaria)
df_product = product_engineer(noduplicated_product)    
# df_product.head()
# df_tag=tag_engineer(df_product,nombre_listas)
# df_tag.head()


Empezando a recoger datos de las paginas
Terminando de recoger los datos de los links de las paginas
Empezando a generar las listas de los productos


In [None]:
df_product.head()

In [None]:
def tag_engineer(df_tag,nombre_listas=nombre_listas,tag_engineer=tag_engineer):
    """
    Realiza ingeniería de datos para la creación de la tabla de tags.

    Input:
    - df_tag (pd.DataFrame): DataFrame de productos.
    - nombre_lista (Lista): Lista de nombres utilizada para cargar listas desde pickles.

    Return:
    - df_tag (pd.DataFrame): DataFrame de tags de los diversos productos.
    """

    print("Generando el fichero de tags")
    # df_tag=pd.read_csv(folder_ingest+file_product+'_'+date+ext)
    df_tag['SLOGAN'] = df_tag['SLOGAN'].str.lower()
    df_tag['DESCRIPTION'] = df_tag['DESCRIPTION'].str.lower()
    # df_tag['SLOGAN'] = df_tag['SLOGAN'].apply(f.eliminacion_acentos)              # Esto da error, debe de ser porque hay NaN en el campo
    df_tag['DESCRIPTION'] = df_tag['DESCRIPTION'].apply(f.eliminacion_acentos)
    listas = f.cargar_listas_desde_pickles(nombre_listas)
    # for nombre_lista in listas:
    #     df_tag = f.aplicar_funcion_a_columna(df_tag, listas,nombre_lista)
    # h=input("Quieres salvar los datos?").upper()
    # if h=="SI":
    #     df_tag.to_csv(tag_engineer,header=True,index=False)           # Tengo que generar el path correcto

    return df_tag,listas

In [None]:
df_tag,listas=tag_engineer(df_product)
df_tag.head()
# ,nombre_listas=nombre_listas,tag_engineer=tag_engineer)

In [None]:
# noduplicated_product, noduplicated_comments = spider_amantis(url_secundaria)
noduplicated_product.head()

**Hay que tener en cuenta que las funciones que dejamos finalmente en el fichero .py sufren modificaciones sobre estas.**

### Modificaciones oportunas para adaptar la información a cargar en la BBDDD.

En este sentido tenemos que entender que la BBDD ya tiene determinados productos cargados y, por lo tanto, con su ID establecidas.

Es por ello que hay que cambiar las ID de la descarga a las ID existentes.

¿Cómo lo vamos a realizar? Vamos a extraer la información de la BBDD en un Dataframe y compararlos para cambiar las ID de los campos *[PRODUCT.product]* y *[USERS.users]*.

También hay que tener en cuenta que la descarga hemos obtenido toda la información de *COMMENTS*, por lo que hay que ver cual es la fecha más reciente de un comentario en la BBDD y solo dejar en el dataframe de COMMENTS los comentarios no guardados. 




#### 1. Reduciendo los comentarios.

¿Qué librerías necesitaremos? 

    - Las librerías para llamar a la BBDD.
    - Las librerías para el manejo de Dataframes.
    - Las librerías para el manejo de Fechas.

In [1]:
import os
os.chdir(os.path.split(os.getcwd())[0])
folder=os.getcwd()

In [2]:
folder

'd:\\Data_science\\Javier\\Repositorios\\Proyecto_tienda_online\\src'

In [3]:
import pandas as pd
import numpy as np
from datetime import datetime,date
import sqlite3
from Utils import functions as f

In [10]:
df_date=pd.read_csv(r'Data\comentarios_scrape_240301.csv')
df_date.head()

Unnamed: 0.1,Unnamed: 0,ID,DATE,RATIO,USERS,COMMENT
0,0,0,"jueves 29 febrero, 2024",5,Ana Vanessa,"Es una pasada, es pequeñito pero tiene mucha p..."
1,1,0,"sábado 21 octubre, 2023",5,ana maria,muy práctico por su pequeño tamaño. vibración ...
2,2,0,"domingo 01 octubre, 2023",5,Paula,"Es pequeño pero útil, da mucho juego y la pila..."
3,3,0,"martes 12 septiembre, 2023",4,FCO JAVIER,"Magnifica bala, es muy potente y da mucho jueg..."
4,4,0,"martes 22 agosto, 2023",5,Daria,Pequeño y potente. Se puede combinar con otros...


In [11]:
df_date.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3609 entries, 0 to 3608
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   Unnamed: 0  3609 non-null   int64 
 1   ID          3609 non-null   int64 
 2   DATE        3609 non-null   object
 3   RATIO       3609 non-null   int64 
 4   USERS       3607 non-null   object
 5   COMMENT     3609 non-null   object
dtypes: int64(3), object(3)
memory usage: 169.3+ KB


In [5]:
dm_mapping={
    'enero':1, 
    'febrero':2, 
    'marzo':3, 
    'abril':4, 
    'mayo':5,
    'junio':6, 
    'julio':7,
    'agosto':8, 
    'septiembre':9, 
    'octubre':10, 
    'noviembre':11, 
    'diciembre':12,
} 

Vamos a seleccionar el valor máximo de fecha

In [12]:
def reengineer_comment(BBDD, df_comment):
    '''Reducción del número de registros del dataframe commentarios_scrape para su ingesta en la BBDD
    Input:
    - BBDD (str): Base de datos utilizada.
    - df_comment (Dataframe): Dataframe con los comentarios originales, sin filtrar

    Return:
    - df_comment_filtered (Dataframe): Dataframe con los comentarios filtrados y las fechas convertidas en Datetime.
    '''
    print("Iniciando la conversión de fecha y la reducción de datos de comentarios")

    ''' Conectamos con la base de datos, extraemos la fecha más reciente y la cerramos'''
    conn = sqlite3.connect(BBDD)
    cursor = conn.cursor()
    query='''SELECT MAX(DATE) FROM COMMENT'''
    MAX_date=f.sql_query(query,cursor)
    conn.commit()
    cursor.close()
    conn.close()
    fecha_maxima = MAX_date.iloc[0, 0]
    fecha_maxima = date.fromisoformat(fecha_maxima)
    max_date = pd.to_datetime(fecha_maxima, unit='ns')

    '''Cargamos el dataframe y lo preparamos para su filtro'''
    df_comment['DAY']=df_comment['DATE'].str.split(' ').str.get(1).astype('Int64')
    df_comment['MONTH']=df_comment['DATE'].str.split(' ').str.get(2).str.split(',').str.get(0)
    df_comment['YEAR']=df_comment['DATE'].str.split(' ').str.get(-1).astype('Int64')

    df_comment['MONTH']=df_comment['MONTH'].map(dm_mapping)
    df_comment['DATE'] = pd.to_datetime(df_comment.iloc[:,-3:])
    df_comment=df_date.iloc[:,:-3]

    '''Realizamos el filtro'''
    df_comment_filtered=df_comment[df_date['DATE']> max_date]
    print("El número de registros a ingresar es:",len(df_comment_filtered))
    return df_comment_filtered

df_date_filtered=reengineer_comment("Resources/online_shop.db",df_date)




2023-06-06


383

#### 12. Reindexando ***PRODUCTOS***.

¿Qué librerías necesitaremos? 

    - Las librerías para llamar a la BBDD.
    - Las librerías para el manejo de Dataframes.
    - Las librerías para el manejo de Fechas.