In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType, DateType
import sys
import os
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
#from delta.tables import *
import io
import json

In [0]:
#Spark session
def create_spark_session():
    return  SparkSession.builder\
    .config('spark.executor.memory', '8G')\
.   getOrCreate()

In [0]:
spark = create_spark_session()

O catálogo de metadados do Spark pode ser acessado pelo objeto

`SparkSession.catalog` 

As principais funcionalidades são:

* `listDatabases()`: lista todas os databases disponíveis;
* `listTables()`: lista todas as tabelas disponíveis em um determinado database;
* `listFunctions()`: lista as funções disponíveis em um determinado database;
* `refreshTable()`: atualiza os metadados de uma determinada tabela
* `uncacheTable()`: remove uma tabela salva em memória
* `clearCache()`: remove todas as tabelas salvas em memória

In [0]:
spark.catalog.listDatabases()

Out[4]: [Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='dbfs:/user/hive/warehouse')]

In [0]:
#código está solicitando ao Spark para listar as tabelas registradas no catálogo de metadados para o banco de dados 'default'.
spark.catalog.listTables('default')

Out[5]: []

### Show e Create Databases

Os databases do Spark são uma ferramenta para organizar tabelas. Eles podem e devem ser vistos como algo muito próximo dos databases de servidores de bancos de dados relacionais. O Spark utiliza por padrão um database chamado default, que serve para criar tabelas, views e realizar consultas caso o usuário não tenha definido o seu próprio. Um ponto importante é que essas estruturas persistem em diferentes sessões: se o usuário mudar de database, todas as tabelas permanecerão no database anterior e vão precisar ser consultadas de maneira diferente.

Existem alguns comandos do SQL importantes na hora de se trabalhar com databases. Else são:

* `SHOW DATABASES`: lista todas os databases disponíveis, de forma análoga ao Catalog ;
* `CREATE DATABASE <nome_do_db>`: cria um database
* `USE <nome_do_db>`: define o database como o atual para a realização de queries
    * **Obs**: ao se mudar de database, é possível acessar tabelas de um database anterior usando o prefixo “nome_do_db.” antes do nome da tabela. Exemplo:
        ```
        USE db2
        SELECT * FROM db1.table
        ```
* `SELECT current_database()`: retorna qual o database definido como o atual
* `DROP DATABASE IF EXISTS <nome_do_db>`: deleta determinado database dentre aqueles que foram definidos. Atenção: nunca delete o database default do Spark.


In [0]:
#comando para dropar um banco de dados chamado 'department'. 
spark.sql("""
DROP DATABASE IF EXISTS department;
""")

Out[6]: DataFrame[]

In [0]:
#Criando Database
spark.sql("""
CREATE DATABASE IF NOT EXISTS department;
""")

Out[7]: DataFrame[]

In [0]:
#Ainda não tem as tabelas
spark.catalog.listDatabases()

Out[8]: [Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='dbfs:/user/hive/warehouse'),
 Database(name='department', catalog='spark_catalog', description='', locationUri='dbfs:/user/hive/warehouse/department.db')]

In [0]:
spark.catalog.listTables('department')

Out[9]: []

In [0]:
path_countries = '/FileStore/transient/departments/countries'
df_countries = spark.read.format('csv')\
.option("header", True)\
.option("sep", ",")\
.option("quote","\'")\
.option("inferSchema",True)\
.load(path_countries)

In [0]:
path_regions = '/FileStore/transient/departments/regions'
df_regions = spark.read.format('csv')\
.option("header", True)\
.option("sep", ",")\
.option("quote","\'")\
.option("inferSchema",True)\
.load(path_regions)

### Tables

* **Managed Tables**: o Spark administra tanto os dados quanto os metadados das tabelas, de forma que operações como DROP TABLE afetam também os dados escritos em disco;
* **Unmanaged Tables**: o Spark administra somente os metadados da tabela, e os dados escritos em disco não são alterados em nenhum momento

In [0]:
spark.sql("""
USE department;
""")

Out[12]: DataFrame[]

In [0]:
spark.catalog.listTables('department')

Out[50]: []

