# Utilizando o Databricks

## Criando tabela no notebook

In [0]:
dbutils.fs.rm('/user/hive/warehouse/data_csv', recurse=True)

# Para este procedimento funcionar o arquivo data.csv deve estar carregado no DBFS no endereço especificado na variável `file_location`
file_location = '/FileStore/tables/data.csv'
file_type = 'csv'
infer_schema = 'true'
first_row_is_header = 'true'
delimiter = ';'

df = spark\
    .read\
    .format(file_type)\
    .option('inferSchema', infer_schema)\
    .option('header', first_row_is_header)\
    .option('sep', delimiter)\
    .load(file_location)

table_name = 'data_csv'

df.write.format('parquet').saveAsTable(table_name)

# Databricks Utilities

## Comandos Databricks Utilities - `dbutils`

In [0]:
dbutils.help()

In [0]:
dbutils.fs.help()

## Manipulando arquivos

### Listar todos os arquivos dentro de uma pasta

In [0]:
dbutils.fs.ls('/')

Out[4]: [FileInfo(path='dbfs:/FileStore/', name='FileStore/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/', name='databricks-datasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-results/', name='databricks-results/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/', name='user/', size=0, modificationTime=0)]

In [0]:
for item in dbutils.fs.ls('/'):
    print(item.path)

dbfs:/FileStore/
dbfs:/databricks-datasets/
dbfs:/databricks-results/
dbfs:/user/


### Acessando os arquivos carregados no DBFS

In [0]:
dbutils.fs.ls('/FileStore/')

Out[6]: [FileInfo(path='dbfs:/FileStore/tables/', name='tables/', size=0, modificationTime=0)]

In [0]:
dbutils.fs.ls('/FileStore/tables/')

Out[7]: [FileInfo(path='dbfs:/FileStore/tables/data.csv', name='data.csv', size=2781, modificationTime=1711565743000)]

In [0]:
display(dbutils.fs.ls('/FileStore/tables/'))

path,name,size,modificationTime
dbfs:/FileStore/tables/data.csv,data.csv,2781,1711565743000


### Listando as primeiras linhas de um arquivo

In [0]:
dbutils.fs.head('/FileStore/tables/data.csv')

Out[9]: 'Nomes;Idades;Peso;Altura;IMC\r\nAaron;39;53;1.93;14.2\r\nAdam;46;100;1.58;40.1\r\nAdrian;24;110;1.57;44.6\r\nAlec;39;57;1.83;17.0\r\nAlexander;32;89;1.54;37.5\r\nAlfred;58;116;1.53;49.6\r\nAndrew;59;60;1.58;24.0\r\nAnthony;22;57;1.50;25.3\r\nArthur;20;114;1.87;32.6\r\nAsher;40;91;1.76;29.4\r\nBarnabay;25;69;1.64;25.7\r\nBenedict;37;115;1.72;38.9\r\nBenjamin;41;76;1.85;22.2\r\nBradley;42;54;1.91;14.8\r\nBrandon;45;115;1.60;44.9\r\nBrian;52;52;1.68;18.4\r\nBrice;43;81;1.60;31.6\r\nBruce;41;53;1.61;20.4\r\nCaleb;36;102;1.72;34.5\r\nCameron;60;104;1.87;29.7\r\nCharles;32;90;1.74;29.7\r\nChristopher;57;75;1.99;18.9\r\nClinton;44;61;1.95;16.0\r\nColin;37;81;1.57;32.9\r\nConnor;32;111;1.71;38.0\r\nDamian;28;116;1.96;30.2\r\nDaniel;27;103;1.88;29.1\r\nDavid;43;74;1.60;28.9\r\nDeclan;36;114;1.93;30.6\r\nDexter;57;80;1.75;26.1\r\nDominic;22;77;1.62;29.3\r\nDonald;51;75;1.66;27.2\r\nDouglas;28;93;1.99;23.5\r\nDuncan;27;99;1.78;31.2\r\nDustin;25;100;1.87;28.6\r\nDylan;39;117;1.94;31.1\r\n

### Removendo arquivos

In [0]:
dbutils.fs.rm('/FileStore/tables/data.csv')

In [0]:
dbutils.fs.ls('/FileStore/tables')

