
# Diretórios


## Listando diretórios e arquivos

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formatos de lista, logo, não são tão amigáveis de visualizar

dbutils.fs.ls('/FileStore/tables/')

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/alugueis/', name='alugueis/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/bebidas/', name='bebidas/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/gov/', name='gov/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/nutricao/', name='nutricao/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/usuarios/', name='usuarios/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/usuarios-part/', name='usuarios-part/', size=0, modificationTime=0)]

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de lista, porém, são mais amigáveis de visualizar
# Nesse método utilizamos um loop for para percorrer os diretórios

for item in dbutils.fs.ls('/FileStore/tables/'):
    print(item.path)

dbfs:/FileStore/tables/alugueis/
dbfs:/FileStore/tables/bebidas/
dbfs:/FileStore/tables/gov/
dbfs:/FileStore/tables/nutricao/
dbfs:/FileStore/tables/usuarios/
dbfs:/FileStore/tables/usuarios-part/


In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/alugueis/,alugueis/,0,0
dbfs:/FileStore/tables/bebidas/,bebidas/,0,0
dbfs:/FileStore/tables/gov/,gov/,0,0
dbfs:/FileStore/tables/nutricao/,nutricao/,0,0
dbfs:/FileStore/tables/usuarios/,usuarios/,0,0
dbfs:/FileStore/tables/usuarios-part/,usuarios-part/,0,0



## Criação de diretórios

In [0]:
%python

# Criação do diretório (gov) dentro do diretório (tables)

dbutils.fs.mkdirs('/FileStore/tables/gov/')

Out[60]: True

In [0]:
%python

# Criação do diretório (pnsb) dentro do diretório (gov)

dbutils.fs.mkdirs('/FileStore/tables/gov/pnsb')

Out[61]: True

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/csv/,csv/,0,0
dbfs:/FileStore/tables/gov/pnsb/gzip/,gzip/,0,0
dbfs:/FileStore/tables/gov/pnsb/json/,json/,0,0



## Excluindo arquivos

In [0]:
%python

# Remover (excluir) um arquivo do diretório
# Esse métido remove (excluir) um arquivo de um diretório, onde neste caso, nós apontamos o caminho completo e junto o nome do arquiv

dbutils.fs.rm('/FileStore/tables/gov/pnsb/json/pnsb_data.json')

Out[63]: False

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/,pnsb/,0,0



## Consultando arquivo

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/csv/,csv/,0,0
dbfs:/FileStore/tables/gov/pnsb/gzip/,gzip/,0,0
dbfs:/FileStore/tables/gov/pnsb/json/,json/,0,0


In [0]:
%python

# Consultar dados de um arquivo no dbfs
# Como estamos lidando com um módulo para interagir com diretórios e arquivos, a visualização ficará dessa maneira nesse caso
# Quando for utilizado um PySpark ou um SQL, será possível visualizar os dados de forma mais agradável

dbutils.fs.head('/FileStore/tables/gov/pnsb/json/data_pnsb.json')



[Truncated to first 65536 bytes]
Out[7]: '[{"NC":"Nível Territorial (Código)","NN":"Nível Territorial","MC":"Unidade de Medida (Código)","MN":"Unidade de Medida","V":"Valor","D1C":"Brasil, Grande Região e UF (Código)","D1N":"Brasil, Grande Região e UF","D2C":"Variável (Código)","D2N":"Variável","D3C":"Ano (Código)","D3N":"Ano","D4C":"Tipo de doença (Código)","D4N":"Tipo de doença"},{"NC":"1","NN":"Brasil","MC":"1020","MN":"Unidades","V":"2245","D1C":"1","D1N":"Brasil","D2C":"2597","D2N":"Municípios com ocorrência de doenças associadas ao saneamento básico","D3C":"2008","D3N":"2008","D4C":"120931","D4N":"Total"},{"NC":"1","NN":"Brasil","MC":"1020","MN":"Unidades","V":"1547","D1C":"1","D1N":"Brasil","D2C":"2597","D2N":"Municípios com ocorrência de doenças associadas ao saneamento básico","D3C":"2008","D3N":"2008","D4C":"120937","D4N":"Dengue"},{"NC":"1","NN":"Brasil","MC":"1020","MN":"Unidades","V":"5564","D1C":"1","D1N":"Brasil","D2C":"2597","D2N":"Municípios com ocorrência de doenças a


## Movendo arquivo

In [0]:
# Realizar a uma validação e posteriormente mover os arquivos se a condição for satisfatória
# Essa validação considera se o arquivo tiver tamanho igual a (0), e então e move o arquivo de diretório

for item in dbutils.fs.ls('/FileStore/tables/gov/pnsb/gzip'):
    if item.size!=0:
        dbutils.fs.mv(
            f'/FileStore/tables/gov/pnsb/gzip/{item.name}',
            '/FileStore/tables/bebidas/vinhos/json'
        )


## Removendo arquivos

In [0]:
%python

# Realizar uma validação e posteriormente remover os arquivos se a condição for satisfatória
# Essa validação considera se o arquivo tiver tamanho igual a (0), e então exclui o arquivo do diretório

for arquivo in dbutils.fs.ls('/FileStore/tables/gov/pnsb/compression/'):
    if arquivo.size == 0:
        dbutils.fs.rm(f'/FileStore/tables/gov/pnsb/compression/{arquivo.name}')
    else:
        print('Não existe arquivo de tamanho (0) para excluir')

In [0]:
%python

# Realizar a exclusão diretamente do arquivo

dbutils.fs.rm('/FileStore/tables/bebidas/vinhos/data_json')   


# Arquivos


## Json


### Lendo arquivo json

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/json/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/json/data_pnsb.json,data_pnsb.json,129046,1718062768000


In [0]:
%python

# Leitura de um arquivo no formato json com spark em um dataframe
# Nesse caso, as colunas ainda estão com nomes de chave e valor, como por exemplo (D1C = Brasil, Grande Região e UF (Código)).
# No script abaixo deste iremos renomear as colunas, para que fiquem somente com nomes válidos para entendimento.

df = spark.read.json('/FileStore/tables/gov/pnsb/json/data_pnsb.json')
display(df)


