<a href="https://colab.research.google.com/github/MikaelSantilio/desafio-speedio/blob/master/DataOps2021.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Desafio DataOps 2021**

### Atividades
*   [x] Ler arquivos em CSVs de ESTABELECIMENTOS
*   [x] Organizar os dados num hash/dicionario
*   [x] Salvar no mongodb
*   [x] Ler os dados do db e obter as seguintes informações:
  *   [x] qual % das empresas estão ativas
  *   [x] Quantas empresas do setor de restaurantes foram abertas em cada ano
  *   [x] quantas empresas num raio de 5km do cep 01422000
  *   [ ] tabela de correlação de CNAE FISCAL PRINCIPAL com os CNAE FISCAL SECUNDÁRIA
*   [x] Exportar os dados do ponto 4 para um CSV e Excel

### Instruções de execução

1.   Colocar uma url do MongoDB e uma chave da API do Google Geocoders nas variáveis de ambiente [nesta célula](#scrollTo=33Irhx1KgBsi&line=1&uniqifier=1); 
2.   Executar células do "Core";
2.   Executar célula "Main";


### Considerações

Devido a utilização de ferramentas apenas com o objetivo de testes algumas informações foram removidas.

1.   Colunas não utilizadas foram removidas;
2.   Os arquivos foram limitados a 2;
3.   Apenas uma amostra de 20% dos dados é enviada ao banco;
4.   O número de CEPs próxímos a localização foi limitado a 10 empresas.


## **Core**

### **Packages**

In [None]:
!pip install pymongo[srv]==3.11.0
!pip install wget
!pip install geopy 

In [None]:
%env GOOGLE_API_KEY = <GOOGLE_API_KEY>
%env MONGO_DB_URL = <MONGO_DB_URL>

In [None]:
import os
import re
from zipfile import ZipFile
import pandas as pd
import requests
from geopy.distance import geodesic
from geopy.geocoders import GoogleV3
from geopy import Point
from bs4 import BeautifulSoup
from sklearn.model_selection import train_test_split
import numpy as np
import wget
import pymongo

### **Utils**

In [55]:
def get_or_create_dir(path: str) -> str:
    if not os.path.exists(path):
        os.mkdir(path)
    return path


# Database
# ------------------------------------------------------------------------------
MONGO_DB_URL = os.getenv("MONGO_DB_URL")

# Google API
# ------------------------------------------------------------------------------
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# OS Path
# ------------------------------------------------------------------------------
CURRENT_WORK_DIR = os.getcwd()
DOWNLOAD_DIR = get_or_create_dir(os.path.join(os.getcwd(), "zip_files/"))
EXTRACTION_DIR = get_or_create_dir(os.path.join(os.getcwd(), "files/"))

# Columns Dataframe
# ------------------------------------------------------------------------------

COLUMNS = [
    "cnpj_basico",
    "cnpj_ordem",
    "cnpj_dv",
    "cnpj_identificador",
    "nome",
    "sit_cadas",
    "dt_sit_cadas",
    "mot_sit_cadas",
    "nome_cidade_exterior",
    "pais",
    "dt_ini_atv",
    "cnae_1",
    "cnae_2",
    "tipo_logradouro",
    "logradouro",
    "numero",
    "complemento",
    "bairro",
    "cep",
    "uf",
    "municipio",
    "ddd_1",
    "telefone_1",
    "ddd_2",
    "telefone_2",
    "ddd_fax",
    "fax",
    "email",
    "sit_especial",
    "dt_sit_especial"
]
DROP_COLUMNS = {
    "cnpj_ordem",
    "dt_sit_cadas",
    "mot_sit_cadas",
    "nome_cidade_exterior",
    "pais",
    "tipo_logradouro",
    "logradouro",
    "numero",
    "complemento",
    "bairro",
    "uf",
    "municipio",
    "ddd_1",
    "telefone_1",
    "ddd_2",
    "telefone_2",
    "ddd_fax",
    "fax",
    "email",
    "sit_especial",
    "dt_sit_especial"
}

USERCOLS = set(COLUMNS) - DROP_COLUMNS
DROP_COLUMNS_CNPJ = ["cnpj_basico", "cnpj_dv", "cnpj_identificador", "divisao_subsetor"]

def bytes_to(bytes, to, bsize=1024):
    a = {'k' : 1, 'm': 2, 'g' : 3, 't' : 4, 'p' : 5, 'e' : 6 }
    r = float(bytes)
    for i in range(a[to]):
        r = r / bsize
    return(r)

def bar_progress(current, total, width=80):
  current = bytes_to(current, 'm')
  total = bytes_to(total, 'm')
  progress_message = "%s: %.2f%% [%.2f / %.2f] Mb" % ('Download', current / total * 100, current, total)

  os.sys.stdout.write("\r" + progress_message)
  os.sys.stdout.flush()

### **Classe para obter dados**

Suas funcionalidades são o download, extração, limpeza e envio para o MongoDB dos arquivos de Estabelecimentos do site da Receita Federal.

#### **Para iniciar o processo**
```python
ExtractEstabelecimentosReceita().extract_and_upload_files()
```

In [77]:
class ExtractEstabelecimentosReceita:
    page_url: str = "https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj"
    limit_files: int = 2

    def extract_and_upload_files(self) -> None:
        links = self.get_links_estabelecimentos()
        local_downloads = next(os.walk(DOWNLOAD_DIR), (None, None, []))[2]
        local_extractions = next(os.walk(EXTRACTION_DIR), (None, None, []))[2]
        db = EstabelecimentoCollection()

        if self.limit_files:
            links = links[:self.limit_files]

        for link in links:
            file_basename = os.path.basename(link)
            file_name = os.path.splitext(file_basename)[0]
            print(f'\nArquivo {file_basename}')

            if not file_basename in local_downloads:
                wget.download(link, out=DOWNLOAD_DIR, bar=bar_progress)

            if not file_name in local_extractions:
                ZipFile(os.path.join(DOWNLOAD_DIR, file_basename), 'r').extractall(EXTRACTION_DIR)
                print(f"\n{file_name} Extraído")

            dataframe = self.read_file(os.path.join(EXTRACTION_DIR, file_name))
            dataframe = self.clear_dataframe(dataframe)

            db.insert_dataframe(dataframe)
            print(f"\n{file_name} Enviado")

    def get_links_estabelecimentos(self) -> list:
        response = requests.get(self.page_url)
        soup = BeautifulSoup(response.text)

        return [link["href"] for link in soup.find_all("a", {"href": re.compile("ESTABELE")})]

    def read_file(self, url_path: str) -> pd.DataFrame:
        return pd.read_table(
            url_path,
            sep=";",
            names=COLUMNS,
            usecols=USERCOLS,
            dtype={c: str for c in USERCOLS},
            encoding="iso-8859-1"
        )

    def clear_dataframe(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe = dataframe[dataframe['cep'].notnull()]
        dataframe = dataframe[dataframe.duplicated(subset=['cep'], keep=False)]
        dataframe = dataframe[dataframe.duplicated(
            subset=['cnae_1'], keep=False)]

        dataframe['divisao_subsetor'] = dataframe.apply(lambda row: str(row['cep'])[:5], axis=1)
        dataframe = dataframe[dataframe.duplicated(subset=['cnae_1', 'divisao_subsetor'], keep=False)]

        # Stratified sampling
        X_train, X_test, y_train, y_test = train_test_split(dataframe.drop(['cnae_1', 'divisao_subsetor'], axis=1),
                                                            dataframe[['cnae_1', 'divisao_subsetor']],
                                                            stratify=dataframe[['cnae_1', 'divisao_subsetor']], test_size=0.2)
        del X_train, y_train
        split_frames = [X_test, y_test]
        dataframe = pd.concat(split_frames, axis=1, join='inner')

        dataframe['cnae_2'] = dataframe.apply(lambda row: row['cnae_2'].split(',') if type(row['cnae_2']) == str else np.nan, axis=1)
        dataframe["dt_ini_atv"] = pd.to_datetime(dataframe["dt_ini_atv"], errors = 'coerce')
        dataframe['cnpj'] = dataframe.apply(
            lambda x: self.format_cnpj(
                x['cnpj_basico'], x['cnpj_dv'], x['cnpj_identificador']),
            axis=1
        )
        dataframe.drop(DROP_COLUMNS_CNPJ, axis=1, inplace=True)
        dataframe.reset_index(drop=True, inplace=True)
        dataframe.dropna(subset=['dt_ini_atv'], inplace=True)

        # Reorder columns
        cols = dataframe.columns.tolist()
        cols.remove('cnae_1')
        cols.remove('cnae_2')
        cols.append('cnae_1')
        cols.append('cnae_2')
        dataframe = dataframe[cols]
        dataframe.fillna("-",inplace=True)

        return dataframe

    def format_cnpj(self, basic: str, dv: str, identifier: str) -> str:
        identifier = identifier.zfill(4)
        return '{}.{}.{}/{}-{}'.format(basic[:2], basic[2:5], basic[5:], identifier, dv)


### **Classe de manipulação do MongoDB**

Suas funcionalidades são o envio de DataFrames Pandas para o MongoDB e a exportação dos relatórios.

#### **Para enviar um DataFrame**
```python
EstabelecimentoCollection().insert_dataframe(dataframe: pd.DataFrame)
```

#### **Para exportar os relatórios**
```python
EstabelecimentoCollection().export_reports(export_format: str)
```

In [88]:
class EstabelecimentoCollection:

    db = None
    collection = None

    def __init__(self) -> None:
        client = pymongo.MongoClient(MONGO_DB_URL)
        self.db = client.receita
        self.collection = self.db.estabelecimentos

    def insert_dataframe(self, dataframe: pd.DataFrame) -> None:
        data_dict = dataframe.to_dict("records")
        self.collection.insert_many(data_dict)

    def get_report_active_companies(self) -> list:
        pipeline_percent = [
            {
                "$facet": {
                    "actives": [
                        {"$match": {"sit_cadas": {"$regex": "02"}}},
                        {
                            "$group": {
                                "_id": "total",
                                "count": {
                                    "$sum": 1
                                }
                            }
                        }
                    ],
                    "all": [{
                        "$group": {
                            "_id": "total",
                            "count": {
                                "$sum": 1
                            }
                        }
                    }],

                }
            },
            {
                "$project": {
                    "percent": {
                        "$divide": [
                            {
                                "$first": "$actives.count"
                            },
                            {
                                "$first": "$all.count"
                            }
                        ]
                    }
                }
            },

        ]
        percent = list(self.collection.aggregate(pipeline_percent))[0]["percent"] * 100
        filename = f"Empresas ativas {percent:.1f} por cento"
        return [pd.DataFrame(list(self.collection.find({"sit_cadas": {"$regex": "02"}}, {"_id": 0}))), filename]

    def get_report_restaurants_by_year(self) -> list:
        pipeline = [
            {"$match": {"cnae_1": {"$regex": "^561"}}},
            {"$group": {"_id": {"$year": "$dt_ini_atv"}, "count": {"$sum": 1}}},
            {"$sort": {"_id": -1}},
            {"$project": {
                "ano de abertura": "$_id",
                "quantidade": "$count",
                "_id": False
            }}
        ]
        filename = "Restaurantes abertos por ano"
        return [pd.DataFrame(list(self.collection.aggregate(pipeline))), filename]

    def get_report_nearby_companies(self, cep: str = "01422000", distance_km: int = 5) -> pd.DataFrame:
        pipeline = [
            {"$match": {"cep": {"$regex": f"^{cep[:4]}"}}},
        ]
        dataframe = pd.DataFrame(list(self.collection.aggregate(pipeline)))
        dataframe = dataframe[:10]

        googleGeocoder = GoogleV3(api_key=GOOGLE_API_KEY)
        location = googleGeocoder.geocode(cep, components={"country": "BR"})

        dataframe = dataframe[dataframe.apply(
            lambda row: self.calculate_distance_km(
                location.point, googleGeocoder, row['cep'], distance_km),
            axis=1)]
        
        filename = f"Empresas em um raio de {distance_km}km do CEP {cep}"
        return [dataframe, filename]

    def get_report_cnae_correlation(self) -> pd.DataFrame:
        pass

    def export_reports(self, export_format: str = 'csv') -> None:
        reports = [self.get_report_active_companies, self.get_report_restaurants_by_year, self.get_report_nearby_companies]
        for report in reports:
          if export_format == 'csv':
            data = report()
            data[0].to_csv(os.path.join(CURRENT_WORK_DIR, f"{data[1]}.csv"))    
          elif export_format == 'excel':
            data = report()
            data[0].to_excel(os.path.join(CURRENT_WORK_DIR, f"{data[1]}.xlsx"))
          else:
            raise Exception('Formato de arquivo inválido')
          
          print(f'{data[1]}.{export_format} exportado')

    def calculate_distance_km(
            self, point: Point, googleGeocoder: GoogleV3, cep: str, distance_km: int) -> bool:
        location = googleGeocoder.geocode(cep, components={"country": "BR"})
        if not location:
          return False

        return geodesic(point, location.point).km <= distance_km

## **Main**

In [89]:
# Extração e Upload
extract_helper = ExtractEstabelecimentosReceita()
extract_helper.extract_and_upload_files()

# Exportação de relatórios
EstabelecimentoCollection().export_reports(export_format='excel')


Empresas ativas 36.6 por cento.excel exportado
Restaurantes abertos por ano.excel exportado
Empresas em um raio de 5km do CEP 01422000.excel exportado
