In [1]:
#!pip install lxml

import requests
from bs4 import BeautifulSoup as bts
import pandas as pd
import re
#import time
import datetime
import socket

TIMEOUT_SEC = 10 # default timeout in seconds
socket.setdefaulttimeout(TIMEOUT_SEC)

def getAndParseURL(url):
    result = requests.get(url,headers={"User-Agent":"Mozilla/5.0"})
    soup = bts(result.text, 'html.parser')
    return soup

html = getAndParseURL("http://vitibrasil.cnpuv.embrapa.br/index.php?opcao=opt_01")
html_footer = html.find_all("table", {"class": "tb_base tb_footer"})
bs_text_tag = html_footer[0].select('td')[0]
modification_date_regex = r"Última modificação: (\d{2})/(\d{2})/(\d{2})"
match = re.search(modification_date_regex, bs_text_tag.get_text())

if match:
    day, month, year = match.groups()
    modified_date = datetime.datetime(year=int(year)+2000, month=int(month), day=int(day))

    print("Última modificação:", modified_date.strftime("%d-%m-%Y"))
else:
    print("Não foi possível encontrar a data da 'Última modificação' do site")

Última modificação: 21-12-2023


In [6]:
modified_date.date()

datetime.date(2023, 12, 21)

In [7]:
ultima_data_coletada = datetime.date(2023,12,21)

if modified_date.date() > ultima_data_coletada:
    print('Deve ser coletado o dado novamente')
else:
    print('A base já está atualizada, utilize o CSV')

A base já está atualizada, utilize o CSV


In [1]:
import requests
from bs4 import BeautifulSoup as bts
import pandas as pd
import re
import datetime

class Vitivinicultura():
    def __init__(self):
        self.url_date = 'http://vitibrasil.cnpuv.embrapa.br/index.php?opcao=opt_01'
        self.base_url = 'http://vitibrasil.cnpuv.embrapa.br/download/'
        self.path_files = 'data_vitivinicultura/csv/'

        self.files = {'file_dot_comma': ['Producao.csv',
                                        'ProcessaViniferas.csv',
                                        'Comercio.csv',
                                        'ImpVinhos.csv',
                                        'ImpEspumantes.csv',
                                        'ImpFrescas.csv',
                                        'ImpPassas.csv',
                                        'ImpSuco.csv',
                                        'ExpVinho.csv',
                                        'ExpEspumantes.csv',
                                        'ExpUva.csv',
                                        'ExpSuco.csv'],
                                        'file_t': ['ProcessaAmericanas.csv',
                                        'ProcessaMesa.csv'] }

        self.file_sep = {
                        'file_dot_comma': ';',
                        'file_t': '\t'
                    }

        self.dataframes = []

    def getAndParseURL(self, url):
        result = requests.get(url,headers={"User-Agent":"Mozilla/5.0"})
        soup = bts(result.text, 'html.parser')
        return soup
    
    def get_last_updated_date(self,):

        html = self.getAndParseURL(self.url_date)
        html_footer = html.find_all("table", {"class": "tb_base tb_footer"})
        bs_text_tag = html_footer[0].select('td')[0]
        modification_date_regex = r"Última modificação: (\d{2})/(\d{2})/(\d{2})"
        match = re.search(modification_date_regex, bs_text_tag.get_text())

        if match:
            day, month, year = match.groups()
            modified_date = datetime.datetime(year=int(year)+2000, month=int(month), day=int(day))

            print("Última modificação:", modified_date.strftime("%d-%m-%Y"))
            return modified_date.date()
        else:
            print("Não foi possível encontrar a data da 'Última modificação' do site")
            return datetime.date(2023,12,21)

    def load_data(self, path):
        for file_type, filenames in self.files.items():
            for filename in filenames:
                try:
                    df = pd.read_csv(path + filename, sep=self.file_sep[file_type])
                    self.dataframes.append(df)
                except Exception as e:
                    df = pd.DataFrame()  
                    self.dataframes.append(df)
                    print(f"Error reading {filename}: {e}")
        return self.dataframes
    
    def get_df_dict(self,modified_date, ultima_data_coletada):
        if modified_date > ultima_data_coletada:
            print('Coletando dados via scraping...')
            dataframes = self.load_data(self.base_url)
        else:
            print('Lendo dados do localmente...')
            dataframes = self.load_data(self.path_files)
        self.df_dict = {
            'Producao': dataframes[0],
            'ProcessaViniferas': dataframes[1],
            'ProcessaAmericanas': dataframes[2],
            'ProcessaMesa': dataframes[3],
            'Comercio': dataframes[4],
            'ImpVinhos': dataframes[5],
            'ImpEspumantes': dataframes[6],
            'ImpFrescas': dataframes[7],
            'ImpPassas': dataframes[8],
            'ImpSuco': dataframes[9],
            'ExpVinho': dataframes[10],
            'ExpEspumantes': dataframes[11],
            'ExpUva': dataframes[12],
            'ExpSuco': dataframes[13]
        }
        #return self.df_dict   
    
    def get_tabela_especifica(self,df_key: str):
        """
        Function to retrieve data based on the key
        """
        if df_key not in self.df_dict:
            print(f"Error: Dataframe '{df_key}' not found")
            return None
        else:
            return self.df_dict[df_key]