Out[10]: [FileInfo(path='dbfs:/FileStore/tables/data.csv', name='data.csv', size=2781, modificationTime=1711565743000)]

## Databricks Datasets
##### [Wine Quality Data Set](http://archive.ics.uci.edu/ml/datasets/wine+quality)

In [0]:
for item in dbutils.fs.ls('/'): print(item.path)

dbfs:/FileStore/
dbfs:/databricks-datasets/
dbfs:/databricks-results/
dbfs:/user/


In [0]:
display(dbutils.fs.ls('/databricks-datasets'))

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,0


In [0]:
display(dbutils.fs.ls('/databricks-datasets/wine-quality'))

path,name,size,modificationTime
dbfs:/databricks-datasets/wine-quality/README.md,README.md,1066,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-red.csv,winequality-red.csv,84199,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-white.csv,winequality-white.csv,264426,1594262736000


In [0]:
dbutils.fs.head("/databricks-datasets/wine-quality/README.md")



In [0]:
dbutils.fs.head("/databricks-datasets/wine-quality/winequality-red.csv")

[Truncated to first 65536 bytes]
Out[15]: '"fixed acidity";"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality"\n7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5\n7.8;0.88;0;2.6;0.098;25;67;0.9968;3.2;0.68;9.8;5\n7.8;0.76;0.04;2.3;0.092;15;54;0.997;3.26;0.65;9.8;5\n11.2;0.28;0.56;1.9;0.075;17;60;0.998;3.16;0.58;9.8;6\n7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5\n7.4;0.66;0;1.8;0.075;13;40;0.9978;3.51;0.56;9.4;5\n7.9;0.6;0.06;1.6;0.069;15;59;0.9964;3.3;0.46;9.4;5\n7.3;0.65;0;1.2;0.065;15;21;0.9946;3.39;0.47;10;7\n7.8;0.58;0.02;2;0.073;9;18;0.9968;3.36;0.57;9.5;7\n7.5;0.5;0.36;6.1;0.071;17;102;0.9978;3.35;0.8;10.5;5\n6.7;0.58;0.08;1.8;0.097;15;65;0.9959;3.28;0.54;9.2;5\n7.5;0.5;0.36;6.1;0.071;17;102;0.9978;3.35;0.8;10.5;5\n5.6;0.615;0;1.6;0.089;16;59;0.9943;3.58;0.52;9.9;5\n7.8;0.61;0.29;1.6;0.114;9;29;0.9974;3.26;1.56;9.1;5\n8.9;0.62;0.18;3.8;0.176;52;145;0.9986;3.16;0.88;9.2;5\n8.9

In [0]:
dbutils.fs.head("/databricks-datasets/wine-quality/winequality-white.csv")

[Truncated to first 65536 bytes]
Out[16]: '"fixed acidity";"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality"\n7;0.27;0.36;20.7;0.045;45;170;1.001;3;0.45;8.8;6\n6.3;0.3;0.34;1.6;0.049;14;132;0.994;3.3;0.49;9.5;6\n8.1;0.28;0.4;6.9;0.05;30;97;0.9951;3.26;0.44;10.1;6\n7.2;0.23;0.32;8.5;0.058;47;186;0.9956;3.19;0.4;9.9;6\n7.2;0.23;0.32;8.5;0.058;47;186;0.9956;3.19;0.4;9.9;6\n8.1;0.28;0.4;6.9;0.05;30;97;0.9951;3.26;0.44;10.1;6\n6.2;0.32;0.16;7;0.045;30;136;0.9949;3.18;0.47;9.6;6\n7;0.27;0.36;20.7;0.045;45;170;1.001;3;0.45;8.8;6\n6.3;0.3;0.34;1.6;0.049;14;132;0.994;3.3;0.49;9.5;6\n8.1;0.22;0.43;1.5;0.044;28;129;0.9938;3.22;0.45;11;6\n8.1;0.27;0.41;1.45;0.033;11;63;0.9908;2.99;0.56;12;5\n8.6;0.23;0.4;4.2;0.035;17;109;0.9947;3.14;0.53;9.7;5\n7.9;0.18;0.37;1.2;0.04;16;75;0.992;3.18;0.63;10.8;5\n6.6;0.16;0.4;1.5;0.044;48;143;0.9912;3.54;0.52;12.4;7\n8.3;0.42;0.62;19.25;0.04;41;172;1.0002;2.98;0

