## Comandos Databricks Utilities - dbutils

**Explorando conteúdo das pastas e arquivos**

In [0]:
dbutils.fs.ls('/FileStore/')

Out[4]: [FileInfo(path='dbfs:/FileStore/tables/', name='tables/', size=0, modificationTime=0)]

In [0]:
display(dbutils.fs.ls('/FileStore/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/,tables/,0,0


In [0]:
display(dbutils.fs.ls('/FileStore/tables/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/,arquivos_curso/,0,0
dbfs:/FileStore/tables/aula-databricks/,aula-databricks/,0,0
dbfs:/FileStore/tables/dados/,dados/,0,0
dbfs:/FileStore/tables/data.csv,data.csv,2781,1705440822000
dbfs:/FileStore/tables/explorando_tipos_arquivos/,explorando_tipos_arquivos/,0,0


**Criando pasta**

In [0]:
dbutils.fs.mkdirs('/FileStore/tables/explorando_tipos_arquivos')

Out[4]: True

**Consultando arquivo disponível após importação**

In [0]:
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/PNSB.json,PNSB.json,129046,1706120595000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/,json_gzip/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/,pnsb_csv/,0,0


## JSON

**Lendo um arquivo JSON**

In [0]:
df = spark.read.json('/FileStore/tables/explorando_tipos_arquivos/PNSB.json')
display(df)

D1C,D1N,D2C,D2N,D3C,D3N,D4C,D4N,MC,MN,NC,NN,V
"Brasil, Grande Região e UF (Código)","Brasil, Grande Região e UF",Variável (Código),Variável,Ano (Código),Ano,Tipo de doença (Código),Tipo de doença,Unidade de Medida (Código),Unidade de Medida,Nível Territorial (Código),Nível Territorial,Valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26


**Renomeando as colunas**

In [0]:
df.columns

Out[9]: ['D1C',
 'D1N',
 'D2C',
 'D2N',
 'D3C',
 'D3N',
 'D4C',
 'D4N',
 'MC',
 'MN',
 'NC',
 'NN',
 'V']

In [0]:
for col in df.columns:
    print(col)

D1C
D1N
D2C
D2N
D3C
D3N
D4C
D4N
MC
MN
NC
NN
V


In [0]:
df = df.withColumnRenamed("D1C","cod_regiao") \
            .withColumnRenamed("D1N", "regiao")\
            .withColumnRenamed("D2C","cod_variavel") \
            .withColumnRenamed("D2N", "variavel") \
            .withColumnRenamed("D3C", "cod_ano") \
            .withColumnRenamed("D3N", "ano") \
            .withColumnRenamed("D4C","cod_doenca") \
            .withColumnRenamed("D4N", "doenca") \
            .withColumnRenamed("MC","cod_medida") \
            .withColumnRenamed("MN", "medida") \
            .withColumnRenamed("NC","cod_nivel_territorial") \
            .withColumnRenamed("NN", "nivel territorial") \
            .withColumnRenamed("V","valor")

display (df)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel territorial,valor
"Brasil, Grande Região e UF (Código)","Brasil, Grande Região e UF",Variável (Código),Variável,Ano (Código),Ano,Tipo de doença (Código),Tipo de doença,Unidade de Medida (Código),Unidade de Medida,Nível Territorial (Código),Nível Territorial,Valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26


**Removendo uma linha do dataframe**

In [0]:
df = df.filter(df.valor != 'Valor')
display(df)

cod_regiao,regiao,cod_variavel,variavel,cod_ano,ano,cod_doenca,doenca,cod_medida,medida,cod_nivel_territorial,nivel territorial,valor
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120931,Total,1020,Unidades,1,Brasil,2245
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120937,Dengue,1020,Unidades,1,Brasil,1547
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120930,Total geral de municípios,1020,Unidades,1,Brasil,5564
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120932,Diarréia,1020,Unidades,1,Brasil,1517
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120933,Leptospirose,1020,Unidades,1,Brasil,197
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120934,Verminoses,1020,Unidades,1,Brasil,1394
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120935,Cólera,1020,Unidades,1,Brasil,54
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120936,Difteria,1020,Unidades,1,Brasil,65
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120938,Tifo,1020,Unidades,1,Brasil,26
1,Brasil,2597,Municípios com ocorrência de doenças associadas ao saneamento básico,2008,2008,120939,Malária,1020,Unidades,1,Brasil,159


**Analisando tipagem das colunas**

In [0]:
df.printSchema()

root
 |-- cod_regiao: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: string (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- cod_doenca: string (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: string (nullable = true)
 |-- nivel territorial: string (nullable = true)
 |-- valor: string (nullable = true)



**Modificando o tipo das colunas com (cast)**

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# com o método .withColumn, cria uma nova coluna referenciando a anterior e realizando a substituição usando o método .cast().
df_new = df.withColumn("cod_regiao", col("cod_regiao").cast(IntegerType())) \
                    .withColumn("cod_variavel", col("cod_variavel").cast(IntegerType())) \
                    .withColumn("cod_ano", col("cod_ano").cast(IntegerType())) \
                    .withColumn("ano", col("ano").cast(IntegerType())) \
                    .withColumn("cod_doenca", col("cod_doenca").cast(IntegerType())) \
                    .withColumn("cod_medida", col("cod_medida").cast (IntegerType())) \
                    .withColumn("cod_nivel_territorial", col("cod_nivel_territorial").cast(IntegerType())) \
                    .withColumn("valor", col("valor").cast(IntegerType()))

In [0]:
df_new.printSchema()

root
 |-- cod_regiao: integer (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- variavel: string (nullable = true)
 |-- cod_ano: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- cod_medida: integer (nullable = true)
 |-- medida: string (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- nivel territorial: string (nullable = true)
 |-- valor: integer (nullable = true)



## Escrevendo um arquivo JSON

In [0]:
# verificando conteúdo da pasta
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/PNSB.json,PNSB.json,129046,1706120595000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/,json_gzip/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/,pnsb_csv/,0,0


**Reduzindo o tamanho do arquivo**

In [0]:
df_new.write \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .format("json") \
    .save("/FileStore/tables/explorando_tipos_arquivos/json_gzip")

In [0]:
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/json_gzip"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/_SUCCESS,_SUCCESS,0,1706146488000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/_committed_128260965334058891,_committed_128260965334058891,115,1706124580000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/_committed_4060245833664381573,_committed_4060245833664381573,217,1706146488000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/_committed_vacuum1962597930251922711,_committed_vacuum1962597930251922711,95,1706146489000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/_started_4060245833664381573,_started_4060245833664381573,0,1706146486000
dbfs:/FileStore/tables/explorando_tipos_arquivos/json_gzip/part-00000-tid-4060245833664381573-a6a00075-52ae-4b69-9c74-4f935aab96f6-46-1-c000.json.gz,part-00000-tid-4060245833664381573-a6a00075-52ae-4b69-9c74-4f935aab96f6-46-1-c000.json.gz,4210,1706146487000


**Realizando a leitura do arquivo comprimido**

In [0]:
df = spark.read \
        .option("compression", "gzip") \
        .json("/FileStore/tables/explorando_tipos_arquivos/json_gzip/")

display(df)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159.0,Municípios com ocorrência de doenças associadas ao saneamento básico


**Salvando dataframe em outro formato de arquivo**

In [0]:
df.write \
    .option("sep", ",") \
    .format("csv") \
    .save("/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/")

In [0]:
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/pnsb_csv"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_SUCCESS,_SUCCESS,0,1706129550000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_2089073311441822416,_committed_2089073311441822416,112,1706128197000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_6959400174097143177,_committed_6959400174097143177,211,1706129550000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_started_2089073311441822416,_started_2089073311441822416,0,1706128197000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_started_6959400174097143177,_started_6959400174097143177,0,1706129549000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/part-00000-tid-6959400174097143177-62948d4b-46e0-4476-bd8b-4e01c51e894f-116-1-c000.csv,part-00000-tid-6959400174097143177-62948d4b-46e0-4476-bd8b-4e01c51e894f-116-1-c000.csv,79236,1706129549000


## Criando uma função para identificar qual a melhorar forma de compressão

In [0]:
def salvando_arquivo_comprimido_json(dataframe):

  compressoes = ["none", "bzip2", "gzip", "lz4", "snappy", "deflate"]
  caminho_padrao = "/FileStore/tables/arquivos_curso/desafio/json_"

  for tipo_compressao in compressoes:
    dataframe.write\
      .option("compression", tipo_compressao)\
      .mode("overwrite") \
      .format("json") \
      .save(caminho_padrao + tipo_compressao)

    display(dbutils.fs.ls(caminho_padrao + tipo_compressao))


In [0]:
salvando_arquivo_comprimido_json(df)

path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/desafio/json_none/_SUCCESS,_SUCCESS,0,1706128594000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_none/_committed_8681026451553464023,_committed_8681026451553464023,113,1706128594000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_none/_started_8681026451553464023,_started_8681026451553464023,0,1706128593000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_none/part-00000-tid-8681026451553464023-16743ec0-92bc-4db7-ab54-0f2ab7d124df-60-1-c000.json,part-00000-tid-8681026451553464023-16743ec0-92bc-4db7-ab54-0f2ab7d124df-60-1-c000.json,163459,1706128593000


path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/desafio/json_bzip2/_SUCCESS,_SUCCESS,0,1706128597000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_bzip2/_committed_2134224044780498886,_committed_2134224044780498886,117,1706128596000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_bzip2/_started_2134224044780498886,_started_2134224044780498886,0,1706128596000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_bzip2/part-00000-tid-2134224044780498886-8c3e74af-61fa-49bf-9d13-1e4efa0a839a-69-1-c000.json.bz2,part-00000-tid-2134224044780498886-8c3e74af-61fa-49bf-9d13-1e4efa0a839a-69-1-c000.json.bz2,3200,1706128596000


path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/desafio/json_gzip/_SUCCESS,_SUCCESS,0,1706128599000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_gzip/_committed_3576929321507724082,_committed_3576929321507724082,116,1706128599000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_gzip/_started_3576929321507724082,_started_3576929321507724082,0,1706128599000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_gzip/part-00000-tid-3576929321507724082-c1fc765c-b2cc-40d0-b697-e9f74dc96588-78-1-c000.json.gz,part-00000-tid-3576929321507724082-c1fc765c-b2cc-40d0-b697-e9f74dc96588-78-1-c000.json.gz,6256,1706128599000


path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/desafio/json_lz4/_SUCCESS,_SUCCESS,0,1706128602000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_lz4/_committed_812901193702527325,_committed_812901193702527325,116,1706128601000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_lz4/_started_812901193702527325,_started_812901193702527325,0,1706128601000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_lz4/part-00000-tid-812901193702527325-06ab03e4-4e46-4440-ba7c-ffa69348936b-87-1-c000.json.lz4,part-00000-tid-812901193702527325-06ab03e4-4e46-4440-ba7c-ffa69348936b-87-1-c000.json.lz4,10785,1706128601000


path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/desafio/json_snappy/_SUCCESS,_SUCCESS,0,1706128604000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_snappy/_committed_3565171916867156758,_committed_3565171916867156758,120,1706128604000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_snappy/_started_3565171916867156758,_started_3565171916867156758,0,1706128603000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_snappy/part-00000-tid-3565171916867156758-6652db48-df3f-42b9-b2e0-26084b52096e-96-1-c000.json.snappy,part-00000-tid-3565171916867156758-6652db48-df3f-42b9-b2e0-26084b52096e-96-1-c000.json.snappy,13689,1706128604000


path,name,size,modificationTime
dbfs:/FileStore/tables/arquivos_curso/desafio/json_deflate/_SUCCESS,_SUCCESS,0,1706128606000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_deflate/_committed_5497172185466326763,_committed_5497172185466326763,122,1706128606000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_deflate/_started_5497172185466326763,_started_5497172185466326763,0,1706128606000
dbfs:/FileStore/tables/arquivos_curso/desafio/json_deflate/part-00000-tid-5497172185466326763-80d31510-0cbf-4fc3-aeb1-32748502a365-105-1-c000.json.deflate,part-00000-tid-5497172185466326763-80d31510-0cbf-4fc3-aeb1-32748502a365-105-1-c000.json.deflate,6244,1706128606000


## Manipulando arquivo CSV

**Leitura de arquivo CSV**

In [0]:
df_csv = spark.read.csv('/FileStore/tables/explorando_tipos_arquivos/pnsb_csv')
display(df_csv)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico


**Regravando o arquivo CSV, agora especificando a presença de cabeçalho e optando por sobrescrever o arquivo existente**

In [0]:
df.write \
    .option("sep", ",") \
    .option('header', True) \
    .mode('overwrite') \
    .format("csv") \
    .save("/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/")

In [0]:
# Realizando nova leitura no arquivo CSV
df_csv = spark.read.csv('/FileStore/tables/explorando_tipos_arquivos/pnsb_csv')
display(df_csv)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico


**Especificando a primeira linha como cabecalho**

In [0]:
df_csv = spark.read.csv('/FileStore/tables/explorando_tipos_arquivos/pnsb_csv', header=True)
display(df_csv)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159.0,Municípios com ocorrência de doenças associadas ao saneamento básico


In [0]:
df_csv.printSchema()

root
 |-- ano: string (nullable = true)
 |-- cod_ano: string (nullable = true)
 |-- cod_doenca: string (nullable = true)
 |-- cod_medida: string (nullable = true)
 |-- cod_nivel_territorial: string (nullable = true)
 |-- cod_regiao: string (nullable = true)
 |-- cod_variavel: string (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: string (nullable = true)
 |-- nivel territorial: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- valor: string (nullable = true)
 |-- variavel: string (nullable = true)



In [0]:
# Pedindo para o spark fazer a inferência do schema
df_csv = spark.read.csv('/FileStore/tables/explorando_tipos_arquivos/pnsb_csv', header=True, inferSchema=True)
display(df_csv)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159.0,Municípios com ocorrência de doenças associadas ao saneamento básico


In [0]:
df_csv.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- cod_ano: integer (nullable = true)
 |-- cod_doenca: integer (nullable = true)
 |-- cod_medida: integer (nullable = true)
 |-- cod_nivel_territorial: integer (nullable = true)
 |-- cod_regiao: integer (nullable = true)
 |-- cod_variavel: integer (nullable = true)
 |-- doenca: string (nullable = true)
 |-- medida: string (nullable = true)
 |-- nivel territorial: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- valor: integer (nullable = true)
 |-- variavel: string (nullable = true)



## Escrevendo arquivo CSV com compressão

In [0]:
df_csv.write \
            .option("compression", "gzip") \
            .option("header", "true") \
            .option("sep", ",") \
            .format("csv") \
            .save("/FileStore/tables/explorando_tipos_arquivos/csv_gzip/")

In [0]:
# validando e visualizando gravação do arquivo compactado
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/csv_gzip"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/csv_gzip/_SUCCESS,_SUCCESS,0,1706147044000
dbfs:/FileStore/tables/explorando_tipos_arquivos/csv_gzip/_committed_9213003086028788765,_committed_9213003086028788765,115,1706147044000
dbfs:/FileStore/tables/explorando_tipos_arquivos/csv_gzip/_started_9213003086028788765,_started_9213003086028788765,0,1706147043000
dbfs:/FileStore/tables/explorando_tipos_arquivos/csv_gzip/part-00000-tid-9213003086028788765-974823ae-caa9-4905-ae46-e1d7ab243945-74-1-c000.csv.gz,part-00000-tid-9213003086028788765-974823ae-caa9-4905-ae46-e1d7ab243945-74-1-c000.csv.gz,5002,1706147044000


In [0]:
# Comparando tamanho do arquivo original abaixo com o arquivo compactado acima
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/pnsb_csv"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_2089073311441822416,_committed_2089073311441822416,112,1706128197000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_2701933651272897168,_committed_2701933651272897168,200,1706146586000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_6959400174097143177,_committed_6959400174097143177,211,1706129550000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_8828116495340709521,_committed_8828116495340709521,199,1706146701000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_committed_vacuum1057305234746800320,_committed_vacuum1057305234746800320,96,1706146587000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_started_2701933651272897168,_started_2701933651272897168,0,1706146586000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/_started_8828116495340709521,_started_8828116495340709521,0,1706146701000
dbfs:/FileStore/tables/explorando_tipos_arquivos/pnsb_csv/part-00000-tid-8828116495340709521-72f09c7c-140a-4f32-9fc3-049b0e4f81bd-68-1-c000.csv,part-00000-tid-8828116495340709521-72f09c7c-140a-4f32-9fc3-049b0e4f81bd-68-1-c000.csv,79236,1706146701000


**Leitura do arquivo compactado**

In [0]:
df = spark.read \
            .option("compression", "gzip") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("sep", ",") \
            .csv("/FileStore/tables/explorando_tipos_arquivos/csv_gzip")

display(df)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26.0,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159.0,Municípios com ocorrência de doenças associadas ao saneamento básico


## Manipulando arquivo TXT

**Escrita do arquivo TXT**

**Obs.: Se fizermos a concatenação das colunas tendo os dados nulos, 
eles serão ignorados e os valores da coluna seguinte assumirão seu lugar**

In [0]:
df = df.na.fill(value=0, subset=["valor"])
display(df)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


**Gravando em um arquivo TXT**

In [0]:
df.write.text("/FileStore/tables/explorando_tipos_arquivos/txt/")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3533478660458427>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mtext[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43m/FileStore/tables/explorando_tipos_arquivos/txt/[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(


**O erro acima ocorre porque a coluna "ano" possui tipo inteiro e que não é suportado no formato texto**

**Corrigindo erro acima utilizando método concat_ws com separador para fazer junção das colunas**

In [0]:
from pyspark.sql.functions import concat_ws

df_uma_coluna = df.select(concat_ws("|", *df.columns).alias('dados'))
display(df_uma_coluna)

dados
2008|2008|120931|1020|1|1|2597|Total|Unidades|Brasil|Brasil|2245|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120937|1020|1|1|2597|Dengue|Unidades|Brasil|Brasil|1547|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120930|1020|1|1|2597|Total geral de municípios|Unidades|Brasil|Brasil|5564|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120932|1020|1|1|2597|Diarréia|Unidades|Brasil|Brasil|1517|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120933|1020|1|1|2597|Leptospirose|Unidades|Brasil|Brasil|197|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120934|1020|1|1|2597|Verminoses|Unidades|Brasil|Brasil|1394|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120935|1020|1|1|2597|Cólera|Unidades|Brasil|Brasil|54|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120936|1020|1|1|2597|Difteria|Unidades|Brasil|Brasil|65|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120938|1020|1|1|2597|Tifo|Unidades|Brasil|Brasil|26|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120939|1020|1|1|2597|Malária|Unidades|Brasil|Brasil|159|Municípios com ocorrência de doenças associadas ao saneamento básico


**Escrevendo o dataframe**

In [0]:
df_uma_coluna.write \
            .format("text") \
            .mode("overwrite") \
            .save("/FileStore/tables/explorando_tipos_arquivos/txt/")

In [0]:
# visualizando o conteúdo da pasta de interesse
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/txt/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_SUCCESS,_SUCCESS,0,1706189121000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_committed_896227987089041921,_committed_896227987089041921,111,1706150190000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_committed_9003021902903591339,_committed_9003021902903591339,208,1706189120000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_committed_vacuum8903812455761501923,_committed_vacuum8903812455761501923,95,1706189122000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_started_9003021902903591339,_started_9003021902903591339,0,1706189119000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/part-00000-tid-9003021902903591339-1fbf34c1-b0d4-4c2f-9646-dfb7e451b305-5-1-c000.txt,part-00000-tid-9003021902903591339-1fbf34c1-b0d4-4c2f-9646-dfb7e451b305-5-1-c000.txt,79189,1706189120000


**Realizando a leitura do arquivo**

In [0]:
df_text = spark.read.format("text") \
            .load("/FileStore/tables/explorando_tipos_arquivos/txt/")

display(df_text)

value
2008|2008|120931|1020|1|1|2597|Total|Unidades|Brasil|Brasil|2245|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120937|1020|1|1|2597|Dengue|Unidades|Brasil|Brasil|1547|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120930|1020|1|1|2597|Total geral de municípios|Unidades|Brasil|Brasil|5564|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120932|1020|1|1|2597|Diarréia|Unidades|Brasil|Brasil|1517|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120933|1020|1|1|2597|Leptospirose|Unidades|Brasil|Brasil|197|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120934|1020|1|1|2597|Verminoses|Unidades|Brasil|Brasil|1394|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120935|1020|1|1|2597|Cólera|Unidades|Brasil|Brasil|54|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120936|1020|1|1|2597|Difteria|Unidades|Brasil|Brasil|65|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120938|1020|1|1|2597|Tifo|Unidades|Brasil|Brasil|26|Municípios com ocorrência de doenças associadas ao saneamento básico
2008|2008|120939|1020|1|1|2597|Malária|Unidades|Brasil|Brasil|159|Municípios com ocorrência de doenças associadas ao saneamento básico


**Separando as colunas**

In [0]:
df_text = spark.read \
        .option("header", "false") \
        .option("delimiter", "|") \
        .option("inferSchema", "true") \
        .format("csv") \
        .load("/FileStore/tables/explorando_tipos_arquivos/txt/")

display(df_text)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


**Renomeando as colunas**

In [0]:
df_text_2 = df_text.withColumnRenamed ("_c0", "ano") \
                .withColumnRenamed("_c1","cod_ano") \
                .withColumnRenamed("_c2","cod_doenca") \
                .withColumnRenamed("_c3","cod_medida") \
                .withColumnRenamed("_c4","cod_nivel_territorial") \
                .withColumnRenamed("_c5","cod_regiao") \
                .withColumnRenamed("_c6", "cod_variavel") \
                .withColumnRenamed("_c7", "doenca") \
                .withColumnRenamed("_c8", "medida") \
                .withColumnRenamed("_c9", "nivel_territorial") \
                .withColumnRenamed("_c10", "regiao") \
                .withColumnRenamed("_c11","valor") \
                .withColumnRenamed("_c12","variavel")

display (df_text_2)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


## Salvando arquivo TXT comprimido

In [0]:
df_text_2.write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .format("text") \
    .save("/FileStore/tables/explorando_tipos_arquivos/txt_gzip/")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3529756066386970>:1[0m
[0;32m----> 1[0m [43mdf_text_2[49m[38;5;241;43m.[39;49m[43mwrite[49m[43m [49m[43m\[49m
[1;32m      2[0m [43m    [49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcompression[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43mgzip[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      4[0m [43m    [49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mtext[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m    

**Resolvendo erro de coluna do tipo inteiro não suportado por texto**
**Utilizando dataframe existente que já possui uma coluna**

In [0]:
df_uma_coluna.write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .format("text") \
    .save("dbfs:/FileStore/tables/explorando_tipos_arquivos/txt_gzip/")

In [0]:
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/txt_gzip'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt_gzip/_SUCCESS,_SUCCESS,0,1706190382000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt_gzip/_committed_6206846499356284011,_committed_6206846499356284011,115,1706190382000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt_gzip/_started_6206846499356284011,_started_6206846499356284011,0,1706190381000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt_gzip/part-00000-tid-6206846499356284011-b5d38cf9-e8f4-4732-9c19-f244064db78f-19-1-c000.txt.gz,part-00000-tid-6206846499356284011-b5d38cf9-e8f4-4732-9c19-f244064db78f-19-1-c000.txt.gz,4932,1706190382000


In [0]:
# Analisando tamanho do arquivo original e compactado
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/txt'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_SUCCESS,_SUCCESS,0,1706189121000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_committed_896227987089041921,_committed_896227987089041921,111,1706150190000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_committed_9003021902903591339,_committed_9003021902903591339,208,1706189120000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_committed_vacuum8903812455761501923,_committed_vacuum8903812455761501923,95,1706189122000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/_started_9003021902903591339,_started_9003021902903591339,0,1706189119000
dbfs:/FileStore/tables/explorando_tipos_arquivos/txt/part-00000-tid-9003021902903591339-1fbf34c1-b0d4-4c2f-9646-dfb7e451b305-5-1-c000.txt,part-00000-tid-9003021902903591339-1fbf34c1-b0d4-4c2f-9646-dfb7e451b305-5-1-c000.txt,79189,1706189120000


**Leitura arquivo de texto compactado**

In [0]:
df = spark.read \
     .option("compression", "gzip") \
   .option("inferSchema", "true") \
   .option("sep", "|") \
   .csv("/FileStore/tables/explorando_tipos_arquivos/txt_gzip/")

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


**Renomeando as colunas**

In [0]:
df_renomeado = df_text.withColumnRenamed ("_c0", "ano") \
  .withColumnRenamed("_c1","cod_ano") \
  .withColumnRenamed("_c2","cod_doenca") \
  .withColumnRenamed("_c3","cod_medida") \
  .withColumnRenamed("_c4","cod_nivel_territorial") \
  .withColumnRenamed("_c5","cod_regiao") \
  .withColumnRenamed("_c6", "cod_variavel") \
  .withColumnRenamed("_c7", "doenca") \
  .withColumnRenamed("_c8", "medida") \
  .withColumnRenamed("_c9", "nivel_territorial") \
  .withColumnRenamed("_c10", "regiao") \
  .withColumnRenamed("_c11","valor") \
  .withColumnRenamed("_c12","variavel")

display (df_renomeado)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


## Manipulando arquivo no formato AVRO

**Salvando arquivo**

In [0]:
df_renomeado.write \
    .mode("overwrite") \
    .format('avro') \
    .save("/FileStore/tables/explorando_tipos_arquivos/avro/")

In [0]:
# Consultando e validando a gravação do arquivo no formato AVRO
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/avro/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/_SUCCESS,_SUCCESS,0,1706191227000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/_committed_1399109665869392578,_committed_1399109665869392578,113,1706191227000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/_started_1399109665869392578,_started_1399109665869392578,0,1706191226000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/part-00000-tid-1399109665869392578-9eda1851-5647-4892-9e7a-098fdd07e151-40-1-c000.avro,part-00000-tid-1399109665869392578-9eda1851-5647-4892-9e7a-098fdd07e151-40-1-c000.avro,10101,1706191227000


**Leitura de arquivo**

In [0]:
df_avro = spark.read \
    .format("avro") \
    .load("/FileStore/tables/explorando_tipos_arquivos/avro/")

display(df_avro)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


In [0]:
# Carregando somente arquivos com AVRO no diretorio
df_avro = spark.read \
    .format("avro") \
    .load("/FileStore/tables/explorando_tipos_arquivos/avro/", pathGlobFilter="*.avro")

display(df_avro)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


**Escrevendo dataframe com o codec de compressão deflate**

In [0]:
df_avro.write \
    .mode("overwrite") \
    .option("compression", "deflate") \
    .format('avro') \
    .save("/FileStore/tables/explorando_tipos_arquivos/avro_deflate")

In [0]:
# Consultando e validando a compressão com defalte
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/avro_deflate/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate/_SUCCESS,_SUCCESS,0,1706196875000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate/_committed_6263856619397249716,_committed_6263856619397249716,112,1706196874000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate/_started_6263856619397249716,_started_6263856619397249716,0,1706196873000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate/part-00000-tid-6263856619397249716-2e7333ab-9ccd-427b-a27b-d08334d7ddb2-1-1-c000.avro,part-00000-tid-6263856619397249716-2e7333ab-9ccd-427b-a27b-d08334d7ddb2-1-1-c000.avro,6226,1706196874000


In [0]:
# Analisando tamanho do arquivo original e com compressão
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/avro/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/_SUCCESS,_SUCCESS,0,1706191227000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/_committed_1399109665869392578,_committed_1399109665869392578,113,1706191227000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/_started_1399109665869392578,_started_1399109665869392578,0,1706191226000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro/part-00000-tid-1399109665869392578-9eda1851-5647-4892-9e7a-098fdd07e151-40-1-c000.avro,part-00000-tid-1399109665869392578-9eda1851-5647-4892-9e7a-098fdd07e151-40-1-c000.avro,10101,1706191227000


**Alterando codec padrão de compressão para defalte**

In [0]:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")

**Refazendo a escrita do dataframe sem especificar o codec**

In [0]:
df_avro.write \
    .mode("overwrite") \
    .format('avro') \
    .save("/FileStore/tables/explorando_tipos_arquivos/avro_deflate2")

In [0]:
# Analisando tamanho do arquivo com compressão
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_SUCCESS,_SUCCESS,0,1706197337000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_committed_877409621601511854,_committed_877409621601511854,112,1706197336000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_started_877409621601511854,_started_877409621601511854,0,1706197335000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/part-00000-tid-877409621601511854-70590678-3712-40c4-b222-38ce73375194-18-1-c000.avro,part-00000-tid-877409621601511854-70590678-3712-40c4-b222-38ce73375194-18-1-c000.avro,6226,1706197336000


**Especificando o nível de compressão**.
**Por padrão o nível é 6 e esse nível vai de 0 a 9**

In [0]:
spark.conf.set("spark.sql.avro.deflate.level", "8")

In [0]:
# sobrescrevendo arquivo e validando novo tamanho de compressão com nível 8
df_avro.write \
    .mode("overwrite") \
    .format('avro') \
    .save("/FileStore/tables/explorando_tipos_arquivos/avro_deflate2")

In [0]:
# Analisando tamanho do arquivo com compressão
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_SUCCESS,_SUCCESS,0,1706197749000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_committed_877409621601511854,_committed_877409621601511854,112,1706197336000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_committed_8907155767155559421,_committed_8907155767155559421,211,1706197748000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_started_877409621601511854,_started_877409621601511854,0,1706197335000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/_started_8907155767155559421,_started_8907155767155559421,0,1706197747000
dbfs:/FileStore/tables/explorando_tipos_arquivos/avro_deflate2/part-00000-tid-8907155767155559421-5edbdb05-06ee-419f-9b9f-61d3c357054c-27-1-c000.avro,part-00000-tid-8907155767155559421-5edbdb05-06ee-419f-9b9f-61d3c357054c-27-1-c000.avro,6059,1706197748000


## Manipulando arquivos no formato PARQUET

**Lendo e escrevendo arquivos PARQUET**

In [0]:
df_avro.write \
        .mode("overwrite") \
        .format('parquet') \
        .save("/FileStore/tables/explorando_tipos_arquivos/parquet")

In [0]:
# Visualizando conteúdo da pasta
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/parquet/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet/_SUCCESS,_SUCCESS,0,1706198430000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet/_committed_2321667962722984806,_committed_2321667962722984806,123,1706198430000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet/_started_2321667962722984806,_started_2321667962722984806,0,1706198426000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet/part-00000-tid-2321667962722984806-a97af1d4-3589-4bde-9af4-a0900d2970fd-36-1-c000.snappy.parquet,part-00000-tid-2321667962722984806-a97af1d4-3589-4bde-9af4-a0900d2970fd-36-1-c000.snappy.parquet,6698,1706198430000


In [0]:
# Salvando arquivo com compressão gzip
df_avro.write \
        .mode("overwrite") \
        .option("compression", "gzip") \
        .format('parquet') \
        .save("/FileStore/tables/explorando_tipos_arquivos/parquet_gzip")

In [0]:
# Visualizando conteúdo
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/parquet_gzip/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_gzip/_SUCCESS,_SUCCESS,0,1706198685000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_gzip/_committed_7747779520527056558,_committed_7747779520527056558,119,1706198684000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_gzip/_started_7747779520527056558,_started_7747779520527056558,0,1706198684000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_gzip/part-00000-tid-7747779520527056558-c3b807bb-eb35-47a6-8cd4-2bf80b854963-45-1-c000.gz.parquet,part-00000-tid-7747779520527056558-c3b807bb-eb35-47a6-8cd4-2bf80b854963-45-1-c000.gz.parquet,6477,1706198684000


**Realizando leitura do arquivo com compressão**

In [0]:
df_parquet = spark.read.format("parquet") \
    .load("/FileStore/tables/explorando_tipos_arquivos/parquet_gzip", compression='gzip')

display(df_parquet)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,nivel_territorial,regiao,valor,variavel
2008,2008,120931,1020,1,1,2597,Total,Unidades,Brasil,Brasil,2245,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120937,1020,1,1,2597,Dengue,Unidades,Brasil,Brasil,1547,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120930,1020,1,1,2597,Total geral de municípios,Unidades,Brasil,Brasil,5564,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120932,1020,1,1,2597,Diarréia,Unidades,Brasil,Brasil,1517,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120933,1020,1,1,2597,Leptospirose,Unidades,Brasil,Brasil,197,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120934,1020,1,1,2597,Verminoses,Unidades,Brasil,Brasil,1394,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120935,1020,1,1,2597,Cólera,Unidades,Brasil,Brasil,54,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120936,1020,1,1,2597,Difteria,Unidades,Brasil,Brasil,65,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120938,1020,1,1,2597,Tifo,Unidades,Brasil,Brasil,26,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,120939,1020,1,1,2597,Malária,Unidades,Brasil,Brasil,159,Municípios com ocorrência de doenças associadas ao saneamento básico


## Particionamento

In [0]:
# verificando valores nas coluna para definir melhor particionamento
df_parquet.select("cod_doenca").distinct().show()

+----------+
|cod_doenca|
+----------+
|    120944|
|    120941|
|    120943|
|    120939|
|    120937|
|    120933|
|    120942|
|    120930|
|    120940|
|    120935|
|    120936|
|    120934|
|    120938|
|    120931|
|    120932|
+----------+



In [0]:
df_parquet.select("nivel_territorial").distinct().show()

+--------------------+
|   nivel_territorial|
+--------------------+
|       Grande Região|
|              Brasil|
|Unidade da Federação|
+--------------------+



**Definida a escolha de particionamento pela coluna 'nivel_territorial' utilizando o método .partitionBy()**

In [0]:
df_parquet.write \
    .partitionBy("nivel_territorial") \
    .mode("overwrite") \
    .parquet("/FileStore/tables/explorando_tipos_arquivos/parquet_particionado")

In [0]:
# Visualizando conteúdo
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/_SUCCESS,_SUCCESS,0,1706199774000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Brasil/,nivel_territorial=Brasil/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Grande Região/,nivel_territorial=Grande Região/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Unidade da Federação/,nivel_territorial=Unidade da Federação/,0,0


In [0]:
# Visualizando conteúdo de uma pasta com arquivo particionado
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Brasil'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Brasil/_SUCCESS,_SUCCESS,0,1706199774000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Brasil/_committed_6735421314784505055,_committed_6735421314784505055,123,1706199773000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Brasil/_started_6735421314784505055,_started_6735421314784505055,0,1706199772000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_particionado/nivel_territorial=Brasil/part-00000-tid-6735421314784505055-cca1e060-c90b-430f-85a6-bfc2feb44819-60-1.c000.snappy.parquet,part-00000-tid-6735421314784505055-cca1e060-c90b-430f-85a6-bfc2feb44819-60-1.c000.snappy.parquet,4398,1706199773000


**Realizando a partição do arquivo com base em duas colunas para aprimorar o nível de particionamento**

In [0]:
df_parquet.write \
    .partitionBy("nivel_territorial", "cod_doenca") \
    .mode("overwrite") \
    .parquet("/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado")

In [0]:
# visualizando conteúdo
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/_SUCCESS,_SUCCESS,0,1706200597000
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Brasil/,nivel_territorial=Brasil/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/,nivel_territorial=Grande Região/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Unidade da Federação/,nivel_territorial=Unidade da Federação/,0,0


In [0]:
display(dbutils.fs.ls('/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região'))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120930/,cod_doenca=120930/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120931/,cod_doenca=120931/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120932/,cod_doenca=120932/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120933/,cod_doenca=120933/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120934/,cod_doenca=120934/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120935/,cod_doenca=120935/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120936/,cod_doenca=120936/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120937/,cod_doenca=120937/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120938/,cod_doenca=120938/,0,0
dbfs:/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120939/,cod_doenca=120939/,0,0


In [0]:
# Realizando leitura do arquivo de interesse com código 120943
df_120943 = spark.read \
    .parquet('/FileStore/tables/explorando_tipos_arquivos/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120943/')

display(df_120943)

ano,cod_ano,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,regiao,valor,variavel
2008,2008,1020,2,1,2597,Doença do aparelho respiratório,Unidades,Norte,87,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,1020,2,2,2597,Doença do aparelho respiratório,Unidades,Nordeste,356,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,1020,2,3,2597,Doença do aparelho respiratório,Unidades,Sudeste,158,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,1020,2,4,2597,Doença do aparelho respiratório,Unidades,Sul,28,Municípios com ocorrência de doenças associadas ao saneamento básico
2008,2008,1020,2,5,2597,Doença do aparelho respiratório,Unidades,Centro-Oeste,26,Municípios com ocorrência de doenças associadas ao saneamento básico


**Retornando ao dataframe original**

In [0]:
df = spark.read\
    .parquet('/FileStore/tables/explorando_tipos_arquivos/parquet_particionado')

display(df)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,regiao,valor,variavel,nivel_territorial
2008,2008,120931,1020,3,11,2597,Total,Unidades,Rondônia,42,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120937,1020,3,11,2597,Dengue,Unidades,Rondônia,29,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120930,1020,3,11,2597,Total geral de municípios,Unidades,Rondônia,52,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120932,1020,3,11,2597,Diarréia,Unidades,Rondônia,27,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120933,1020,3,11,2597,Leptospirose,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120934,1020,3,11,2597,Verminoses,Unidades,Rondônia,28,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120935,1020,3,11,2597,Cólera,Unidades,Rondônia,2,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120936,1020,3,11,2597,Difteria,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120938,1020,3,11,2597,Tifo,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120939,1020,3,11,2597,Malária,Unidades,Rondônia,8,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação


## Manipulando arquivos no formato ORC

**Escrevendo e lendo arquivos**

In [0]:
df.write \
    .format('orc') \
    .save("/FileStore/tables/explorando_tipos_arquivos/orc")

In [0]:
# consultando arquivo
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/orc"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc/_SUCCESS,_SUCCESS,0,1706225063000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc/_committed_4807400830212968997,_committed_4807400830212968997,306,1706225063000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc/_started_4807400830212968997,_started_4807400830212968997,0,1706225061000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc/part-00000-tid-4807400830212968997-f4ee35cd-40b5-4b39-95fe-44ddfba5b527-4-1-c000.snappy.orc,part-00000-tid-4807400830212968997-f4ee35cd-40b5-4b39-95fe-44ddfba5b527-4-1-c000.snappy.orc,3377,1706225062000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc/part-00001-tid-4807400830212968997-f4ee35cd-40b5-4b39-95fe-44ddfba5b527-5-1-c000.snappy.orc,part-00001-tid-4807400830212968997-f4ee35cd-40b5-4b39-95fe-44ddfba5b527-5-1-c000.snappy.orc,2443,1706225062000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc/part-00002-tid-4807400830212968997-f4ee35cd-40b5-4b39-95fe-44ddfba5b527-6-1-c000.snappy.orc,part-00002-tid-4807400830212968997-f4ee35cd-40b5-4b39-95fe-44ddfba5b527-6-1-c000.snappy.orc,2091,1706225062000


**Fazendo compressão do arquivo ORC**

In [0]:
df.write.format("orc") \
    .mode("overwrite") \
    .option("compression", "zlib") \
    .save("/FileStore/tables/explorando_tipos_arquivos/orc_zlib")

In [0]:
# consultando arquivo
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/orc_zlib"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_zlib/_SUCCESS,_SUCCESS,0,1706225299000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_zlib/_committed_1675034947537603796,_committed_1675034947537603796,303,1706225299000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_zlib/_started_1675034947537603796,_started_1675034947537603796,0,1706225298000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_zlib/part-00000-tid-1675034947537603796-c12ea664-b005-40ba-9853-a20dec257462-15-1-c000.zlib.orc,part-00000-tid-1675034947537603796-c12ea664-b005-40ba-9853-a20dec257462-15-1-c000.zlib.orc,2780,1706225298000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_zlib/part-00001-tid-1675034947537603796-c12ea664-b005-40ba-9853-a20dec257462-16-1-c000.zlib.orc,part-00001-tid-1675034947537603796-c12ea664-b005-40ba-9853-a20dec257462-16-1-c000.zlib.orc,2129,1706225299000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_zlib/part-00002-tid-1675034947537603796-c12ea664-b005-40ba-9853-a20dec257462-17-1-c000.zlib.orc,part-00002-tid-1675034947537603796-c12ea664-b005-40ba-9853-a20dec257462-17-1-c000.zlib.orc,1822,1706225299000


**Leitura do arquivo compactado**

In [0]:
df_orc = spark.read\
    .option("compression", "zlib") \
    .format("orc") \
    .load("/FileStore/tables/explorando_tipos_arquivos/orc_zlib")

display(df_orc)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,regiao,valor,variavel,nivel_territorial
2008,2008,120931,1020,3,11,2597,Total,Unidades,Rondônia,42,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120937,1020,3,11,2597,Dengue,Unidades,Rondônia,29,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120930,1020,3,11,2597,Total geral de municípios,Unidades,Rondônia,52,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120932,1020,3,11,2597,Diarréia,Unidades,Rondônia,27,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120933,1020,3,11,2597,Leptospirose,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120934,1020,3,11,2597,Verminoses,Unidades,Rondônia,28,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120935,1020,3,11,2597,Cólera,Unidades,Rondônia,2,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120936,1020,3,11,2597,Difteria,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120938,1020,3,11,2597,Tifo,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120939,1020,3,11,2597,Malária,Unidades,Rondônia,8,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação


**Agrupamento das partições em um único arquivo**

In [0]:
df_orc.coalesce(1)\
    .write \
    .format("orc") \
    .mode("overwrite") \
    .save("/FileStore/tables/explorando_tipos_arquivos/orc_junto_snappy")

In [0]:
# consultando arquivo
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/orc_junto_snappy/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_snappy/_SUCCESS,_SUCCESS,0,1706225965000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_snappy/_committed_1463640092229380443,_committed_1463640092229380443,119,1706225965000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_snappy/_started_1463640092229380443,_started_1463640092229380443,0,1706225963000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_snappy/part-00000-tid-1463640092229380443-269a2b06-9305-4cef-8b35-bffecd364f53-29-1-c000.snappy.orc,part-00000-tid-1463640092229380443-269a2b06-9305-4cef-8b35-bffecd364f53-29-1-c000.snappy.orc,3677,1706225965000


**Comparando processo de compressão .snappy X .zlib**

In [0]:
df_orc.coalesce(1)\
    .write \
    .format("orc") \
    .mode("overwrite") \
    .option("compression", "zlib") \
    .save("/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib")

In [0]:
display(dbutils.fs.ls("/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib"))

path,name,size,modificationTime
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib/_SUCCESS,_SUCCESS,0,1706226190000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib/_committed_6698462425931291000,_committed_6698462425931291000,117,1706226190000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib/_started_6698462425931291000,_started_6698462425931291000,0,1706226188000
dbfs:/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib/part-00000-tid-6698462425931291000-1599769c-c377-4918-b277-fedc021a3fe2-38-1-c000.zlib.orc,part-00000-tid-6698462425931291000-1599769c-c377-4918-b277-fedc021a3fe2-38-1-c000.zlib.orc,3022,1706226190000


**Leitura do arquivo ORC comprimido**

In [0]:
df_orc_zlib = spark.read \
    .option("compression", "zlib") \
    .orc("/FileStore/tables/explorando_tipos_arquivos/orc_junto_zlib")

display(df_orc_zlib)

ano,cod_ano,cod_doenca,cod_medida,cod_nivel_territorial,cod_regiao,cod_variavel,doenca,medida,regiao,valor,variavel,nivel_territorial
2008,2008,120931,1020,3,11,2597,Total,Unidades,Rondônia,42,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120937,1020,3,11,2597,Dengue,Unidades,Rondônia,29,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120930,1020,3,11,2597,Total geral de municípios,Unidades,Rondônia,52,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120932,1020,3,11,2597,Diarréia,Unidades,Rondônia,27,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120933,1020,3,11,2597,Leptospirose,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120934,1020,3,11,2597,Verminoses,Unidades,Rondônia,28,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120935,1020,3,11,2597,Cólera,Unidades,Rondônia,2,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120936,1020,3,11,2597,Difteria,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120938,1020,3,11,2597,Tifo,Unidades,Rondônia,0,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
2008,2008,120939,1020,3,11,2597,Malária,Unidades,Rondônia,8,Municípios com ocorrência de doenças associadas ao saneamento básico,Unidade da Federação
