
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://moodle.df.senac.br/faculdade/pluginfile.php/1/theme_lambda/logo/1716924091/Logo-SENAC-PNG.png" style="width: 300px; height: 150px">
</div>

# Usando Spark SQL para extrair dados de arquivos



## Importando Excel diretamente da web

Uma das formas de se importar arquivos diretamente da web, sem baixá-los para um _object storage_ é utilizando um _data frame_ e então criando um Spark DataFrame.

**Nota:** no caso de Excel, deve ser instalado o pacote _openpyxl_

Por exemplo:

`%pip install pandas openpyxl`

`import pandas as pd`

`df = pd.read_excel(caminho_do_arquivo_na_web, engine='openpyxl')`

`spark_df = spark.createDataFrame(df)`

###Atividade

- Baixe o arquivo < https://public.dhe.ibm.com/software/data/sw-library/cognos/mobile/C11/data/Weather_analytics_small.xlsx >
- Crie um Spark DataFrame
- Exiba seus dados tabulares


In [0]:

%pip install pandas openpyxl

import pandas as pd

df = pd.read_excel("https://public.dhe.ibm.com/software/data/sw-library/cognos/mobile/C11/data/Weather_analytics_small.xlsx", engine='openpyxl')

df.info()

spark_df = spark.createDataFrame(df)
display(spark_df)



##Importando JSON

Para importação de arquivos JSON, também podemos usar a biblioteca Pandas e normalizar os dados para uma estrutura tabular.

Por exemplo:

`import pandas as pd`

`df = pd.read_json(orient='records', path_or_buf="caminho_do_arquivo_na_web")`

`spark_df = spark.createDataFrame(pd.json_normalize(df.value))`

###Atividade

- Baixe o arquivo < https://olinda.bcb.gov.br/olinda/servico/Pix_DadosAbertos/versao/v1/odata/TransacoesPixPorMunicipio(DataBase=@DataBase)?@DataBase='2024-06'&$top=10&$orderby=Estado%2CMunicipio&$format=json >
    - <a href="https://dadosabertos.bcb.gov.br/dataset/pix/resource/42e0c55a-ab4e-4f9a-88f1-c5893df8d47b?inner_span=True" target="_blank">Dados abertos do Pix</a>
- Crie um Spark DataFrame
- Exiba seus dados tabulares


Importando pandas, lendo o json, normalizando os dados e imprimindo a tabela

In [0]:

import pandas as pd

df = pd.read_json(orient='records', path_or_buf="https://olinda.bcb.gov.br/olinda/servico/Pix_DadosAbertos/versao/v1/odata/TransacoesPixPorMunicipio(DataBase=@DataBase)?@DataBase='2024-06'&$top=10&$orderby=Estado%2CMunicipio&$format=json")

spark_df = spark.createDataFrame(pd.json_normalize(df.value))

display(spark_df)



AnoMes,Municipio_Ibge,Municipio,Estado_Ibge,Estado,Sigla_Regiao,Regiao,VL_PagadorPF,QT_PagadorPF,VL_PagadorPJ,QT_PagadorPJ,VL_RecebedorPF,QT_RecebedorPF,VL_RecebedorPJ,QT_RecebedorPJ
202406,1200013,ACRELÂNDIA,12,ACRE,NO,NORTE,47564325.63,240791,15838648.73,11086,50622819.89,131434,12526098.24,54618
202407,1200013,ACRELÂNDIA,12,ACRE,NO,NORTE,10430058.01,47138,4401218.06,2312,11388057.97,26159,2902289.82,11246
202407,1200054,ASSIS BRASIL,12,ACRE,NO,NORTE,4215575.59,20257,740339.75,828,3744149.27,12548,495031.68,2014
202406,1200054,ASSIS BRASIL,12,ACRE,NO,NORTE,15796750.03,101059,3038066.64,4003,15963698.35,60724,1746424.3,10412
202407,1200104,BRASILÉIA,12,ACRE,NO,NORTE,21343452.78,105319,13548486.94,7266,20341762.24,68789,12529954.35,18629
202406,1200104,BRASILÉIA,12,ACRE,NO,NORTE,87887950.2,528479,74184002.68,34578,87923015.4,345657,65636476.44,91583
202407,1200138,BUJARI,12,ACRE,NO,NORTE,5409602.66,37920,1843010.27,1585,5803729.42,21528,1469949.22,3145
202406,1200138,BUJARI,12,ACRE,NO,NORTE,22710770.56,193534,8435147.68,7974,23180994.51,112760,7603595.58,17989
202406,1200179,CAPIXABA,12,ACRE,NO,NORTE,20721198.95,162853,2853763.04,4630,20078701.22,100280,2342957.33,15689
202407,1200179,CAPIXABA,12,ACRE,NO,NORTE,5287436.82,33103,778989.4,988,5191322.98,20107,897324.67,3454



##Download de aquivos externos para o Databricks

