In [2]:
#!pip install pandas polars bs4 requests 

import pandas as pd
import polars as pl
import bs4
import requests
import re
import os



In [None]:
# Fetch the ANP PMQC open-data page; the CSV links are extracted from `soup` in the next cell.
relative_path = './pmqc/'  # local folder where the downloaded CSV files are stored
url = 'https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/pmqc-programa-de-monitoramento-da-qualidade-dos-combustiveis'
# A timeout prevents the notebook from hanging forever on an unresponsive server.
response = requests.get(url, timeout=60)
# Fail loudly on an HTTP error instead of parsing an error page (which would silently yield no links).
response.raise_for_status()
soup = bs4.BeautifulSoup(response.text, 'html.parser')


In [None]:
# Keep anchors whose href mentions a ".csv" file (dot escaped so it matches
# literally) and that carry a `target` attribute.
csv_anchors = soup.find_all('a', href=re.compile(r'\.csv'))
links = [link['href'] for link in csv_anchors if 'target' in link.attrs]

# Download all files concurrently with a small thread pool.
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

# Create the destination folder once, before any worker starts, so threads never
# race on os.makedirs (exist_ok makes re-runs safe as well).
os.makedirs(relative_path, exist_ok=True)

def download_file(link):
    """Download one CSV URL into `relative_path`.

    The local file name is the last path segment of the URL, e.g.
    https://www.gov.br/anp/.../arquivos/pmqc/2023/pmqc_2023_04.csv -> pmqc_2023_04.csv.

    The data is first written to ``<name>.part`` and only renamed to its final
    name after a complete download, so an interrupted transfer never leaves a
    truncated ``.csv`` that the later ``pmqc/*.csv`` glob would pick up.

    Returns True on success, False on failure (the error is printed).
    """
    name_file = link.split('/')[-1]
    destination = os.path.join(relative_path, name_file)
    partial = destination + '.part'
    try:
        urllib.request.urlretrieve(link, partial)
        os.replace(partial, destination)
    except Exception as exc:  # best-effort: report and keep downloading the rest
        print(f"Error to download {name_file}: {exc}")
        if os.path.exists(partial):
            os.remove(partial)
        return False
    print(f"Downloaded {name_file}")
    return True

failed_links = []
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit one task per link, remembering which link each future belongs to.
    futures = {executor.submit(download_file, link): link for link in links}

    # Collect results as they complete.
    for future in as_completed(futures):
        try:
            ok = future.result()
        except Exception as e:
            print(f"Exception: {e}")
            ok = False
        if not ok:
            failed_links.append(futures[future])

print(f"{len(links) - len(failed_links)}/{len(links)} files downloaded")

In [None]:
folder_destination = ""  # NOTE(review): not referenced anywhere else in this notebook

In [35]:
%%time 

if not os.path.exists(f"pmqc_processed/"):
    os.makedirs(f"pmqc_processed/")
    
df = pl.read_csv("pmqc/*.csv", separator=';', infer_schema_length=10000, ignore_errors=True, truncate_ragged_lines=True)#.limit(1000)
df = df.with_columns(
    pl.col('DataColeta').str.strptime(pl.Date, "%Y-%m-%d", strict=False).cast(pl.Date),
    # remove all special characters from column CnpjMatriz
    pl.col('CnpjPosto').str.replace_all(r'[^0-9]', '')
)
## print count rows
print(df.shape)
print(df.columns)
#postos = postos.unique(subset=["CnpjPosto"])

# # ## generate postos dimensions
postos = df.select([
    'CnpjPosto',
    'RazaoSocialPosto',
    'Distribuidora',
    'DataColeta',
    'Endereço',
    'Latitude',
    'Longitude',
    'Bairro',
    'Município',
    'Uf'])
postos = postos.with_columns(
    pl.col('CnpjPosto').str.slice(0, 8).cast(pl.Int32).alias('CnpjMatriz')
)

postos = postos.sort('DataColeta', descending=True).unique(subset=["CnpjPosto"], keep='first')
# create empty dataframe with date (timestamp), cnpj, observation_txt
df_errors = []

# for each row if latitude or longitude is null, add row to df_errors
for row in postos.rows(named=True):
    if row['Latitude'] is None or row['Longitude'] is None:
        df_errors.append({
            'DataColeta': row['DataColeta'],
            'CnpjPosto': row['CnpjPosto'],
            'Observação': 'Latitude ou Longitude não informados'
        })

df_errors = pd.DataFrame(df_errors)
df_errors = pl.from_pandas(df_errors)
df_errors.shape