## Diretórios e arquivos

In [0]:
dbutils.fs.ls('/FileStore/tables')

Out[17]: [FileInfo(path='dbfs:/FileStore/tables/data.csv', name='data.csv', size=2781, modificationTime=1711565743000)]

In [0]:
dbutils.fs.mkdirs('/FileStore/tables/aula-databricks/vinhos')

Out[18]: True

In [0]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks'))

path,name,size,modificationTime
dbfs:/FileStore/tables/aula-databricks/vinhos/,vinhos/,0,0


In [0]:
dbutils.fs.ls('/FileStore/tables/aula-databricks/vinhos')

Out[20]: []

In [0]:
dbutils.fs.help('cp')

In [0]:
dbutils.fs.cp(
    '/databricks-datasets/wine-quality', 
    '/FileStore/tables/aula-databricks', 
    recurse=True
)

Out[22]: True

In [0]:
dbutils.fs.ls('/FileStore/tables/aula-databricks/vinhos')

Out[23]: []

In [0]:
dbutils.fs.ls('/FileStore/tables/aula-databricks')

Out[24]: [FileInfo(path='dbfs:/FileStore/tables/aula-databricks/README.md', name='README.md', size=1066, modificationTime=1711640303000),
 FileInfo(path='dbfs:/FileStore/tables/aula-databricks/vinhos/', name='vinhos/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/aula-databricks/winequality-red.csv', name='winequality-red.csv', size=84199, modificationTime=1711640304000),
 FileInfo(path='dbfs:/FileStore/tables/aula-databricks/winequality-white.csv', name='winequality-white.csv', size=264426, modificationTime=1711640304000)]

In [0]:
dbutils.fs.help('mv')