D1C,D1N,D2C,D2N,D3C,D3N,D4C,D4N,MC,MN,NC,NN,V
"Brasil, Grande Região e UF (Código)","Brasil, Grande Região e UF",Variável (Código),Variável,Ano (Código),Ano,Tipo de doença (Código),Tipo de doença,Unidade de Medida (Código),Unidade de Medida,Nível Territorial (Código),Nível Territorial,Valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26



### Alterando nome de colunas

In [0]:

%python

# Renomeação de colunas do dataframe com spark
# Nesse caso ficaremos ainda com duas linhas de cabeçalho e não queremos esse cenário
# No script abaixo deste iremos retirar essa segunda linha, e ficaremos somente com a primeira

df = df.withColumnRenamed('D1C', 'cod_regiao')\
       .withColumnRenamed('D1N', 'regiao')\
       .withColumnRenamed('D2C', 'cod_variavel')\
       .withColumnRenamed('D2N', 'variavel')\
       .withColumnRenamed('D3C', 'cod_ano')\
       .withColumnRenamed('D3N', 'ano')\
       .withColumnRenamed('D4C', 'cod_doenca')\
       .withColumnRenamed('D4N', 'doenca')\
       .withColumnRenamed('MC', 'cod_medida')\
       .withColumnRenamed('MN', 'medida')\
       .withColumnRenamed('NC', 'cod_nivel_territorial')\
       .withColumnRenamed('NN', 'nivel_territorial')\
       .withColumnRenamed('V', 'valor')

display(df)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
"Brasil, Grande Região e UF (Código)","Brasil, Grande Região e UF",Variável (Código),Variável,Ano (Código),Ano,Tipo de doença (Código),Tipo de doença,Unidade de Medida (Código),Unidade de Medida,Nível Territorial (Código),Nível Territorial,Valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26


In [0]:
%python

# Visualização dos dados do dataframe com spark
# Retirada da segunda linha, onde iremos ficar somente com a primeira como cabeçalho
# Nesse caso utilizamos um filtro, onde utilizamos a coluna (valor) para retirar linhas que contém o registro (Valor), que neste caso era somente a segunda linha

df = df.filter(df.valor!='Valor')
display(df)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df.printSchema()