In [2]:
vt = Vitivinicultura()

modified_date = vt.get_last_updated_date()
ultima_data_coletada = datetime.date(2023,12,21)

vt.get_df_dict(modified_date, ultima_data_coletada)

In [19]:
df_dict = vt.df_dict
df_key = 'Producaozona'
if df_key not in df_dict:
    print('não tem no dict')
else: 
    print('tem no dict')

não tem no dict


In [None]:
df_producao = vt.get_tabela_especifica('Producao')
df_producao.to_dict(orient='records')

In [15]:
ano_escolhido = '1970'

df_producao[['produto', f'{ano_escolhido}']].to_dict(orient='records')

[{'produto': 'VINHO DE MESA', '1970': 217208604},
 {'produto': 'Tinto', '1970': 174224052},
 {'produto': 'Branco', '1970': 748400},
 {'produto': 'Rosado', '1970': 42236152},
 {'produto': 'VINHO FINO DE MESA (VINIFERA)', '1970': 23899346},
 {'produto': 'Tinto', '1970': 7591557},
 {'produto': 'Branco', '1970': 15562889},
 {'produto': 'Rosado', '1970': 744900},
 {'produto': 'SUCO', '1970': 1097771},
 {'produto': 'Suco de uva integral', '1970': 1097771},
 {'produto': 'Suco de uva concentrado', '1970': 0},
 {'produto': 'Suco de uva adoçado', '1970': 0},
 {'produto': 'Suco de uva orgânico', '1970': 0},
 {'produto': 'Suco de uva reconstituído', '1970': 0},
 {'produto': 'DERIVADOS', '1970': 14164329},
 {'produto': 'Espumante', '1970': 0},
 {'produto': 'Espumante moscatel', '1970': 0},
 {'produto': 'Base espumante', '1970': 0},
 {'produto': 'Base espumante moscatel', '1970': 0},
 {'produto': 'Base Champenoise champanha', '1970': 0},
 {'produto': 'Base Charmat champanha', '1970': 0},
 {'produto'

In [None]:
@app.get("/data/{year}")
async def get_data_by_year(year: str = None, df_key: str = 'Producao'):
    try:
        if df_key not in df_dict:
            print(f"Error: Dataframe '{df_key}' not found")
            response = {}
        else:
            df = vt.get_tabela_especifica('Producao')
            if year:
                try:
                    response = df[['produto', f'{year}']].to_dict(orient='records')
                return response
            else:    
                return df.to_dict(orient='records')
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None  # Return None on unexpected errors
    
            if year:
            try:
                response = df_dict[df_key][year]  # Use try-except for potential KeyError
            except KeyError:
                print(f"Error: Year '{year}' not found in '{df_key}' data")
                response = []  # Return empty list instead of error
        return response