**Criando Unmanaged Tables** </br>
Para criar uma tabela unmanaged (Não gerenciada) , passe o caminho do storage durante o comando write, e adicione saveAsTable('NOME_TABELA')

In [0]:
#
df_regions.write.option('path', 'dbfs:/FileStore/bronze/sql_db/departments/regions').format('parquet').mode('overwrite').saveAsTable("regions")

In [0]:
df_regions.printSchema()

root
 |-- region_id: integer (nullable = true)
 |-- region_name: string (nullable = true)



In [0]:
spark.sql("""    
    create table if not exists regions_2(
        region_id   INT,
        region_name STRING
    )using parquet options (PATH "dbfs:/FileStore/bronze/sql_db/departments/regions")
""")

Out[25]: DataFrame[]

In [0]:
spark.catalog.listTables('department')

Out[26]: [Table(name='regions', catalog='spark_catalog', namespace=['department'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='regions_2', catalog='spark_catalog', namespace=['department'], description=None, tableType='EXTERNAL', isTemporary=False)]

In [0]:
spark.sql("""
select * from regions_2 limit 5;
""").display()   

region_id,region_name
1,Europe
2,Americas
3,Asia
4,Middle East and Africa


In [0]:
#exibir tipo de tabela
type(spark.sql("""
select * from regions_2 limit 5;
"""))

Out[32]: pyspark.sql.dataframe.DataFrame

In [0]:
#Apagando as tabelas de testes regions e regions_2
spark.sql("""
DROP TABLE if EXISTS regions_2;
""")

spark.sql("""
DROP TABLE if EXISTS regions;
""")

Out[33]: DataFrame[]

In [0]:
spark.catalog.listTables('department')

Out[34]: []

In [0]:
dbutils.fs.ls('dbfs:/FileStore/bronze/sql_db/departments/regions')

Out[35]: [FileInfo(path='dbfs:/FileStore/bronze/sql_db/departments/regions/_committed_3846034413558648928', name='_committed_3846034413558648928', size=221, modificationTime=1708185213000),
 FileInfo(path='dbfs:/FileStore/bronze/sql_db/departments/regions/_committed_6159469825831452247', name='_committed_6159469825831452247', size=123, modificationTime=1708179550000),
 FileInfo(path='dbfs:/FileStore/bronze/sql_db/departments/regions/_committed_6665773785435120547', name='_committed_6665773785435120547', size=232, modificationTime=1708179909000),
 FileInfo(path='dbfs:/FileStore/bronze/sql_db/departments/regions/_committed_vacuum5076900710751031917', name='_committed_vacuum5076900710751031917', size=96, modificationTime=1708185214000),
 FileInfo(path='dbfs:/FileStore/bronze/sql_db/departments/regions/_started_3846034413558648928', name='_started_3846034413558648928', size=0, modificationTime=1708185209000),
 FileInfo(path='dbfs:/FileStore/bronze/sql_db/departments/regions/part-00000-tid-

In [0]:
spark.sql("""
select * from regions limit 5;
""").show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2380700086979364>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"""[39;49m
[1;32m      2[0m [38;5;124;43mselect * from regions limit 5;[39;49m
[1;32m      3[0m [38;5;124;43m"""[39;49m[43m)[49m[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39ml

**Criando Managed Tables**</br>
Para criar uma tabela managed (gerenciada) , basta omitir o caminho durante o comando saveAsTable('NOME_TABELA')

In [0]:
df_regions.write.format('parquet').mode('overwrite').saveAsTable("regions")

In [0]:
#Criando tabela gerenciada
#Não coloco o caminho
spark.sql("""    
    create table if not exists regions(
        region_id   INT,
        region_name STRING
    )using parquet;
""")

Out[38]: DataFrame[]

In [0]:
spark.sql(""" 
            insert into department.regions values
            (1,'Never Land');
        """)

Out[39]: DataFrame[]

In [0]:
%sql

-- Este é um exemplo de comando SQL
SELECT * FROM regions;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-931107377689051>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-931107377689051>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;49

In [0]:
spark.sql("""
select * from regions limit 5;
""").display()

region_id,region_name
1,Europe
2,Americas
3,Asia
4,Middle East and Africa
1,Never Land


**Detalhe sobre Managed Tables**</br>
Quando utilizamos uma Managed Tables, os dados precisam ser armazenados em algum lugar do cluster, no databricks o padrão é : </br>
**'/user/hive/warehouse/meu_database.db/minha_tabela'** </br>
Esse local pode ser modificado informado a **config('spark.sql.warehouse.dir', 'caminho_warehouse')** 

In [0]:

#Tabela gerenciada você precisa definar onde será salvo.
dbutils.fs.ls('/user/hive/warehouse/department.db/regions')

Out[53]: [FileInfo(path='dbfs:/user/hive/warehouse/department.db/regions/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1708186381000),
 FileInfo(path='dbfs:/user/hive/warehouse/department.db/regions/_committed_1966975110744847818', name='_committed_1966975110744847818', size=123, modificationTime=1708186335000),
 FileInfo(path='dbfs:/user/hive/warehouse/department.db/regions/_committed_8656414148414681796', name='_committed_8656414148414681796', size=123, modificationTime=1708186381000),
 FileInfo(path='dbfs:/user/hive/warehouse/department.db/regions/_started_1966975110744847818', name='_started_1966975110744847818', size=0, modificationTime=1708186335000),
 FileInfo(path='dbfs:/user/hive/warehouse/department.db/regions/_started_8656414148414681796', name='_started_8656414148414681796', size=0, modificationTime=1708186380000),
 FileInfo(path='dbfs:/user/hive/warehouse/department.db/regions/part-00000-tid-1966975110744847818-b22fdfb8-556a-4b46-8fbb-e77c91ad112d-16-1-c000.snappy.p

In [0]:
%sql

-- Este é um exemplo de comando SQL
SELECT * FROM regions;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-931107377689052>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-931107377689052>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;49

In [0]:
spark.sql("""
DROP TABLE regions;
""")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2380700086979372>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"""[39;49m
[1;32m      2[0m [38;5;124;43mDROP TABLE regions;[39;49m
[1;32m      3[0m [38;5;124;43m"""[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m     50[0m    

**Criando Views**

In [0]:
#View, pode ser o resultado de uma consulta
df_countries.createOrReplaceTempView('countries_view')

In [0]:
%sql

select * from countries_view;


country_id,country_name,region_id
AR,Argentina,2
AU,Australia,3
BE,Belgium,1
BR,Brazil,2
CA,Canada,2
CH,Switzerland,1
CN,China,3
DE,Germany,1
DK,Denmark,1
EG,Egypt,4


In [0]:
spark.sql("""
select * from countries_view;
""").display()


country_id,country_name,region_id
AR,Argentina,2
AU,Australia,3
BE,Belgium,1
BR,Brazil,2
CA,Canada,2
CH,Switzerland,1
CN,China,3
DE,Germany,1
DK,Denmark,1
EG,Egypt,4


In [0]:
df_countries.createOrReplaceGlobalTempView('countries_global_view')

**Utilizando a interface SQL**

In [0]:
spark.sql("""
  DROP TABLE IF EXISTS countries ;
""")

Out[73]: DataFrame[]

In [0]:
spark.sql("""
  CREATE TABLE countries (
  country_id STRING, 
  country_name STRING,
  region_id INTEGER
)using PARQUET ;
""")

Out[74]: DataFrame[]

In [0]:
spark.sql("""
select * from countries;
""").display()

country_id,country_name,region_id


In [0]:
spark.catalog.listTables('department')

Out[65]: [Table(name='countries', catalog='spark_catalog', namespace=['department'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='countries_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

**Inserindo registros na tabela** </br>

In [0]:
spark.sql(""" 
            insert into department.countries values
            ('100','Never Land',99)
        """)

Out[79]: DataFrame[]

In [0]:
spark.sql("SELECT * FROM department.countries limit 5").display()

country_id,country_name,region_id
100,Never Land,99


In [0]:
#
spark.sql(""" 
        insert into department.countries
        select        
              country_id , 
              country_name ,       
              region_id      
        from countries_view
        """)

Out[82]: DataFrame[]

In [0]:
spark.sql("SELECT * FROM department.countries limit 50").display()

country_id,country_name,region_id
AR,Argentina,2
AU,Australia,3
BE,Belgium,1
BR,Brazil,2
CA,Canada,2
CH,Switzerland,1
CN,China,3
DE,Germany,1
DK,Denmark,1
EG,Egypt,4


In [0]:
spark.sql(""" 
        insert into department.countries values
            ('ZW','Zimbabwe',2)   
        """)

Out[84]: DataFrame[]

**Consultas, joins, agregações**</br>
Alem de criar databases, apagar databases, criar views e inserir valores em tabelas, todos os comandos referentes a manipulação de dados e consultas do SQL padrão </br>
esta disponível no spark sql, possibilitando criar agregações, joins, unions, filtros , alem de utilizar todas as funções padrões o sql para manipulação de string, numeros, etc.</br>

Cada consulta realizada no spark sql retorna um dataframe spark como resultado, sendo possivel salvar o resultado de uma consulta no datalake o apenas exibir o resultado.</br>
É importante ter em mente que o spark sql padrão ** NÃO PERMITE ** operações de update , merge , delete , remover ou atualizar valores é necessário sempre sobrescrever o 
resultado previamente escrito.



In [0]:
spark.sql(""" 
        delete from department.countries where country_id = 'ZW' ;
        """)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2380700086979388>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"""[39;49m[38;5;124;43m [39;49m
[1;32m      2[0m [38;5;124;43m        delete from department.countries where country_id = [39;49m[38;5;124;43m'[39;49m[38;5;124;43mZW[39;49m[38;5;124;43m'[39;49m[38;5;124;43m ;[39;49m
[1;32m      3[0m [38;5;124;43m        [39;49m[38;5;124;43m"""[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[4

In [0]:
spark.sql(""" 
        update department.countries set region_id = 2 where country_id = 'ZW' ;
        """)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2380700086979389>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"""[39;49m[38;5;124;43m [39;49m
[1;32m      2[0m [38;5;124;43m        update department.countries set region_id = 2 where country_id = [39;49m[38;5;124;43m'[39;49m[38;5;124;43mZW[39;49m[38;5;124;43m'[39;49m[38;5;124;43m ;[39;49m
[1;32m      3[0m [38;5;124;43m        [39;49m[38;5;124;43m"""[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43

In [0]:
df_regions.write.option('path', 'dbfs:/FileStore/bronze/sql_db/departments/regions').format('parquet').mode('overwrite').saveAsTable("department.regions")

In [0]:
spark.sql("SELECT * FROM department.regions limit 50").display()

region_id,region_name
1,Europe
2,Americas
3,Asia
4,Middle East and Africa


In [0]:
spark.sql("SELECT * FROM department.countries limit 50").display()

country_id,country_name,region_id
AR,Argentina,2
AU,Australia,3
BE,Belgium,1
BR,Brazil,2
CA,Canada,2
CH,Switzerland,1
CN,China,3
DE,Germany,1
DK,Denmark,1
EG,Egypt,4


In [0]:
spark.sql('SHOW TABLES;').display()

database,tableName,isTemporary
department,countries,False
department,regions,False
,countries_view,True


In [0]:
#inner join
spark.sql("""         
        select        
              *
        from countries a
        inner join regions b on
        a.region_id = b.region_id
        """).display()

country_id,country_name,region_id,region_id.1,region_name
AR,Argentina,2,2,Americas
AU,Australia,3,3,Asia
BE,Belgium,1,1,Europe
BR,Brazil,2,2,Americas
CA,Canada,2,2,Americas
CH,Switzerland,1,1,Europe
CN,China,3,3,Asia
DE,Germany,1,1,Europe
DK,Denmark,1,1,Europe
EG,Egypt,4,4,Middle East and Africa


In [0]:
spark.sql("""         
        select        
              count(*) as qtd_paises,
              region_name
        from countries a
        inner join regions b on
        a.region_id = b.region_id
        group by
        region_name
        """).display()

qtd_paises,region_name
16,Europe
12,Middle East and Africa
11,Americas
12,Asia