root
 |-- cod_regiao: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: string (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- cod_doenca: string (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: string (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- valor: string (nullable = true)




### Alterando tipo das colunas

In [0]:
%python

# Alterar o time dos atributos
# Nesse caso iremos alterar o tipo dos atributos no dataframe com spark, pois, todos estão como (string), e precisamos padronizar com os tipos (types) corretos

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

df_adeq_atrib = df.withColumn('cod_regiao', col('cod_regiao').cast(IntegerType()))\
                  .withColumn('cod_variavel', col('cod_variavel').cast(IntegerType()))\
                  .withColumn('cod_ano', col('cod_ano').cast(IntegerType()))\
                  .withColumn('ano', col('ano').cast(IntegerType()))\
                  .withColumn('cod_doenca', col('cod_doenca').cast(IntegerType()))\
                  .withColumn('cod_medida', col('cod_medida').cast(IntegerType()))\
                  .withColumn('cod_nivel_territorial', col('cod_nivel_territorial').cast(IntegerType()))\
                  .withColumn('valor', col('valor').cast(IntegerType()))          


In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros
# Notar que agora os atributos tiveram seus tipos alterados e adequados

df_adeq_atrib.printSchema()

root
 |-- cod_regiao: integer (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: integer (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- valor: integer (nullable = true)



In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/json'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/json/data_pnsb.json,data_pnsb.json,129046,1718062768000


In [0]:
%python

# Criação do diretório (pnsb) dentro do diretório (compression)

dbutils.fs.mkdirs('/FileStore/tables/gov/pnsb/gzip')

Out[76]: True

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/csv/,csv/,0,0
dbfs:/FileStore/tables/gov/pnsb/gzip/,gzip/,0,0
dbfs:/FileStore/tables/gov/pnsb/json/,json/,0,0



### Compressão com gzip

In [0]:
%python

# Comprimir um arquivo para que reduzir seu tamanho
# Peso no tamanho normal: 129.046
# Peso no tamanho reduzido: 4.206

df_adeq_atrib.write\
    .option('compression', 'gzip')\
    .mode('overwrite')\
    .format('json')\
    .save('/FileStore/tables/gov/pnsb/gzip/json/')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/gzip/json'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/gzip/json/_SUCCESS,_SUCCESS,0,1718093876000
dbfs:/FileStore/tables/gov/pnsb/gzip/json/_committed_5838141934748889136,_committed_5838141934748889136,117,1718093875000
dbfs:/FileStore/tables/gov/pnsb/gzip/json/_started_5838141934748889136,_started_5838141934748889136,0,1718093874000
dbfs:/FileStore/tables/gov/pnsb/gzip/json/part-00000-tid-5838141934748889136-9092c9b7-2c4f-401d-a443-3c2a261560fc-214-1-c000.json.gz,part-00000-tid-5838141934748889136-9092c9b7-2c4f-401d-a443-3c2a261560fc-214-1-c000.json.gz,4206,1718093875000



### Lendo arquivo json comprimido

In [0]:
%python

# Visualização dos dados do dataframe com spark
# Essa visualização é feita diretamente em um arquivo que foi compactado no formato (gzip)

df_read_compression_gzip_json = spark.read \
    .option('compression', 'gzip') \
    .json('/FileStore/tables/gov/pnsb/gzip/json')

display(df_read_compression_gzip_json)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159.0,Municípios com ocorrência de doenças associadas ao saneamento básico



### Conversão para arquivo csv

In [0]:
%python

# Criar um diretório e salvar um arquivo em formato csv

df_adeq_atrib.write \
    .option('sep', ',') \
    .format('csv') \
    .save('/FileStore/tables/gov/pnsb/csv/')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/csv'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/csv/_SUCCESS,_SUCCESS,0,1718094077000
dbfs:/FileStore/tables/gov/pnsb/csv/_committed_5136325928336761277,_committed_5136325928336761277,113,1718094003000
dbfs:/FileStore/tables/gov/pnsb/csv/_committed_7489078903031081970,_committed_7489078903031081970,212,1718094076000
dbfs:/FileStore/tables/gov/pnsb/csv/_started_5136325928336761277,_started_5136325928336761277,0,1718094002000
dbfs:/FileStore/tables/gov/pnsb/csv/_started_7489078903031081970,_started_7489078903031081970,0,1718094076000
dbfs:/FileStore/tables/gov/pnsb/csv/part-00000-tid-7489078903031081970-ac5e6ec6-093b-48bf-83f7-73c58e39e9cc-244-1-c000.csv,part-00000-tid-7489078903031081970-ac5e6ec6-093b-48bf-83f7-73c58e39e9cc-244-1-c000.csv,79236,1718094076000



## Csv


### Lendo arquivo csv

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/csv/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/csv/_SUCCESS,_SUCCESS,0,1718094077000
dbfs:/FileStore/tables/gov/pnsb/csv/_committed_5136325928336761277,_committed_5136325928336761277,113,1718094003000
dbfs:/FileStore/tables/gov/pnsb/csv/_committed_7489078903031081970,_committed_7489078903031081970,212,1718094076000
dbfs:/FileStore/tables/gov/pnsb/csv/_started_5136325928336761277,_started_5136325928336761277,0,1718094002000
dbfs:/FileStore/tables/gov/pnsb/csv/_started_7489078903031081970,_started_7489078903031081970,0,1718094076000
dbfs:/FileStore/tables/gov/pnsb/csv/part-00000-tid-7489078903031081970-ac5e6ec6-093b-48bf-83f7-73c58e39e9cc-244-1-c000.csv,part-00000-tid-7489078903031081970-ac5e6ec6-093b-48bf-83f7-73c58e39e9cc-244-1-c000.csv,79236,1718094076000


In [0]:
%python

# Criar um diretório e salvar um arquivo em formato csv
# Notar que o diretório já poderia existir, mas neste caso não existia, logo, foi criado
# Dessa vez estamos reescrevendo o arquivo (conversão de json para csv), onde adicionamos opções e modos para que fique correto

df_adeq_atrib.write \
    .option('sep', ',') \
    .option('header', True) \
    .mode('overwrite') \
    .format('csv') \
    .save('/FileStore/tables/gov/pnsb/csv/')

In [0]:
%python

# Leitura de um arquivo no formato json com spark em um dataframe
# Nesse caso, as colunas estão com nomes, como por exemplo (_c0, _c1, _c2 ...)
# Iremos reescrever o arquivo para que ele possua um cabeçalho dessa vez, pois, o arquivo que derivou o csv tinha cabeçalho correto

df_csv = spark.read.csv('/FileStore/tables/gov/pnsb/csv/')
display(df_csv)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26


In [0]:
%python

# Leitura de um arquivo no formato json com spark em um dataframe
# Neste caso adicionamos uma opção de (header=True) na leitura, para que retorne o cabeçalho na visualização

df_csv = spark.read.csv('/FileStore/tables/gov/pnsb/csv/', header=True)
display(df_csv)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159.0


In [0]:
%python

# Leitura de um arquivo no formato json com spark em um dataframe
# Neste caso adicionamos uma opção de (header=True) na leitura, para que retorne o cabeçalho na visualização
# Neste caso adicionamos uma opção de (inferSchema=True), para que o Spark realize uma aferição nas colunas e insira os tipos (types) corretos

df_csv = spark.read.csv('/FileStore/tables/gov/pnsb/csv/', header=True, inferSchema=True)
display(df_csv)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159.0



### Visualizar schema dataframe

In [0]:
# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_csv.printSchema()

root
 |-- cod_regiao: integer (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: integer (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- valor: integer (nullable = true)




### Compressão com gzip

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/csv'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/csv/_SUCCESS,_SUCCESS,0,1718094077000
dbfs:/FileStore/tables/gov/pnsb/csv/_committed_5136325928336761277,_committed_5136325928336761277,113,1718094003000
dbfs:/FileStore/tables/gov/pnsb/csv/_committed_7489078903031081970,_committed_7489078903031081970,212,1718094076000
dbfs:/FileStore/tables/gov/pnsb/csv/_started_5136325928336761277,_started_5136325928336761277,0,1718094002000
dbfs:/FileStore/tables/gov/pnsb/csv/_started_7489078903031081970,_started_7489078903031081970,0,1718094076000
dbfs:/FileStore/tables/gov/pnsb/csv/part-00000-tid-7489078903031081970-ac5e6ec6-093b-48bf-83f7-73c58e39e9cc-244-1-c000.csv,part-00000-tid-7489078903031081970-ac5e6ec6-093b-48bf-83f7-73c58e39e9cc-244-1-c000.csv,79236,1718094076000


In [0]:
%python

# Comprimir um arquivo para que reduzir seu tamanho
# Peso no tamanho normal: 79.236
# Peso no tamanho reduzido: 3.299

df_csv.write\
    .option('compression', 'gzip')\
    .option('header', 'true') \
    .option('sep', ',') \
    .format('csv') \
    .save('/FileStore/tables/gov/pnsb/gzip/csv/')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/gzip/csv'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_SUCCESS,_SUCCESS,0,1718095349000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_committed_2176882831312312073,_committed_2176882831312312073,207,1718095348000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_committed_5285813752004813820,_committed_5285813752004813820,116,1718094496000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_committed_6714853862286365123,_committed_6714853862286365123,218,1718095115000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_committed_8959853201039068203,_committed_8959853201039068203,207,1718095235000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_started_2176882831312312073,_started_2176882831312312073,0,1718095348000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_started_5285813752004813820,_started_5285813752004813820,0,1718094496000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_started_6714853862286365123,_started_6714853862286365123,0,1718095115000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/_started_8959853201039068203,_started_8959853201039068203,0,1718095234000
dbfs:/FileStore/tables/gov/pnsb/gzip/csv/part-00000-tid-2176882831312312073-931be1ab-ffd7-4d68-8801-8cb9fde1734e-361-1-c000.csv.gz,part-00000-tid-2176882831312312073-931be1ab-ffd7-4d68-8801-8cb9fde1734e-361-1-c000.csv.gz,3299,1718095348000



### Lendo arquivo csv comprimido

In [0]:
%python

# Visualização dos dados do dataframe com spark
# Essa visualização é feita diretamente em um arquivo que foi compactado no formato (gzip)

df_read_compression_gzip_csv = spark.read \
    .option('compression', 'gzip') \
    .csv('/FileStore/tables/gov/pnsb/gzip/csv', header=True, inferSchema=True)

display(df_read_compression_gzip_csv)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159.0


In [0]:
# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_read_compression_gzip_csv.printSchema()

root
 |-- cod_regiao: integer (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: integer (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- valor: integer (nullable = true)



In [0]:
%python

# Comprimir um arquivo para que reduzir seu tamanho
# Nesse caso inserimos uma opção para inferir o schema do dataframe e já salvar com os tipos dos dados corretos, sem a necessidade de inserir o comando na leitura
# Peso no tamanho normal: 79.236
# Peso no tamanho reduzido: 3.299

df_csv.write\
    .option('compression', 'gzip') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .option('sep', ',') \
    .mode('overwrite') \
    .format('csv') \
    .save('/FileStore/tables/gov/pnsb/gzip/csv/')

In [0]:
%python

# Visualização dos dados do dataframe com spark
# Essa visualização é feita diretamente em um arquivo que foi compactado no formato (gzip)

df_read_compression_gzip_csv_new = spark.read \
    .option('compression', 'gzip') \
    .csv('/FileStore/tables/gov/pnsb/gzip/csv', header=True, inferSchema=True)

display(df_read_compression_gzip_csv_new)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159.0


In [0]:
# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_read_compression_gzip_csv_new.printSchema()

root
 |-- cod_regiao: integer (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: integer (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- valor: integer (nullable = true)




## Txt


### Lendo um arquivo txt

In [0]:
%python

# Visualização dos dados do dataframe com spark
# Essa visualização é feita diretamente em um arquivo que foi compactado no formato (gzip)

df_read_compression_gzip_csv_new = spark.read \
    .option('compression', 'gzip') \
    .csv('/FileStore/tables/gov/pnsb/gzip/csv', header=True, inferSchema=True)

display(df_read_compression_gzip_csv_new)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26.0
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159.0



### Converter valor nulo em 0

In [0]:
# Preencher os dados da coluna (valor) que estivrem como (null) com o valor (0)
# Essa ação é para que seja possível salvar o dataframe no formato txt
# Notar que estamos utilizando o dataframe do código anterior

df_txt = df_read_compression_gzip_csv_new.na.fill(value=0, subset=['valor'])
display(df_txt)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel_territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Concatenar colunas

In [0]:
%python

# Concatenação de colunas do dataframe em uma única coluna
# Essa ação é necessária para que seja possível, posteriormente, salvar um dataframe como um arquivo do tipo txt

from pyspark.sql.functions import concat_ws

df_txt_unique = df_txt.select(concat_ws('|', *df_txt.columns).alias('dados'))
display(df_txt_unique)

dados
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120931|Total|1020|Unidades|1|Brasil|2245
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120937|Dengue|1020|Unidades|1|Brasil|1547
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120930|Total geral de municípios|1020|Unidades|1|Brasil|5564
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120932|Diarréia|1020|Unidades|1|Brasil|1517
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120933|Leptospirose|1020|Unidades|1|Brasil|197
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120934|Verminoses|1020|Unidades|1|Brasil|1394
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120935|Cólera|1020|Unidades|1|Brasil|54
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120936|Difteria|1020|Unidades|1|Brasil|65
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120938|Tifo|1020|Unidades|1|Brasil|26
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120939|Malária|1020|Unidades|1|Brasil|159



### Salvando o dataframe em txt

In [0]:
%python

# Criar um diretório e salvar um arquivo em formato txt

df_txt_unique.write \
    .format('text') \
    .mode('overwrite') \
    .save('/FileStore/tables/gov/pnsb/txt/')
    

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/txt/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/txt/_SUCCESS,_SUCCESS,0,1720057165000
dbfs:/FileStore/tables/gov/pnsb/txt/_committed_1686094102032445152,_committed_1686094102032445152,113,1720057165000
dbfs:/FileStore/tables/gov/pnsb/txt/_started_1686094102032445152,_started_1686094102032445152,0,1720057164000
dbfs:/FileStore/tables/gov/pnsb/txt/part-00000-tid-1686094102032445152-5b3ad6e9-9d84-4bfb-be3f-4e52f24df77c-115-1-c000.txt,part-00000-tid-1686094102032445152-5b3ad6e9-9d84-4bfb-be3f-4e52f24df77c-115-1-c000.txt,79189,1720057165000


In [0]:
%python

# Leitura de um arquivo no formato txt com spark em um dataframe
# Nesse caso, ele está lendo tudo como uma única coluna
# No próximo código iremos adicionar funções para que o arquivo seja lido em forma colunar

df_read_txt = spark.read.format('text') \
    .load('/FileStore/tables/gov/pnsb/txt/')
display(df_read_txt)

value
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120931|Total|1020|Unidades|1|Brasil|2245
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120937|Dengue|1020|Unidades|1|Brasil|1547
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120930|Total geral de municípios|1020|Unidades|1|Brasil|5564
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120932|Diarréia|1020|Unidades|1|Brasil|1517
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120933|Leptospirose|1020|Unidades|1|Brasil|197
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120934|Verminoses|1020|Unidades|1|Brasil|1394
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120935|Cólera|1020|Unidades|1|Brasil|54
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120936|Difteria|1020|Unidades|1|Brasil|65
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120938|Tifo|1020|Unidades|1|Brasil|26
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120939|Malária|1020|Unidades|1|Brasil|159


In [0]:
%python

# Leitura de um arquivo no formato txt com spark em um dataframe
# Se usarmos o (format('text')) vai dar erro, pois, o esse formato é usado para ler arquivos como um único valor por linha e não é para arquivos delimitados
# Se quiser ler um arquivo delimitado por '|', devemos usar o (format('csv')) mesmo que o delimitador não seja uma vírgula

df_read_delimiter_txt = spark.read \
    .option('header', 'false') \
    .option('delimiter', '|') \
    .option('inferSchema', 'true') \
    .format('csv') \
    .load('/FileStore/tables/gov/pnsb/txt/')
display(df_read_delimiter_txt)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Alterando nome de colunas

In [0]:

%python

# Renomeação de colunas do dataframe com spark

df_rename_columns_txt = df_read_delimiter_txt.withColumnRenamed('_c0', 'ano')\
       .withColumnRenamed('_c1', 'cod_ano')\
       .withColumnRenamed('_c2', 'cod_doenca')\
       .withColumnRenamed('_c3', 'cod_medida')\
       .withColumnRenamed('_c4', 'cod_nivel_territorial')\
       .withColumnRenamed('_c5', 'cod_regiao')\
       .withColumnRenamed('_c6', 'cod_variavel')\
       .withColumnRenamed('_c7', 'doenca')\
       .withColumnRenamed('_c8', 'medida')\
       .withColumnRenamed('_c9', 'nivel_territorial')\
       .withColumnRenamed('_c10', 'regiao')\
       .withColumnRenamed('_c11', 'valor')\
       .withColumnRenamed('_c12', 'variavel')

display(df_rename_columns_txt)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_rename_columns_txt.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- variavel: integer (nullable = true)




### Compressão com gzip

In [0]:
%python

# Concatenação de colunas do dataframe em uma única coluna
# Essa ação é necessária para que seja possível, posteriormente, salvar um dataframe como um arquivo do tipo txt
# Essa ação também é necessária para que seja possível, posteriormente, salvar esse arquivo com compressão

from pyspark.sql.functions import concat_ws

df_txt_concat_pre_compress = df_rename_columns_txt.select(concat_ws('|', *df_rename_columns_txt.columns).alias('dados'))
display(df_txt_concat_pre_compress)

dados
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120931|Total|1020|Unidades|1|Brasil|2245
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120937|Dengue|1020|Unidades|1|Brasil|1547
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120930|Total geral de municípios|1020|Unidades|1|Brasil|5564
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120932|Diarréia|1020|Unidades|1|Brasil|1517
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120933|Leptospirose|1020|Unidades|1|Brasil|197
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120934|Verminoses|1020|Unidades|1|Brasil|1394
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120935|Cólera|1020|Unidades|1|Brasil|54
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120936|Difteria|1020|Unidades|1|Brasil|65
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120938|Tifo|1020|Unidades|1|Brasil|26
1|Brasil|2597|Municípios com ocorrência de doenças associadas ao saneamento básico|2008|2008|120939|Malária|1020|Unidades|1|Brasil|159


In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/txt'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/txt/_SUCCESS,_SUCCESS,0,1720057165000
dbfs:/FileStore/tables/gov/pnsb/txt/_committed_1686094102032445152,_committed_1686094102032445152,113,1720057165000
dbfs:/FileStore/tables/gov/pnsb/txt/_started_1686094102032445152,_started_1686094102032445152,0,1720057164000
dbfs:/FileStore/tables/gov/pnsb/txt/part-00000-tid-1686094102032445152-5b3ad6e9-9d84-4bfb-be3f-4e52f24df77c-115-1-c000.txt,part-00000-tid-1686094102032445152-5b3ad6e9-9d84-4bfb-be3f-4e52f24df77c-115-1-c000.txt,79189,1720057165000


In [0]:
%python

# Comprimir um arquivo para que reduzir seu tamanho
# Peso no tamanho normal: 79.189
# Peso no tamanho reduzido: 3.231

df_txt_concat_pre_compress.write \
    .mode('overwrite') \
    .option('compression', 'gzip') \
    .format('text') \
    .save('/FileStore/tables/gov/pnsb/gzip/txt/')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/gzip/txt'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/gzip/txt/_SUCCESS,_SUCCESS,0,1720059432000
dbfs:/FileStore/tables/gov/pnsb/gzip/txt/_committed_400775006670675980,_committed_400775006670675980,115,1720059432000
dbfs:/FileStore/tables/gov/pnsb/gzip/txt/_started_400775006670675980,_started_400775006670675980,0,1720059431000
dbfs:/FileStore/tables/gov/pnsb/gzip/txt/part-00000-tid-400775006670675980-379f5ca0-2a9e-4bc4-925f-39fde2cd97d3-149-1-c000.txt.gz,part-00000-tid-400775006670675980-379f5ca0-2a9e-4bc4-925f-39fde2cd97d3-149-1-c000.txt.gz,3231,1720059432000



### Lendo o arquivo comprimido

In [0]:
%python

# Visualização dos dados do dataframe com spark
# Essa visualização é feita diretamente em um arquivo que foi compactado no formato (gzip)

df_read_compression_gzip_txt = spark.read \
    .option('compression', 'gzip') \
    .option('inferSchema', 'True') \
    .option('sep', '|') \
    .csv('/FileStore/tables/gov/pnsb/gzip/txt')

display(df_read_compression_gzip_txt)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159


In [0]:

%python

# Renomeação de colunas do dataframe com spark

df_rename_columns_gzip_txt = df_read_compression_gzip_txt.withColumnRenamed('_c0', 'ano')\
       .withColumnRenamed('_c1', 'cod_ano')\
       .withColumnRenamed('_c2', 'cod_doenca')\
       .withColumnRenamed('_c3', 'cod_medida')\
       .withColumnRenamed('_c4', 'cod_nivel_territorial')\
       .withColumnRenamed('_c5', 'cod_regiao')\
       .withColumnRenamed('_c6', 'cod_variavel')\
       .withColumnRenamed('_c7', 'doenca')\
       .withColumnRenamed('_c8', 'medida')\
       .withColumnRenamed('_c9', 'nivel_territorial')\
       .withColumnRenamed('_c10', 'regiao')\
       .withColumnRenamed('_c11', 'valor')\
       .withColumnRenamed('_c12', 'variavel')

display(df_rename_columns_gzip_txt)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159


In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_rename_columns_gzip_txt.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- variavel: integer (nullable = true)




### Conversão para arquivo avro

In [0]:
%python

# Criar um diretório e salvar um arquivo em formato avro

df_rename_columns_gzip_txt.write \
    .mode('overwrite') \
    .format('avro') \
    .save('/FileStore/tables/gov/pnsb/avro/')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/avro/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/avro/_SUCCESS,_SUCCESS,0,1720060501000
dbfs:/FileStore/tables/gov/pnsb/avro/_committed_2916638587103345126,_committed_2916638587103345126,114,1720060501000
dbfs:/FileStore/tables/gov/pnsb/avro/_started_2916638587103345126,_started_2916638587103345126,0,1720060500000
dbfs:/FileStore/tables/gov/pnsb/avro/part-00000-tid-2916638587103345126-26f6f48c-cc82-4f61-a5ff-a5aa68a02fb6-175-1-c000.avro,part-00000-tid-2916638587103345126-26f6f48c-cc82-4f61-a5ff-a5aa68a02fb6-175-1-c000.avro,7712,1720060500000



## Avro


### Lendo arquivo avro

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/avro/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/avro/_SUCCESS,_SUCCESS,0,1720060501000
dbfs:/FileStore/tables/gov/pnsb/avro/_committed_2916638587103345126,_committed_2916638587103345126,114,1720060501000
dbfs:/FileStore/tables/gov/pnsb/avro/_started_2916638587103345126,_started_2916638587103345126,0,1720060500000
dbfs:/FileStore/tables/gov/pnsb/avro/part-00000-tid-2916638587103345126-26f6f48c-cc82-4f61-a5ff-a5aa68a02fb6-175-1-c000.avro,part-00000-tid-2916638587103345126-26f6f48c-cc82-4f61-a5ff-a5aa68a02fb6-175-1-c000.avro,7712,1720060500000


In [0]:
%python

# Leitura de um arquivo no formato avro com spark em um dataframe

df_avro = spark.read \
    .format('avro') \
    .load('/FileStore/tables/gov/pnsb/avro/')
display(df_avro)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159


In [0]:
%python

# Leitura de um arquivo no formato avro com spark em um dataframe
# Nesse caso estamos utilizando a função (pathGlobFilter='*avro'), para que só seja lido os arquivos de extensão (avro)
# Caso não colocassemos essa função, todos os arquivos disponíveis no diretório lidos, inclusive o (avro)

df_avro = spark.read \
    .format('avro') \
    .load('/FileStore/tables/gov/pnsb/avro/', pathGlobFilter='*avro')
display(df_avro)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_avro.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- variavel: integer (nullable = true)




### Compressão com deflate

In [0]:
%python

# Comprimir um arquivo para que reduzir seu tamanho
# Peso no tamanho normal: 7.712
# Peso no tamanho reduzido: 4.538

df_avro.write\
    .mode('overwrite') \
    .option('compression', 'deflate') \
    .format('avro') \
    .save('/FileStore/tables/gov/pnsb/deflate/avro')

In [0]:
%python

# Caso queira deixar a compressão padrão como (deflate) para arquivos (avro), é possível realizar essa ação alterando uma configuração conforme abaixo
# Depois você pode tentar execução novamente a compressão sem a opção ('compression', 'deflate')

spark.conf.set('spark.sql.avro.compression.codec', 'deflate')

In [0]:
%python

# Também é possível especifica o nível de compressão, que por padrão é 6
# Nesse caso deixaremos com o nível de compressão 8
# Quando maior o número, mais comprimido será o arquivo, porém, a compressão será mais lenta

spark.conf.set('spark.sql.avro.deflate.level', '8')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/deflate/avro/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/deflate/avro/_SUCCESS,_SUCCESS,0,1720144740000
dbfs:/FileStore/tables/gov/pnsb/deflate/avro/_committed_1390349052651505759,_committed_1390349052651505759,112,1720144739000
dbfs:/FileStore/tables/gov/pnsb/deflate/avro/_started_1390349052651505759,_started_1390349052651505759,0,1720144738000
dbfs:/FileStore/tables/gov/pnsb/deflate/avro/part-00000-tid-1390349052651505759-1055bc41-415b-40c3-94e9-455d0b2fd8e4-3-1-c000.avro,part-00000-tid-1390349052651505759-1055bc41-415b-40c3-94e9-455d0b2fd8e4-3-1-c000.avro,4538,1720144739000



### Conversão para arquivo parquet

In [0]:
%python

# Criar um diretório e salvar um arquivo em formato parquet

df_avro.write \
    .mode('overwrite') \
    .format('parquet') \
    .save('/FileStore/tables/gov/pnsb/parquet/')


## Parquet


### Lendo um arquivo parquet

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/parquet/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/parquet/_SUCCESS,_SUCCESS,0,1720236798000
dbfs:/FileStore/tables/gov/pnsb/parquet/_committed_359798026235999736,_committed_359798026235999736,122,1720236797000
dbfs:/FileStore/tables/gov/pnsb/parquet/_started_359798026235999736,_started_359798026235999736,0,1720236794000
dbfs:/FileStore/tables/gov/pnsb/parquet/part-00000-tid-359798026235999736-587cf5ab-45d5-4472-a6ab-8662214bb04d-10-1-c000.snappy.parquet,part-00000-tid-359798026235999736-587cf5ab-45d5-4472-a6ab-8662214bb04d-10-1-c000.snappy.parquet,6701,1720236797000


In [0]:
%python

# Leitura de um arquivo no formato parquet com spark em um dataframe

df_parquet = spark.read \
    .format('parquet') \
    .load('/FileStore/tables/gov/pnsb/parquet/')
display(df_parquet)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Compressão com gzip

In [0]:
%python

# Criar um diretório e salvar um arquivo em formato parquet
# Nesse caso estamos comprimindo o arquivo (parquet) com (gzip)
# Por padrão, o arquivo (parquet) é comprimido com o (snappy), e nesse caso, vamos comprimir com (gzip)

df_avro.write \
    .mode('overwrite') \
    .option('compression', 'gzip') \
    .format('parquet') \
    .save('/FileStore/tables/gov/pnsb/gzip/parquet')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/gzip/parquet/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/gzip/parquet/_SUCCESS,_SUCCESS,0,1720237471000
dbfs:/FileStore/tables/gov/pnsb/gzip/parquet/_committed_45462836731053539,_committed_45462836731053539,117,1720237470000
dbfs:/FileStore/tables/gov/pnsb/gzip/parquet/_started_45462836731053539,_started_45462836731053539,0,1720237469000
dbfs:/FileStore/tables/gov/pnsb/gzip/parquet/part-00000-tid-45462836731053539-3a9a7a57-fff1-4d68-b907-9683950659fd-19-1-c000.gz.parquet,part-00000-tid-45462836731053539-3a9a7a57-fff1-4d68-b907-9683950659fd-19-1-c000.gz.parquet,6480,1720237470000



### Lendo o arquivo comprimido

In [0]:
%python

# Leitura de um arquivo no formato avro com spark em um dataframe

df_parquet_compress_gzip = spark.read \
    .format('parquet') \
    .load('/FileStore/tables/gov/pnsb/gzip/parquet/', compression = 'gzip')
display(df_parquet_compress_gzip)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_parquet_compress_gzip.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- variavel: integer (nullable = true)




### Particionamento uma chave

In [0]:
%python

# Visualizar os dados distintos de uma coluna especifica

df_parquet.select('cod_doenca').distinct().show()

+----------+
|cod_doenca|
+----------+
|      2597|
+----------+



In [0]:
%python

# Visualizar os dados distintos de uma coluna especifica

df_parquet.select('nivel_territorial').distinct().show()

+-----------------+
|nivel_territorial|
+-----------------+
|         Unidades|
+-----------------+



In [0]:
%python

# Visualizar os dados distintos de uma coluna especifica

df_parquet.select('valor').distinct().show()

+--------------------+
|               valor|
+--------------------+
|       Grande Região|
|              Brasil|
|Unidade da Federação|
+--------------------+



In [0]:
%python

# Particionar um arquivo por uma coluna especifica

df_parquet.write \
    .partitionBy('valor') \
    .mode('overwrite') \
    .parquet('/FileStore/tables/gov/pnsb/partition/unique-key')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/partition/unique-key'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/_SUCCESS,_SUCCESS,0,1720239654000
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Brasil/,valor=Brasil/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Grande Região/,valor=Grande Região/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Unidade da Federação/,valor=Unidade da Federação/,0,0


In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/partition/unique-key/valor=Brasil/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Brasil/_SUCCESS,_SUCCESS,0,1720239653000
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Brasil/_committed_8150812524621928204,_committed_8150812524621928204,123,1720239653000
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Brasil/_started_8150812524621928204,_started_8150812524621928204,0,1720239653000
dbfs:/FileStore/tables/gov/pnsb/partition/unique-key/valor=Brasil/part-00000-tid-8150812524621928204-7df31425-2259-4f25-b452-2c7a013c0103-65-1.c000.snappy.parquet,part-00000-tid-8150812524621928204-7df31425-2259-4f25-b452-2c7a013c0103-65-1.c000.snappy.parquet,4434,1720239653000



### Particionamento duas chaves

In [0]:
%python

# Visualizar os dados distintos de uma coluna especifica

df_parquet.select('cod_variavel').distinct().show()

+------------+
|cod_variavel|
+------------+
|      120944|
|      120941|
|      120943|
|      120939|
|      120937|
|      120933|
|      120942|
|      120930|
|      120940|
|      120935|
|      120936|
|      120934|
|      120938|
|      120931|
|      120932|
+------------+



In [0]:
%python

# Particionar um arquivo por duas colunas especificas

df_parquet.write \
    .partitionBy('valor', 'cod_variavel') \
    .mode('overwrite') \
    .parquet('/FileStore/tables/gov/pnsb/partition/double-key')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/partition/double-key'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/_SUCCESS,_SUCCESS,0,1720239866000
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/,valor=Brasil/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Grande Região/,valor=Grande Região/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Unidade da Federação/,valor=Unidade da Federação/,0,0


In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120930/,cod_variavel=120930/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120931/,cod_variavel=120931/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120932/,cod_variavel=120932/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120933/,cod_variavel=120933/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120934/,cod_variavel=120934/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120935/,cod_variavel=120935/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120936/,cod_variavel=120936/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120937/,cod_variavel=120937/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120938/,cod_variavel=120938/,0,0
dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120939/,cod_variavel=120939/,0,0



### Leitura de uma partição

In [0]:
%python

# Leitura de uma única partição de um arquivo particionado por duas chaves

df_read_partition = spark.read \
    .parquet('dbfs:/FileStore/tables/gov/pnsb/partition/double-key/valor=Brasil/cod_variavel=120930/')

display(df_read_partition)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,doenca,medida,nivel_territorial,regiao,variavel
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Total geral de municípios,1020,Unidades,1,5564



### Leitura de todas partições

In [0]:
%python

# Leitura de toda a base que é particionada por duas chaves
df_read_general = spark.read \
    .parquet('dbfs:/FileStore/tables/gov/pnsb/partition/double-key/')

display(df_read_general)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,doenca,medida,nivel_territorial,regiao,variavel,valor,cod_variavel
11,Rondônia,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,5,Unidade da Federação,120943
12,Acre,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,11,Unidade da Federação,120943
13,Amazonas,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,13,Unidade da Federação,120943
14,Roraima,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,0,Unidade da Federação,120943
15,Pará,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,38,Unidade da Federação,120943
16,Amapá,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,12,Unidade da Federação,120943
17,Tocantins,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,8,Unidade da Federação,120943
21,Maranhão,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,45,Unidade da Federação,120943
22,Piauí,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,25,Unidade da Federação,120943
23,Ceará,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Doença do aparelho respiratório,1020,Unidades,3,35,Unidade da Federação,120943



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_read_general.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- variavel: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)




### Conversão para arquivo orc

In [0]:
%python

df_read_general.write \
    .format('orc') \
    .save('/FileStore/tables/gov/pnsb/orc/archives-partition')


## ORC


### Lendo um arquivo orc

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/orc/archives-partition/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/_SUCCESS,_SUCCESS,0,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/_committed_6957632609259106974,_committed_6957632609259106974,784,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/_started_6957632609259106974,_started_6957632609259106974,0,1720278486000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00000-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-89-1-c000.snappy.orc,part-00000-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-89-1-c000.snappy.orc,2728,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00001-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-90-1-c000.snappy.orc,part-00001-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-90-1-c000.snappy.orc,2719,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00002-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-91-1-c000.snappy.orc,part-00002-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-91-1-c000.snappy.orc,2729,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00003-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-92-1-c000.snappy.orc,part-00003-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-92-1-c000.snappy.orc,2191,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00004-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-93-1-c000.snappy.orc,part-00004-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-93-1-c000.snappy.orc,2152,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00005-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-94-1-c000.snappy.orc,part-00005-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-94-1-c000.snappy.orc,1939,1720278490000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-partition/part-00006-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-95-1-c000.snappy.orc,part-00006-tid-6957632609259106974-5708408c-de61-4501-b935-75c052735a3b-95-1-c000.snappy.orc,1880,1720278490000



### Compressão com zlib

In [0]:
%python

# Criar um diretório e salvar um arquivo em formato orc
# Nesse caso estamos comprimindo o arquivo (orc) com (zlib)

df_read_general.write \
    .format('orc') \
    .mode('overwrite') \
    .option('compression', 'zlib') \
    .save('/FileStore/tables/gov/pnsb/zlib/orc/')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/zlib/orc/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/_SUCCESS,_SUCCESS,0,1720277499000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/_committed_71471291640348370,_committed_71471291640348370,752,1720277499000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/_started_71471291640348370,_started_71471291640348370,0,1720277494000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00000-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-57-1-c000.zlib.orc,part-00000-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-57-1-c000.zlib.orc,2355,1720277498000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00001-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-58-1-c000.zlib.orc,part-00001-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-58-1-c000.zlib.orc,2328,1720277499000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00002-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-59-1-c000.zlib.orc,part-00002-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-59-1-c000.zlib.orc,2361,1720277498000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00003-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-60-1-c000.zlib.orc,part-00003-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-60-1-c000.zlib.orc,1903,1720277499000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00004-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-61-1-c000.zlib.orc,part-00004-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-61-1-c000.zlib.orc,1873,1720277499000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00005-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-62-1-c000.zlib.orc,part-00005-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-62-1-c000.zlib.orc,1701,1720277499000
dbfs:/FileStore/tables/gov/pnsb/zlib/orc/part-00006-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-63-1-c000.zlib.orc,part-00006-tid-71471291640348370-c43ff8f1-6c03-45e2-95b1-b3af51a9fc5b-63-1-c000.zlib.orc,1650,1720277499000



### Lendo o arquivo comprimido partitions

In [0]:
%python

# Leitura de um arquivo no formato orc com spark em um dataframe

df_read_orc = spark.read \
    .option('compression', 'zlib') \
    .format('orc') \
    .load('/FileStore/tables/gov/pnsb/zlib/orc/') \

display(df_read_orc)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,doenca,medida,nivel_territorial,regiao,variavel,valor,cod_variavel
11,Rondônia,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
12,Acre,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
13,Amazonas,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,1,Unidade da Federação,120936
14,Roraima,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,1,Unidade da Federação,120936
15,Pará,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,10,Unidade da Federação,120936
16,Amapá,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
17,Tocantins,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
21,Maranhão,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,3,Unidade da Federação,120936
22,Piauí,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,2,Unidade da Federação,120936
23,Ceará,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,2,Unidade da Federação,120936



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_read_orc.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- variavel: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)




### Agrupando as partições criadas

In [0]:
df_read_orc.coalesce(1) \
    .write \
    .format('orc') \
    .mode('overwrite') \
    .save('/FileStore/tables/gov/pnsb/orc/archives-unique')

In [0]:
%python

# Listar diretórios e arquivos
# Esse método lista os nomes em formato de tabela, logo, são amigáveis de visualizar

display(dbutils.fs.ls('/FileStore/tables/gov/pnsb/orc/archives-unique/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/gov/pnsb/orc/archives-unique/_SUCCESS,_SUCCESS,0,1720278571000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-unique/_committed_906126020620518083,_committed_906126020620518083,119,1720278570000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-unique/_started_906126020620518083,_started_906126020620518083,0,1720278567000
dbfs:/FileStore/tables/gov/pnsb/orc/archives-unique/part-00000-tid-906126020620518083-1f92ce26-e1af-4806-a606-4cdb2dd03360-105-1-c000.snappy.orc,part-00000-tid-906126020620518083-1f92ce26-e1af-4806-a606-4cdb2dd03360-105-1-c000.snappy.orc,3663,1720278570000



### Lendo o arquivo comprimido partition

In [0]:
%python

# Leitura de um arquivo no formato orc com spark em um dataframe

df_read_unique_partition = spark.read \
    .option('compression', 'snappy') \
    .format('orc') \
    .load('/FileStore/tables/gov/pnsb/orc/archives-unique/') \

display(df_read_unique_partition)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,doenca,medida,nivel_territorial,regiao,variavel,valor,cod_variavel
11,Rondônia,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
12,Acre,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
13,Amazonas,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,1,Unidade da Federação,120936
14,Roraima,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,1,Unidade da Federação,120936
15,Pará,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,10,Unidade da Federação,120936
16,Amapá,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
17,Tocantins,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,0,Unidade da Federação,120936
21,Maranhão,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,3,Unidade da Federação,120936
22,Piauí,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,2,Unidade da Federação,120936
23,Ceará,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,Difteria,1020,Unidades,3,2,Unidade da Federação,120936



### Visualizar schema dataframe

In [0]:
%python

# Visualizar o schema do DataFrame
# Nesse caso é possível visualizar qual o tipo do atributo, se é (string, int, float, double, binary), entre outros

df_read_unique_partition.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: integer (nullable = true)
 |-- nivel_territorial: string (nullable = true)
 |-- regiao: integer (nullable = true)
 |-- variavel: integer (nullable = true)
 |-- valor: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)




# Fim