sempre que adicionar ou mudar uma dependencia, usar pip freeze > requirements.txt( usar dentro da venv)

### 1 - Importação Biliotecas

In [1]:
from bs4 import BeautifulSoup
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import requests
import re
import pyarrow.parquet as pq

### 2 - Extração das informações do site

In [2]:
dadosAcumulados = [] # lista para armazenar os dados acumulados

page = 1 # variavel para controlar a paginação
i = 1

while True: 

    url = f'https://books.toscrape.com/catalogue/page-{page}.html'

    # resposta com o conteudo da pagina
    response = requests.get(url) 
    
    # verifica se a resposta é diferente de 200, se for, sai do loop
    if response.status_code != 200: 
        break

    result = response.text 

    soup = BeautifulSoup(result, 'lxml')
    
    # busca diretamenta todas as tags li que estão dentro de uma tag ol com a classe row
    for item in soup.select('ol.row li'): 
        
        # busca tags a que tenham o atributo title
        title_aux = item.find("a", attrs={"title":True}) 
        # extrair o texto que está dentro do atributo title de cada tag a encontrada 
        title = title_aux.get('title') 

        # busca tags p que tenham a classe star-rating
        star_aux = item.find("p", class_="star-rating")
        # extrair o texto que está dentro do atributo class na posicao 1 de cada tag p encontrada(é a informação que tem a avaliação do livro) 
        star = star_aux.get('class')[1]

        # busca tags p que tenham a classe price_color
        price_aux = item.find("p", class_="price_color")
        # extrair o texto que está dentro de cada tag p encontrada 
        price = price_aux.getText()

        # busca tags p que tenham a classe instock availability
        availability_aux = item.find("p", class_="instock availability")
        # extrair o texto que está dentro de cada tag p encontrada
        availability = availability_aux.getText()

        # Passa todas as informações encontradas como um dicionario para a varivavel dadosAcumulados, contruindo uma lista de dicionarios
        dadosAcumulados.append({'title':title, 'rating':star, 'price':price, 'availability':availability})

        i = i + 1
        print("loop for: ",i)

    page = page + 1 

    print("pagina: ",page)


loop for:  2
loop for:  3
loop for:  4
loop for:  5
loop for:  6
loop for:  7
loop for:  8
loop for:  9
loop for:  10
loop for:  11
loop for:  12
loop for:  13
loop for:  14
loop for:  15
loop for:  16
loop for:  17
loop for:  18
loop for:  19
loop for:  20
loop for:  21
pagina:  2
loop for:  22
loop for:  23
loop for:  24
loop for:  25
loop for:  26
loop for:  27
loop for:  28
loop for:  29
loop for:  30
loop for:  31
loop for:  32
loop for:  33
loop for:  34
loop for:  35
loop for:  36
loop for:  37
loop for:  38
loop for:  39
loop for:  40
loop for:  41
pagina:  3
loop for:  42
loop for:  43
loop for:  44
loop for:  45
loop for:  46
loop for:  47
loop for:  48
loop for:  49
loop for:  50
loop for:  51
loop for:  52
loop for:  53
loop for:  54
loop for:  55
loop for:  56
loop for:  57
loop for:  58
loop for:  59
loop for:  60
loop for:  61
pagina:  4
loop for:  62
loop for:  63
loop for:  64
loop for:  65
loop for:  66
loop for:  67
loop for:  68
loop for:  69
loop for:  70
loop for:

In [None]:
display(dadosAcumulados)

### 3 - Criando um DF dos dados brutos e depois armazenando em parquet na pasta BRONZE

In [None]:
# Criar uma DataFrame a partir da lista DadosAcumulados
RawData = pd.DataFrame(dadosAcumulados)
# Criar um arquivo parquet no camiminho especificado, usando o engine pyarrow() e o algoritmo de compressão snappy
RawData.to_parquet("BD/BRONZE/RawData.parquet", engine="pyarrow", compression="snappy")

### 4 - Fazendo a separação das informações em 3 DFs diferentes, colocando os metadados e armazenando na pasta SILVER

In [31]:
# Para cada item dentro da lista dadosAcumulados, crie um novo dicionário com apenas duas informações
starTable = [{"title": item["title"], "stars": item["rating"]} for item in dadosAcumulados]
priceTable = [{"title": item["title"], "price": item["price"]} for item in dadosAcumulados]
availabilityTable = [{"title": item["title"], "disponibility": item["availability"]} for item in dadosAcumulados]

