In [0]:
!pip install bs4

In [0]:
from datetime import datetime
from bs4 import BeautifulSoup
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import current_date, current_timestamp, expr, from_utc_timestamp, date_format
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    FloatType,
    DecimalType
)
import requests
import pandas as pd
import re
import json
from IPython.display import display, HTML
from decimal import Decimal

In [0]:
# HTTP headers sent with every request in this notebook: a browser-like
# User-Agent string.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'
    ),
}

# Spark schema for the scraped product records. Every column is nullable.
# The price is stored as DecimalType(10, 2); the other columns are strings.
_schema_columns = (
    ('produto', StringType()),
    ('preco_normal', DecimalType(10, 2)),
    ('link', StringType()),
    ('site', StringType()),
)
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in _schema_columns
])

In [0]:
# Fetch the first catalog page; later cells parse `response` to discover the
# item and page counts.
url = 'https://www.kabum.com.br/hardware/placa-de-video-vga/placa-de-video-nvidia'

# Without an explicit timeout `requests` waits forever on a stalled
# connection, which would hang the whole notebook run.
response = requests.get(url, headers=headers, timeout=30)

# Report the outcome; any status other than 200 is printed as an error.
if response.status_code == 200:
    print("OK")
else:
    print(f"Error:{response.status_code}")

In [0]:
# Parse the raw response body with the 'html.parser' backend and print the
# indented markup for manual inspection. `soup` is reused by the cell that
# looks up the `__NEXT_DATA__` script tag.
soup = BeautifulSoup(markup=response.content, features='html.parser')
print(soup.prettify())

In [0]:
# Render the fetched page inside the notebook output for a visual check.
# NOTE(review): this embeds markup downloaded from a remote site directly in
# the notebook output — confirm that is acceptable before sharing the notebook.
display(HTML(data=response.text))

In [0]:
# Find the <script id="__NEXT_DATA__"> tag that carries the page data as JSON
# and read the catalog totals (item count and page count) from it.

# Reset the totals first so a failed parse never leaves values from a previous
# run (or undefined names) behind; with 0 pages the scraping loop is a no-op.
total_items = 0
total_pages = 0

script_tag = soup.find('script', {'id': '__NEXT_DATA__'})
if script_tag:
    try:
        json_data = json.loads(script_tag.string)

        # The 'data' entry is itself a JSON-encoded string, so decode it again.
        nested_data = json.loads(json_data['props']['pageProps']['data'])

        # Read both counters before assigning so they are updated together.
        meta = nested_data['catalogServer']['meta']
        total_items, total_pages = meta['totalItemsCount'], meta['totalPagesCount']

        print(f"Total de items: {total_items}")
        print(f"Total de páginas: {total_pages}")
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
    except KeyError as e:
        print(f"Key not found: {e}")
    except TypeError as e:
        # Raised when the script tag has no text (`.string` is None) or when
        # an intermediate value is not the expected dict/str.
        print(f"Unexpected payload structure: {e}")
else:
    print("Script tag not found")

In [0]:
# Walk every catalog page and collect one record per product into
# `dic_produtos` (keys: produto, preco_normal, link, site).
dic_produtos = []

catalog_url = 'https://www.kabum.com.br/hardware/placa-de-video-vga/placa-de-video-nvidia'

# Start from the page count discovered earlier, but refresh it from each
# page's own metadata: that count was read from a request without `page_size`,
# so it may not match the number of pages at 10 items per page.
page_number = 1
last_page = total_pages

while page_number <= last_page:
    # Let requests build the query string; the previous hand-written URL used
    # HTML-escaped '&amp;' separators, which mangled every parameter after
    # `page_number`.
    query = {
        'page_number': page_number,
        'page_size': 10,
        'facet_filters': '',
        'sort': 'most_searched',
    }
    page_response = requests.get(catalog_url, headers=headers, params=query, timeout=30)
    # Advance before any `continue` below so a bad page can never stall the loop.
    page_number += 1

    if page_response.status_code != 200:
        print(f"Error:{page_response.status_code} ({page_response.url})")
        continue

    # Page-local names keep the `soup`/`script_tag` of the earlier cells intact.
    page_soup = BeautifulSoup(page_response.text, 'html.parser')
    page_script = page_soup.find('script', {'id': '__NEXT_DATA__'})
    if not page_script:
        print(f"Script tag not found ({page_response.url})")
        continue

    try:
        page_json = json.loads(page_script.string)
        # The 'data' entry is a JSON string nested inside the outer JSON.
        catalog = json.loads(page_json['props']['pageProps']['data'])['catalogServer']
        page_products = catalog['data']
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        # Report and skip the page instead of aborting the whole scrape.
        print(f"Error parsing page ({page_response.url}): {e}")
        continue

    reported_pages = catalog.get('meta', {}).get('totalPagesCount')
    if isinstance(reported_pages, int):
        last_page = reported_pages

    for product in page_products:
        name = product.get('name', 'Nome não encontrado')
        # A missing price stays None: the later Decimal conversion treats
        # nulls explicitly, whereas a text placeholder breaks number formatting.
        preco_normal = product.get('price')
        # Double-quoted f-string so the inner single quotes also parse on
        # Python versions older than 3.12.
        link = f"https://www.kabum.com.br/produto/{product.get('code')}/{product.get('friendlyName')}"
        dic_produtos.append({'produto': name, 'preco_normal': preco_normal, 'link': link, 'site': 'kabum'})

In [0]:

def _to_price_decimal(value):
    """Convert a scraped price to a two-decimal ``Decimal``.

    Missing, non-numeric, or non-finite values (None, NaN, placeholder text,
    infinity) become ``Decimal("0.00")`` instead of raising, matching the
    value already used here for null prices.
    """
    zero = Decimal("0.00")
    try:
        amount = Decimal(f"{float(value):.2f}")
    except (TypeError, ValueError):
        return zero
    # float('nan') / float('inf') format to 'nan' / 'inf', which Decimal accepts.
    return amount if amount.is_finite() else zero


# An empty scrape would otherwise fail below with an opaque
# KeyError('preco_normal'); fail early with a message that names the cause.
if not dic_produtos:
    raise ValueError("No products were scraped; nothing to load into the DataFrame.")

# Build the pandas frame and normalise prices to Decimal so they fit the
# DecimalType(10, 2) column of the Spark schema.
df_pandas = pd.DataFrame(dic_produtos)
df_pandas['preco_normal'] = df_pandas['preco_normal'].apply(_to_price_decimal)
# Convert the pandas frame to Spark using the explicit schema and stamp each
# row with the collection date and time.
# hora_coleta: current timestamp passed through from_utc_timestamp for
# "America/Sao_Paulo", formatted as HH:mm:ss.
# NOTE(review): data_coleta uses current_date() without that conversion —
# confirm the date and the time are meant to share the same time zone.
collected_at_local = from_utc_timestamp(current_timestamp(), "America/Sao_Paulo")

df_kabum = (
    spark.createDataFrame(df_pandas, schema)
    .withColumn("data_coleta", current_date())
    .withColumn("hora_coleta", date_format(collected_at_local, "HH:mm:ss"))
)

# Append the collected rows to the bronze Delta table. mergeSchema is enabled
# on the write.
bronze_writer = (
    df_kabum.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
)
bronze_writer.saveAsTable("monitoramento_dados.bronze.dados_scrap")