# remove all rows with latitude or longitude is null

# # apply function to get new column "geometry" with values like {'type': 'Point', 'coordinates': [-54.61611004, -20.46871167]}} in string format

# def get_geometry(row):
#     if row['Longitude'] is None or row['Latitude'] is None:
#         return {'type': 'Point', 'coordinates': [0, 0]}  # You can use any default values here
#     return {'type': 'Point', 'coordinates': [row['Longitude'], row['Latitude']]}

# postos = postos.with_columns(
#     pl.struct(
#         pl.col('Longitude'),
#         pl.col('Latitude')
#     ).apply(get_geometry).alias('geometry')
# )

# print(postos.shape)

(5198959, 19)
['DataColeta', 'IdNumeric', 'GrupoProduto', 'Produto', 'RazaoSocialPosto', 'CnpjPosto', 'Distribuidora', 'Endereço', 'Complemento', 'Bairro', 'Município', 'Latitude', 'Longitude', 'Uf', 'RegiaoPolitica', 'Ensaio', 'Resultado', 'UnidadeEnsaio', 'Conforme']
CPU times: user 6.44 s, sys: 1.72 s, total: 8.17 s
Wall time: 2.98 s


(19037, 3)

In [None]:
%%time

# generate fact table
coletas = df.select([
    'DataColeta',
    'IdNumeric',
    'CnpjPosto',
    'Produto',
    'Ensaio',
    'Resultado',
    'UnidadeEnsaio',
    'Conforme'])
coletas = coletas.unique(subset=['IdNumeric'])
# get only postos in postos dataframe
coletas = coletas.join(postos, on='CnpjPosto', how='inner')

coletas.write_json('pmqc_processed/coletas.json', row_oriented=True)


In [None]:
## Load postos into MongoDB: unique index on CnpjPosto and a 2dsphere index on `geometry`.

import json
import os
from urllib.parse import quote_plus

from pymongo import MongoClient
from pymongo.errors import BulkWriteError

# Connection settings. Override through environment variables instead of editing the notebook;
# the defaults reproduce the previous local setup (mongodb://root:example@localhost:27017).
MONGO_PORT = int(os.environ.get("MONGO_PORT", 27017))
MONGO_HOST = os.environ.get("MONGO_HOST", "localhost")
MONGO_PASS = os.environ.get("MONGO_PASS", "example")
MONGO_USER = os.environ.get("MONGO_USER", "root")
# User and password must be percent-escaped inside a MongoDB URI.
MONGO_URL = os.environ.get(
    "MONGO_URL",
    f"mongodb://{quote_plus(MONGO_USER)}:{quote_plus(MONGO_PASS)}@{MONGO_HOST}:{MONGO_PORT}",
)

client = MongoClient(MONGO_URL)
db = client['pmqc']
collection = db['postos']
collection.create_index('CnpjPosto', unique=True)
collection.create_index([('geometry', '2dsphere')])

# Documents are inserted as exported; no geometry field is built here.
with open('pmqc_processed/postos.json', encoding='utf-8') as f:
    data = json.load(f)

if data:  # insert_many rejects an empty list
    try:
        # ordered=False keeps inserting after a duplicate, so a re-run only skips existing stations.
        collection.insert_many(data, ordered=False)
    except BulkWriteError as exc:
        write_errors = exc.details.get('writeErrors', [])
        # 11000 is MongoDB's duplicate-key error; any other failure is re-raised.
        if exc.details.get('writeConcernErrors') or any(err.get('code') != 11000 for err in write_errors):
            raise
        print(f"Skipped {len(write_errors)} postos already present")


In [None]:
# Load coletas into MongoDB (reuses the `db` connection opened in the postos cell).
import json

from pymongo.errors import BulkWriteError

collection = db['coletas']
collection.create_index('IdNumeric', unique=True)

with open('pmqc_processed/coletas.json', encoding='utf-8') as f:
    data = json.load(f)

if data:  # insert_many rejects an empty list
    try:
        # ordered=False keeps inserting after a duplicate, so a re-run only skips existing coletas.
        collection.insert_many(data, ordered=False)
    except BulkWriteError as exc:
        write_errors = exc.details.get('writeErrors', [])
        # 11000 is MongoDB's duplicate-key error; any other failure is re-raised.
        if exc.details.get('writeConcernErrors') or any(err.get('code') != 11000 for err in write_errors):
            raise
        print(f"Skipped {len(write_errors)} coletas already present")

In [None]:
# Sanity check: number of documents currently stored in the collection loaded above (`coletas`).
total_documents = collection.count_documents(filter={})
total_documents