In [None]:
# criar um dataframe com os dados
starDF = pd.DataFrame(starTable)
priceDF = pd.DataFrame(priceTable)
availabilityDF = pd.DataFrame(availabilityTable)

display(starDF)

Unnamed: 0,title,stars
0,A Light in the Attic,Three
1,Tipping the Velvet,One
2,Soumission,One
3,Sharp Objects,Four
4,Sapiens: A Brief History of Humankind,Five
...,...,...
995,Alice in Wonderland (Alice's Adventures in Won...,One
996,"Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)",Four
997,A Spy's Devotion (The Regency Spies of London #1),Five
998,1st to Die (Women's Murder Club #1),One


In [40]:
# Salva os DataFrames em arquivos Parquet na camada BRONZE
starDF.to_parquet("BD/SILVER/starDF.parquet", engine="pyarrow", compression="snappy")
priceDF.to_parquet("BD/SILVER/priceDF.parquet", engine="pyarrow", compression="snappy")
availabilityDF.to_parquet("BD/SILVER/availabilityDF.parquet", engine="pyarrow", compression="snappy")

### 5 - Tratando os DFs criados

In [41]:
# Lê o arquivo parquet
table = pq.read_table('BD/SILVER/starDF.parquet')

# Acessar metadados
metadata = table.schema.metadata
print(metadata)  # Mostra os metadados atuais

# Adicionar/alterar metadados
new_metadata = {b'table': b'star_table',
                b'description': b'table containing the titles and ratings of extracted books', 
                b'columns': b'title: string, stars: string',
                b'created': b'26/05/2025',
                b'data origin': b'https://books.toscrape.com/catalogue/category/books_1/page-1.html',}
table = table.replace_schema_metadata(new_metadata)

# Chama o modulo do pyarrow parquet para sobreescrever o arquivo com os novos metadados
pq.write_table(table, 'BD/SILVER/starDF.parquet')

{b'pandas': b'{"index_columns": [{"kind": "range", "name": null, "start": 0, "stop": 1000, "step": 1}], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "title", "field_name": "title", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "stars", "field_name": "stars", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}], "creator": {"library": "pyarrow", "version": "20.0.0"}, "pandas_version": "2.2.3"}'}


In [42]:
# Lê o arquivo parquet
table = pq.read_table('BD/SILVER/priceDF.parquet')

# Acessar metadados
metadata = table.schema.metadata
print(metadata)  # Mostra os metadados atuais

# Adicionar/alterar metadados
new_metadata = {b'table': b'price_table',
                b'description': b'table containing the titles and prices of extracted books', 
                b'columns': b'title: string, price: string',
                b'created': b'26/05/2025',
                b'data origin': b'https://books.toscrape.com/catalogue/category/books_1/page-1.html',}
table = table.replace_schema_metadata(new_metadata)

# Chama o modulo do pyarrow parquet para sobreescrever o arquivo com os novos metadados
pq.write_table(table, 'BD/SILVER/priceDF.parquet')

{b'pandas': b'{"index_columns": [{"kind": "range", "name": null, "start": 0, "stop": 1000, "step": 1}], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "title", "field_name": "title", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "price", "field_name": "price", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}], "creator": {"library": "pyarrow", "version": "20.0.0"}, "pandas_version": "2.2.3"}'}


In [43]:
# Lê o arquivo parquet
table = pq.read_table('BD/SILVER/availabilityDF.parquet')

# Acessar metadados
metadata = table.schema.metadata
print(metadata)  # Mostra os metadados atuais

# Adicionar/alterar metadados
new_metadata = {b'table': b'availability_table',
                b'description': b'table containing the titles and availability of extracted books', 
                b'columns': b'title: string, disponibility: string',
                b'created': b'26/05/2025',
                b'data origin': b'https://books.toscrape.com/catalogue/category/books_1/page-1.html',}
table = table.replace_schema_metadata(new_metadata)

# Chama o modulo do pyarrow parquet para sobreescrever o arquivo com os novos metadados
#pq.write_to_dataset(table, 'BD/SILVER/availabilityDF.parquet', existing_data_behavior= 'overwrite_or_ignore')
pq.write_table(table, 'BD/SILVER/availabilityDF.parquet')

{b'pandas': b'{"index_columns": [{"kind": "range", "name": null, "start": 0, "stop": 1000, "step": 1}], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "title", "field_name": "title", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "disponibility", "field_name": "disponibility", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}], "creator": {"library": "pyarrow", "version": "20.0.0"}, "pandas_version": "2.2.3"}'}


In [46]:
startDF_v2 = pd.read_parquet("BD/SILVER/starDF.parquet", engine="pyarrow")
priceDF_v2 = pd.read_parquet("BD/SILVER/priceDF.parquet", engine="pyarrow")
availabilityDF_v2 = pd.read_parquet("BD/SILVER/availabilityDF.parquet", engine="pyarrow")

display(startDF_v2)
#display(priceDF_v2)
#display(availabilityDF_v2)

Unnamed: 0,title,stars
0,A Light in the Attic,Three
1,Tipping the Velvet,One
2,Soumission,One
3,Sharp Objects,Four
4,Sapiens: A Brief History of Humankind,Five
...,...,...
995,Alice in Wonderland (Alice's Adventures in Won...,One
996,"Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)",Four
997,A Spy's Devotion (The Regency Spies of London #1),Five
998,1st to Die (Women's Murder Club #1),One


In [52]:
# Function to convert the text in the stars column to a number
from turtle import st

def text_to_number(text):

    number_dictionary = {'Zero': 0, 'One': 1, 'Two': 2, 'Three': 3, 'Four': 4,'Five': 5}

    try:
        if text in number_dictionary:
            return number_dictionary[text]
    except Exception as e:
        print(f"Erro ao converter o texto '{text}': {e}")
        return None
    
for item in startDF_v2:
    startDF_v2['stars'] = text_to_number(startDF_v2['stars'])

display(startDF_v2)

Erro ao converter o texto '0      None
1      None
2      None
3      None
4      None
       ... 
995    None
996    None
997    None
998    None
999    None
Name: stars, Length: 1000, dtype: object': unhashable type: 'Series'
Erro ao converter o texto '0      None
1      None
2      None
3      None
4      None
       ... 
995    None
996    None
997    None
998    None
999    None
Name: stars, Length: 1000, dtype: object': unhashable type: 'Series'


Unnamed: 0,title,stars
0,A Light in the Attic,
1,Tipping the Velvet,
2,Soumission,
3,Sharp Objects,
4,Sapiens: A Brief History of Humankind,
...,...,...
995,Alice in Wonderland (Alice's Adventures in Won...,
996,"Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)",
997,A Spy's Devotion (The Regency Spies of London #1),
998,1st to Die (Women's Murder Club #1),


In [None]:
# Salva os DataFrames em arquivos Parquet na camada SILVER
#starDF.to_parquet("BD/SILVER/starDF.parquet", engine="pyarrow", compression="snappy")
#priceDF.to_parquet("BD/SILVER/priceDF.parquet", engine="pyarrow", compression="snappy")
#availabilityDF.to_parquet("BD/SILVER/availabilityDF.parquet", engine="pyarrow", compression="snappy")

Separação 

# Inicializar a SparkSession

from pyspark.sql.functions import regexp_replace, col, trim
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("DataPipe") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read.parquet("BD/SILVER/availabilityDF.parquet")

# df.show(truncate=False)

# withColumns é sempre usado para selecionar a coluna desejada e após isso é aplicado algum tratamento
df = df.withColumn("disponibility", regexp_replace(col("disponibility"), "\s", ""))
df = df.withColumn("disponibility", F.trim(F.col("disponibility")))

df.show(truncate=False)


spark.stop()



from pyspark.sql.functions import regexp_replace, col, trim
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("DataPipe") \
    .config("spark.hadoop.fs.permissions.umask-mode", "000") \
    .getOrCreate()

# Suponha que você tenha um Spark DataFrame
df = spark.createDataFrame([
    ("Ana", 23),
    ("Bruno", 30)
], ["nome", "idade"])

# Salvar como arquivo Parquet
df.write.parquet("BD/SILVER/availabilityDF.parquet")



from pyspark.sql.functions import regexp_replace, col, trim
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("DataPipe") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read.parquet("BD/SILVER/availabilityDF.parquet")

In [50]:
def soma_positivos(lista):
    total = 0
    for numero in lista:
        if numero > 0:
            total += numero
    return total

# Lista de exemplo
numeros = [3, -1, 5, 0, -7, 2]

# Chamando a função
resultado = soma_positivos(numeros)

print("Resultado da soma:", resultado)


Resultado da soma: 10