In [0]:
dbutils.fs.mv(
    '/FileStore/tables/aula-databricks/', 
    '/FileStore/tables/aula-databricks/vinhos/', 
    recurse=True
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-2521796350613778>:1[0m
[0;32m----> 1[0m [43mdbutils[49m[38;5;241;43m.[39;49m[43mfs[49m[38;5;241;43m.[39;49m[43mmv[49m[43m([49m
[1;32m      2[0m [43m    [49m[38;5;124;43m'[39;49m[38;5;124;43m/FileStore/tables/aula-databricks/[39;49m[38;5;124;43m'[39;49m[43m,[49m[43m [49m
[1;32m      3[0m [43m    [49m[38;5;124;43m'[39;49m[38;5;124;43m/FileStore/tables/aula-databricks/vinhos/[39;49m[38;5;124;43m'[39;49m[43m,[49m[43m [49m
[1;32m      4[0m [43m    [49m[43mrecurse[49m[38;5;241;43m=[39;49m[38;5;28;43;01mTrue[39;49;00m
[1;32m      5[0m [43m)[49m

File [0;32m/databricks/python_shell/dbruntime/dbutils.py:346[0m, in [0;36mDBUtils.FSHandler.prettify_exception_message.<locals>.f_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    344[0m

In [0]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks'))

In [0]:
for item in dbutils.fs.ls('/FileStore/tables/aula-databricks'):
    if item.size!=0:
        dbutils.fs.mv(
          f'/FileStore/tables/aula-databricks/{item.name}', 
          '/FileStore/tables/aula-databricks/vinhos/'
        )

In [0]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks'))

path,name,size,modificationTime
dbfs:/FileStore/tables/aula-databricks/vinhos/,vinhos/,0,0


In [0]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks/vinhos'))

path,name,size,modificationTime
dbfs:/FileStore/tables/aula-databricks/vinhos/README.md,README.md,1066,1711640581000
dbfs:/FileStore/tables/aula-databricks/vinhos/winequality-red.csv,winequality-red.csv,84199,1711640582000
dbfs:/FileStore/tables/aula-databricks/vinhos/winequality-white.csv,winequality-white.csv,264426,1711640583000


# Usando SQL no Databricks

## Visualizando databases

In [0]:
%sql
SHOW DATABASES

databaseName
default


### Criando um database

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS teste

In [0]:
%sql
SHOW DATABASES

databaseName
default
teste


### Criando uma tabela

In [0]:
%sql
USE teste

In [0]:
%sql
CREATE TABLE usuarios(
  idade int,
  estado string,
  salario float
)
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
  STORED AS textfile
  LOCATION '/FileStore/tables/aula-databricks/usuarios/'

In [0]:
%sql
SHOW TABLES

database,tableName,isTemporary
teste,usuarios,False


In [0]:
%sql
SELECT *
  FROM usuarios

idade,estado,salario


### Inserindo registros em uma tabela

In [0]:
%sql
INSERT INTO usuarios VALUES (25, 'SP', 5000)

In [0]:
%sql
SELECT *
  FROM usuarios

idade,estado,salario
25,SP,5000.0


## Partições

Existem duas maneiras de inserir dados na tabela de partição:

**Estático:** precisamos especificar o valor da coluna de partição em cada instrução que será carregada.

> `PARTITION(country="BR")`

**Dinâmico:** Não precisamos especificar o valor da coluna da partição.

> `PARTITION(country)`

In [0]:
%sql
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

key,value
hive.exec.dynamic.partition.mode,nonstrict


In [0]:
%sql
CREATE TABLE usuariosPart(
  idade int,
  estado string,
  salario float
)
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
  STORED AS textfile
  PARTITIONED BY (ano int)
  LOCATION '/FileStore/tables/aula-databricks/usuariosPart/'

In [0]:
%sql
INSERT INTO usuariosPart VALUES (25, 'SP', 5000, 2021)

In [0]:
%sql
SELECT *
  FROM usuariosPart

idade,estado,salario,ano
25,SP,5000.0,2021


In [0]:
%sql
INSERT INTO usuariosPart 
  PARTITION (ano=2020)
    VALUES (30, 'SP', 6000)

In [0]:
%sql
SELECT *
  FROM usuariosPart

idade,estado,salario,ano
30,SP,6000.0,2020
25,SP,5000.0,2021


In [0]:
%sql
SELECT * 
  FROM usuariosPart
    WHERE ano=2020

## Carregando dados

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS tabela_vinhos

In [0]:
%sql
USE tabela_vinhos

In [0]:
dbutils.fs.head('/FileStore/tables/aula-databricks/vinhos/winequality-red.csv')

In [0]:
%sql
CREATE TABLE red_wine(
    fixed_acidity float,
    volatile_acidity float,
    citric_acid float,
    residual_sugar float,
    chlorides float,
    free_sulfur_dioxide int,
    total_sulfur_dioxide float,
    density float,
    pH float,
    sulphates float,
    alcohol float,
    quality float
    )
      USING CSV
        OPTIONS (
            path '/FileStore/tables/aula-databricks/vinhos/winequality-red.csv',
            header 'true',
            delimiter ';'
        )

In [0]:
%sql
SHOW TABLES

In [0]:
%sql
SELECT *
  FROM red_wine
    LIMIT 10

### Atividade - Faça como eu fiz

Repita o mesmo procedimento feito para o arquivo de vinho tinto com o arquivo de vinho branco
- Chame a tabela de white_wine

In [0]:
dbutils.fs.head('/FileStore/tables/aula-databricks/vinhos/winequality-white.csv')

In [0]:
%sql
CREATE TABLE white_wine(
    fixed_acidity float,
    volatile_acidity float,
    citric_acid float,
    residual_sugar float,
    chlorides float,
    free_sulfur_dioxide int,
    total_sulfur_dioxide float,
    density float,
    pH float,
    sulphates float,
    alcohol float,
    quality float
    )
    USING CSV
        OPTIONS (
            path '/FileStore/tables/aula-databricks/vinhos/winequality-white.csv',
            header 'true',
            delimiter ';'
        )

In [0]:
%sql
SHOW TABLES

In [0]:
%sql
SELECT *
  FROM white_wine
    LIMIT 10

## Explorando os dados

In [0]:
%sql
DESCRIBE red_wine

In [0]:
%sql
SELECT DISTINCT (quality)
  FROM red_wine
    ORDER BY quality DESC

In [0]:
%sql
SELECT quality, COUNT (quality) AS freq
  FROM red_wine
    GROUP BY quality
      ORDER BY quality DESC

In [0]:
%sql
SELECT quality, MIN (pH) AS pH_mimino, MAX (pH) AS pH_maximo    
  FROM red_wine
    GROUP BY quality
      ORDER BY quality DESC

### Atividade - Faça como eu fiz

Faz as mesmas análise com os dados de vinho branco e compare os resultados obtidos.

In [0]:
%sql
DESCRIBE white_wine

In [0]:
%sql
SELECT DISTINCT (quality)
  FROM white_wine
    ORDER BY quality DESC

In [0]:
%sql
SELECT quality, COUNT (quality) AS freq
  FROM white_wine
    GROUP BY quality
      ORDER BY quality DESC

In [0]:
%sql
SELECT quality, MIN (pH) AS pH_mimino, MAX (pH) AS pH_maximo    
  FROM white_wine
    GROUP BY quality
      ORDER BY quality DESC

## Juntando os dados

In [0]:
%sql
CREATE OR REPLACE TABLE new_red_wine 
  AS SELECT *, 'red' AS wine_type
    FROM red_wine

In [0]:
%sql
SELECT *
  FROM new_red_wine

In [0]:
%sql
CREATE OR REPLACE TABLE new_white_wine 
  AS select *, 'white' AS wine_type
    FROM white_wine

In [0]:
%sql
SELECT *
  FROM new_white_wine

In [0]:
%sql
CREATE OR REPLACE TABLE combined_wines 
  AS SELECT *
    FROM new_red_wine
      UNION ALL SELECT *
        FROM new_white_wine

In [0]:
%sql
SELECT wine_type, AVG (pH) AS pH_medio    
  FROM combined_wines
    GROUP BY wine_type
      ORDER BY wine_type

# Apache Spark

In [0]:
spark

## Comunicação Hive-Spark

In [0]:
%sql
SHOW TABLES

In [0]:
tabela = spark.table('tabela_vinhos.combined_wines')

In [0]:
tabela

In [0]:
tabela.show()

In [0]:
display(tabela)

### SQL com Spark

```spark.sql('query').show()```

ou 

```display(spark.sql('query'))```

Se quisermos pular linhas na query temos que utilizar 3 aspas simples:
```
spark.sql( '''
  query
''' ).show()
```

In [0]:
spark.sql('''
    SELECT DISTINCT (quality)
        FROM combined_wines
            ORDER BY quality DESC 
''').show()

In [0]:
spark.sql('SELECT AVG (pH) FROM combined_wines').show()

### Registrando uma tabela

In [0]:
resultado = spark.sql(''' 
    SELECT *
        FROM combined_wines
            WHERE pH < 3
''')

In [0]:
type(resultado)

In [0]:
resultado.createOrReplaceTempView('nova_tabela')

In [0]:
spark.sql('''
    SELECT quality, COUNT (quality) AS Freq
        FROM nova_tabela
            GROUP BY quality
''').show()

## PySpark

In [0]:
import pyspark
from pyspark.sql.functions import lit

In [0]:
display(dbutils.fs.ls('/databricks-datasets/wine-quality/'))

In [0]:
red_wine_df = spark.read.format('csv')\
    .option('inferSchema', 'true')\
    .option('sep', ';')\
    .option('header', 'true')\
    .load('/databricks-datasets/wine-quality/winequality-red.csv')

display(red_wine_df)

In [0]:
type(red_wine_df)

In [0]:
white_wine_df = (spark.read.format('csv')
    .option('inferSchema', 'true')
    .option('sep', ';')
    .option('header', 'true')
    .load('/databricks-datasets/wine-quality/winequality-white.csv')
)

display(white_wine_df)

In [0]:
red_wine_df = red_wine_df.withColumn('wine_type', lit('red'))
red_wine_df.show()

In [0]:
white_wine_df = white_wine_df.withColumn('wine_type', lit('white'))
white_wine_df.show()

In [0]:
combined_wines = red_wine_df.union(white_wine_df)
display(combined_wines)

In [0]:
combined_wines = combined_wines.withColumnRenamed('quality', 'nota')
display(combined_wines)

In [0]:
(
    combined_wines
        .select(['nota', 'wine_type'])
        .show()
)

In [0]:
(
    combined_wines
        .groupBy(['nota', 'wine_type'])
        .count()
        .show()
)

In [0]:
combined_wines.printSchema()

In [0]:
(
    combined_wines
        .write
        .option('header', True)
        .mode('overwrite')
        .csv('/FileStore/tables/aula-databricks/vinhos/pyspark')
)