Podemos realizar o download de arquivos externos para dentro do ambiente Databricks, salvando em volumes. Isso permite que trabalhemos com arquivos maiores e tenhamos cópia do arquivo.

Você pode realizar o download do arquivo usando o `curl` do _bash shell_ por exemplo:

`%sh`

`curl "https://dominio/arquivo" --output /tmp/arquivo.ext`

Caso o arquivo esteja compactado, você ainda pode utilizar o descompactador do sistema operacional <a href="https://docs.databricks.com/en/files/unzip-files.html" target="_blank">Unzip</a>:

> `gunzip /tmp/arquivo.ext.gz` ou `unzip /tmp/arquivo.ext.zip` 

Com o arquivo descompactado, podemos mover para um volume dentro do ambiente Databricks, utilizando o pacote **dbutils**

> `dbutils.fs.mv("file:/tmp/arquivo.ext", "dbfs:/Volumes/catalog/schema/nome_do_volume/arquivo.ext")`


###Atividade

- Baixe o arquivo < https://dumps.wikimedia.org/other/clickstream/2024-06/clickstream-ptwiki-2024-06.tsv.gz > para o sistema operacional do servidor
    - <a href="https://meta.wikimedia.org/wiki/Research:Wikipedia_clickstream" target="_blank">Wikipedia clickstream</a>
- Descompacte o arquivo utilizando o `guzip`. Note a extensão do arquivo
- Copie o arquivo para um volume do Databricks
- Execute uma consulta no arquivo usando a função `read_files`
    - <a href="https://docs.databricks.com/en/sql/language-manual/functions/read_files.html" target="_blank">read_files table-valued function</a>
    - O arquivo está no formato tsv (tabular separated values, famoso '\t') e não possui cabeçalho
    - Leia sobre a coluna <a href="https://docs.databricks.com/en/ingestion/auto-loader/schema.html#rescue" target="_blank">_rescued_data</a>
        - Você pode removê-la utilizando a opção `schemaEvolutionMode => 'none'`
        - Ou, ainda, utilizando a cláusula <a href="https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select.html" target="_blank">EXCEPT column</a>


Baixando o arquivo e unzipando/dezipando o arquivo:

In [0]:
%sh

curl "https://dumps.wikimedia.org/other/clickstream/2024-06/clickstream-ptwiki-2024-06.tsv.gz" --output /tmp/clickstream-ptwiki-2024-06.tsv.gz

gunzip /tmp/clickstream-ptwiki-2024-06.tsv.gz

ls /tmp

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
 21 21.3M   21 4719k    0     0  4329k      0  0:00:05  0:00:01  0:00:04 4326k
 42 21.3M   42 9263k    0     0  4445k      0  0:00:04  0:00:02  0:00:02 4443k
 63 21.3M   63 13.5M    0     0  4487k      0  0:00:04  0:00:03  0:00:01 4486k
 83 21.3M   83 17.9M    0     0  4488k      0  0:00:04  0:00:04 --:--:-- 4486k
100 21.3M  100 21.3M    0     0  4436k      0  0:00:04  0:00:04 --:--:-- 4535k
chauffeur-daemon-params
chauffeur-daemon.pid
chauffeur-env.sh
clean_up_local_disk.sh
clickstream-ptwiki-2024-06.tsv
cptwiki-2024-06.tsv
custom-spark.conf
dbr-consolidated-secret-conf-envvars
dbr_entry_point.py
driver-daemon-params
driver-daemon.pid
driver-env.sh
fuse_prep.sh
hsperfdata_r

Salvando no catalogo criado:

In [0]:
dbutils.fs.mv("file:/tmp/clickstream-ptwiki-2024-06.tsv", "dbfs:/Volumes/senac/default/clickstream/clickstream-ptwiki-2024-06.tsv")


Out[25]: True

Exibindo na pasta:

In [0]:
display(dbutils.fs.ls("dbfs:/Volumes/senac/default/clickstream/"))

path,name,size,modificationTime
dbfs:/Volumes/senac/default/clickstream/clickstream-ptwiki-2024-06.tsv,clickstream-ptwiki-2024-06.tsv,76762813,1720286234779


In [0]:
%sql
SELECT * FROM read_files('dbfs:/Volumes/senac/default/clickstream/clickstream-ptwiki-2024-06.tsv',
format => 'csv', 
delimiter => '\t', 
header => 'false', 
schemaEvolutionMode => 'none');

_c0,_c1,_c2,_c3
other-empty,ChatGPT,external,418039
other-empty,Cleópatra,external,360801
other-search,Campeonato_Europeu_de_Futebol,external,207236
other-search,Copa_América,external,157882
other-empty,Sony_Channel,external,155598
other-empty,Canal_Brasil,external,139828
other-search,Chrystian,external,120130
other-search,Liga_dos_Campeões_da_UEFA,external,119795
other-search,Cristiano_Ronaldo,external,113060
other-search,Chrystian_&_Ralf,external,112715
