In [224]:
import requests
from bs4 import BeautifulSoup
from time import sleep
import pandas as pd
import sqlite3
from dataclasses import dataclass, field
from typing import List, Dict


# Modelos

In [216]:
from sqlalchemy import create_engine, Column, Integer, String, Float, URL
from sqlalchemy.orm import sessionmaker, declarative_base

engine = create_engine('sqlite:///top.db', echo=True)

Base = declarative_base()
class Product(Base):
    __tablename__ = 'product'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    price = Column(Float)
    product_url = Column(String)
    product_img = Column(String)
    department = Column(String)

    def to_dict(self):
        pass

# class Department(Base):
#     __tablename__ = 'department'
#     id = Column
Base.metadata.create_all(engine)


2024-03-09 15:10:50,858 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-09 15:10:50,859 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("product")
2024-03-09 15:10:50,859 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-03-09 15:10:50,861 INFO sqlalchemy.engine.Engine COMMIT


In [217]:
@dataclass
class Department:
    name: str
    url: str
    products: List[Product] = field(default_factory=list)
    page_no: int = 0
    items_no: int = 0
    offer: int = 0
    soup: BeautifulSoup = None

    def setSoup(self, response_text):
        self.soup = BeautifulSoup(response_text, 'html.parser')

    def  getPageNumber(self):
        pages = self.soup.find('ul', 'a-pagination')
        self.page_no = int(pages.find_all('li')[-2].text)

    def getPageElements(self):
        # Obtener todos los cards
        cards = self.soup.find_all('div', id='gridItemRoot')

        for card in cards:
            try:
                image = card.find('img')['src']
            except:
                image = ''

            try:
                name = card.find_all('a', 'a-link-normal')[1].find('span').find('div').text
            except:
                name = ''

            try:
                price = card.find('span', '_cDEzb_p13n-sc-price_3mJ9Z')
                price = float(price.text.replace('$', '').replace(',', '')) if price != [] else 0.0
            except Exception as e:
                price = 0.0
            

            element = Product(name=name, 
                              price=price, 
                              product_url='https://www.amazon.com.mx/' + card.find_all('a', 'a-link-normal')[1]['href'], 
                              product_img=image,
                              department = self.name
                    )

            self.products.append(element)

    def getAllElements(self):
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}

        self.getPageElements()

        # for i in range(1, self.page_no):

        #     response = requests.get(self.url, headers=headers) if i == 1 else requests.get(self.url+f'={i+1}')
            
        #     while response.status_code != 200:
        #         response = requests.get(self.url, headers=headers) if i == 1 else requests.get(self.url+f'={i+1}')
        #         print(response.status_code, self.name, i+1)
        #         sleep(3)

        #     self.setSoup(response_text=response.text)
        #     self.getPageElements()





        
    

## SQLConnection and modeling


# Main Code

In [218]:
### Petición get
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
url = 'https://www.amazon.com.mx/gp/bestsellers/?ref_=nav_cs_bestsellers'
response = requests.get(url, headers=headers)
while response.status_code != 200:
    response = requests.get(url, headers=headers)
    print(response.status_code)
response.status_code

200


200

In [219]:
### Logica de extración
soup = BeautifulSoup(response.text, 'html.parser')
headers = soup.find_all('div', class_='a-carousel-header-row')

In [220]:
## Obtiene la lista de departamentos dentro de los mas vendidos dentro de amazon
## Se crea el objeto departamento
## Guarda los departamentos en una lista
department_list = []
for header in headers:
    query = header.find('a')
    department_list.append(
        Department(
            str(query['aria-label'].replace(' - Ver más', '')),
            'https://www.amazon.com.mx' + str(query['href'])
        )
    )

## Scraping de cada de partamento

In [221]:
## Itera por cada departamento detectado dentro de la pagina
## hace un request hasta que este devuelva un 200 ya que amazon puede enviarte diferentes status_codes
## Si la petición get no es success no vamos a poder hacer el scraping por lo que se haran las peticiones cada 3 segundos en caso de que no 
## Sea success
for department in department_list:
    print(f'================{department.name}================')
    response = requests.get(url=department.url)
    while response.status_code != 200:
        response = requests.get(url=department.url)
        print(response.status_code, department.name )
        if response.status_code == 200:
            break
        sleep(3)

    ## Despues de que la petición es aceptada nosotros mandamos ese response text
    ## Lo preparamos para la extracción
    ## Nota se tiene que modificar y mejor utilizar una lista de urls ya que cuenta con los diferentes urls y no utiliza la converción  de
    ## URL.com.mx/endpoint=no_pagina
    department.setSoup(response.text)
    department.getPageNumber()
    department.getAllElements()


200 Los más vendidos en Juguetes y Juegos
429 Los más vendidos en Alimentos y Bebidas
200 Los más vendidos en Alimentos y Bebidas
429 Los más vendidos en Belleza
200 Los más vendidos en Belleza
429 Los más vendidos en Tarjetas de Regalo
200 Los más vendidos en Tarjetas de Regalo


## Guardando la data en la DB

In [222]:
from sqlalchemy.orm import Session, sessionmaker
Session = sessionmaker(bind=engine)
session = Session()
engine = create_engine('sqlite:///top.db', echo=True)


for department in department_list:
    session.add_all(department.products)

    session.commit()



2024-03-09 15:11:22,609 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-09 15:11:22,611 INFO sqlalchemy.engine.Engine INSERT INTO product (name, price, product_url, product_img, department) VALUES (?, ?, ?, ?, ?) RETURNING id
2024-03-09 15:11:22,612 INFO sqlalchemy.engine.Engine [generated in 0.00035s (insertmanyvalues) 1/50 (ordered; batch not supported)] ('Apple iPhone 13, 128GB, Medianoche (Reacondicionado)', 0.0, 'https://www.amazon.com.mx//Apple-iPhone-13-Blanco-Estrella/dp/B09LNW3CY2/ref=zg_bs_g_amazon-renewed_d_sccl_1/000-0000000-0000000?psc=1', 'https://images-na.ssl-images-amazon.com/images/I/51nGxi-shlL._AC_UL300_SR300,200_.jpg', 'Los más vendidos en Amazon Renewed : Productos Reacondicionados')
2024-03-09 15:11:22,612 INFO sqlalchemy.engine.Engine INSERT INTO product (name, price, product_url, product_img, department) VALUES (?, ?, ?, ?, ?) RETURNING id
2024-03-09 15:11:22,612 INFO sqlalchemy.engine.Engine [insertmanyvalues 2/50 (ordered; batch not supported)] ('Apple

### Formato de df

In [223]:
records = []
for department in department_list:
    for product in department.products:
        records.append(product.to_dict())
records

AttributeError: 'Product' object has no attribute 'to_dict'

In [None]:
df = pd.DataFrame.from_records(records)
df.drop_duplicates()

In [None]:
import sqlite3

# Conectarse a la base de datos (si no existe, se creará)
conexion = sqlite3.connect('mi_base_de_datos.db')

# Crear un cursor para ejecutar comandos SQL
cursor = conexion.cursor()