# Modos de como usar o Leitor de Arquivos


## Como usar o downloader com apenas um link:
Vamos usar como exemplo os dados do [COVID do RS](https://ti.saude.rs.gov.br/covid19/). 

In [1]:
import os
import utils

# declarando variáveis:
path = os.getcwd() # identifica o diretório em que está sendo executado
destino = path + '/arquivos/'
download_link = 'https://ti.saude.rs.gov.br/covid19/download'
arquivos_covid = destino

# download dos arquivos:
utils.downloader(download_link,destino,lista=False)

NameError: name 'utils' is not defined

## Como usar o downloader com um arquivo de links
Vamos usar como exemplo os dados de estabelecimentos dos [dados abertos de empresas](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj). Para isso, vamos usar o arquivo `estabelecimentoss_rf_urls.txt` que contem uma lista de URLs para os arquivos de estabelecimentos.

In [2]:
import os
import utils

# fazendo o download dos arquivos:
utils.downloader(download_link='/estabelecimentos_rf_urls.txt',destino='/arquivos/', lista=True)

/Users/frederico/leitor-arquivos/estabelecimentos_rf_urls.txt


In [2]:
import os
import utils

# descompactando os arquivos:
utils.descompactando(caminho_do_arquivo='/arquivos/', tipo_arquivo='zip', destino=os.getcwd()+'/arquivos/unzipped/')

'arquivos descompactados com sucesso'

## Como usar o leitor dos dados em spark

Para fazer o processamento de dados, pode-se usar a linguagem [SQL para Spark](https://spark.apache.org/docs/latest/sql-ref-syntax.html). As consultas devem ser salvas com o formato `nome.sql` na pasta de consultas.  
Vamos executar a `consulta_exemplo_1.sql` na base de dados do covid (schema já definido) e de estabelecimentos da RF (schema precisa ser definido). 

In [2]:
import os
import leitor

path = os.getcwd() # identifica o diretório em que está sendo executado

# processando os arquivos
leitor.processamento_sql_spark(
    pasta_de_consultas=path+'/consultas/', 
    destino=path+'/arquivos_processados/', 
    diretorio=path+'/arquivos/unzipped/',
    separador=';',
    sql_alias='estabelecimentos')

serao lidos 10 arquivos
consulta_exemplo_1.sql
consulta_exemplo_3.sql


AnalysisException: cannot resolve '`cnpj_basico`' given input columns: [estabelecimentos.0001, estabelecimentos.01, estabelecimentos.07, estabelecimentos.08, estabelecimentos.1, estabelecimentos.1096100,1093702, estabelecimentos.20190513, estabelecimentos.20190722, estabelecimentos.288, estabelecimentos.33, estabelecimentos.33600823, estabelecimentos.39790000, estabelecimentos.4011, estabelecimentos.5620104, estabelecimentos.88333829, estabelecimentos.CENTRO, estabelecimentos.EMILIO VASCONCELOS, estabelecimentos.MG, estabelecimentos.RUA, estabelecimentos.SALGADOS DA ERIDIANA, estabelecimentos._c16, estabelecimentos._c23, estabelecimentos._c24, estabelecimentos._c25, estabelecimentos._c26, estabelecimentos._c28, estabelecimentos._c29, estabelecimentos._c8, estabelecimentos._c9, estabelecimentos.adelano.deptofiscal@gmail.com]; line 13 pos 9;
'Sort [unresolvedordinal(2) DESC NULLS LAST], true
+- 'Aggregate ['cnpj_basico], ['cnpj_basico, count(1) AS estabelecimentos#220L]
   +- SubqueryAlias estabelecimentos
      +- Relation[33600823#100,0001#101,07#102,1#103,SALGADOS DA ERIDIANA#104,08#105,20190722#106,01#107,_c8#108,_c9#109,20190513#110,5620104#111,1096100,1093702#112,RUA#113,EMILIO VASCONCELOS#114,288#115,_c16#116,CENTRO#117,39790000#118,MG#119,4011#120,33#121,88333829#122,_c23#123,... 6 more fields] csv


Executando uma consulta em uma fonte de dados na qual é necessário determinar o schema:

In [2]:
import os
import leitor
from pyspark.sql.types import StringType, StructField, StructType # para determinar o schema

# declarando o schema
schema_estabelecimentos = StructType([
        StructField('cnpj_basico',StringType(),True),
        StructField('cnpj_ordem',StringType(),True),
        StructField('cnpj_dv',StringType(),True),
        StructField('identificador_matriz_filial',StringType(),True),
        StructField('nome_fantasia',StringType(),True),
        StructField('situacao_cadastral',StringType(),True),
        StructField('data_situacao_cadastral',StringType(),True),
        StructField('motivo_situacao_cadastral',StringType(),True),
        StructField('nome_da_cidade_no_exterior',StringType(),True),
        StructField('pais',StringType(),True),
        StructField('data_inicio_atividade',StringType(),True),
        StructField('cnae_fiscal_principal',StringType(),True),
        StructField('cnae_fiscal_secundaria',StringType(),True),
        StructField('tipo_logradouro',StringType(),True),
        StructField('logradouro',StringType(),True),
        StructField('numero',StringType(),True),
        StructField('complemento',StringType(),True),
        StructField('bairro',StringType(),True),
        StructField('cep',StringType(),True),
        StructField('uf',StringType(),True),
        StructField('municipio',StringType(),True),
        StructField('ddd_1',StringType(),True),
        StructField('telefone_1',StringType(),True),
        StructField('ddd_2',StringType(),True),
        StructField('telefone_2',StringType(),True),
        StructField('ddd_fax',StringType(),True),
        StructField('fax',StringType(),True),
        StructField('correio_eletronico',StringType(),True),
        StructField('situacao_especial',StringType(),True),
        StructField('data_situacao_especial',StringType(),True)
        ])

path = os.getcwd() # identifica o diretório em que está sendo executado

# processando os arquivos
leitor.processamento_sql_spark(
    pasta_de_consultas=path+'/consultas/', 
    destino=path+'/arquivos_processados/', 
    diretorio=path+'/arquivos/unzipped/',
    separador=';',
    schema=schema_estabelecimentos,
    sql_alias='estabelecimentos')


serao lidos 10 arquivos
consulta_exemplo_1.sql


'foram processados 1'