# Utilizando o Databricks

## Criando tabela no notebook

In [None]:
dbutils.fs.rm('/user/hive/warehouse/data_csv', recurse=True)

# Para este procedimento funcionar o arquivo data.csv deve estar carregado no DBFS no endereço especificado na variável `file_location`
file_location = '/FileStore/tables/data.csv'
file_type = 'csv'
infer_schema = 'true'
first_row_is_header = 'true'
delimiter = ';'

df = spark\
    .read\
    .format(file_type)\
    .option('inferSchema', infer_schema)\
    .option('header', first_row_is_header)\
    .option('sep', delimiter)\
    .load(file_location)

table_name = 'data_csv'

df.write.format('parquet').saveAsTable(table_name)

# Databricks Utilities

## Comandos Databricks Utilities - `dbutils`

In [None]:
dbutils.help()

In [None]:
dbutils.fs.help()

## Manipulando arquivos

### Listar todos os arquivos dentro de uma pasta

In [None]:
dbutils.fs.ls('/')

In [None]:
for item in dbutils.fs.ls('/'):
    print(item.path)

### Acessando os arquivos carregados no DBFS

In [None]:
dbutils.fs.ls('/FileStore/')

In [None]:
dbutils.fs.ls('/FileStore/tables/')

In [None]:
display(dbutils.fs.ls('/FileStore/tables/'))

### Listando as primeiras linhas de um arquivo

In [None]:
dbutils.fs.head('/FileStore/tables/data.csv')

### Removendo arquivos

In [None]:
dbutils.fs.rm('/FileStore/tables/data.csv')

In [None]:
dbutils.fs.ls('/FileStore/tables')

## Databricks Datasets
##### [Wine Quality Data Set](http://archive.ics.uci.edu/ml/datasets/wine+quality)

In [None]:
for item in dbutils.fs.ls('/'): print(item.path)

In [None]:
display(dbutils.fs.ls('/databricks-datasets'))

In [None]:
display(dbutils.fs.ls('/databricks-datasets/wine-quality'))

In [None]:
dbutils.fs.head("/databricks-datasets/wine-quality/README.md")

In [None]:
dbutils.fs.head("/databricks-datasets/wine-quality/winequality-red.csv")

In [None]:
dbutils.fs.head("/databricks-datasets/wine-quality/winequality-white.csv")

## Diretórios e arquivos

In [None]:
dbutils.fs.ls('/FileStore/tables')

In [None]:
dbutils.fs.mkdirs('/FileStore/tables/aula-databricks/vinhos')

In [None]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks'))

In [None]:
dbutils.fs.ls('/FileStore/tables/aula-databricks/vinhos')

In [None]:
dbutils.fs.help('cp')

In [None]:
dbutils.fs.cp(
    '/databricks-datasets/wine-quality',
    '/FileStore/tables/aula-databricks',
    recurse=True
)

In [None]:
dbutils.fs.ls('/FileStore/tables/aula-databricks/vinhos')

In [None]:
dbutils.fs.ls('/FileStore/tables/aula-databricks')

In [None]:
dbutils.fs.help('mv')

In [None]:
# dbutils.fs.mv(
#     '/FileStore/tables/aula-databricks/',
#     '/FileStore/tables/aula-databricks/vinhos/',
#     recurse=True
# )

In [None]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks'))

In [None]:
for item in dbutils.fs.ls('/FileStore/tables/aula-databricks'):
    if item.size!=0:
        dbutils.fs.mv(
          f'/FileStore/tables/aula-databricks/{item.name}',
          '/FileStore/tables/aula-databricks/vinhos/'
        )

In [None]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks'))

In [None]:
display(dbutils.fs.ls('/FileStore/tables/aula-databricks/vinhos'))

# Usando SQL no Databricks

## Criando uma tabela

In [None]:
%sql
SHOW DATABASES

### Criando um database

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS teste

In [None]:
%sql
SHOW DATABASES

### Criando uma tabela

In [None]:
%sql
USE teste

In [None]:
%sql
CREATE TABLE usuarios(
  idade int,
  estado string,
  salario float
)
  ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
  STORED AS textfile
  LOCATION '/FileStore/tables/aula-databricks/usuarios/'

In [None]:
%sql
SHOW TABLES

In [None]:
%sql
SELECT *
  FROM usuarios

### Inserindo registros em uma tabela

In [None]:
%sql
INSERT INTO usuarios VALUES (25, 'SP', 5000)

In [None]:
%sql
SELECT *
  FROM usuarios

## Partições

Existem duas maneiras de inserir dados na tabela de partição:

**Estático:** precisamos especificar o valor da coluna de partição em cada instrução que será carregada.

> `PARTITION(country="BR")`

**Dinâmico:** Não precisamos especificar o valor da coluna da partição.

> `PARTITION(country)`

In [None]:
%sql
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

In [None]:
%sql
CREATE TABLE usuariosPart(
  idade int,
  estado string,
  salario float
)
  ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
  STORED AS textfile
  PARTITIONED BY (ano int)
  LOCATION '/FileStore/tables/aula-databricks/usuariosPart/'

In [None]:
%sql
INSERT INTO usuariosPart VALUES (25, 'SP', 5000, 2021)

In [None]:
%sql
SELECT *
  FROM usuariosPart

In [None]:
%sql
INSERT INTO usuariosPart
  PARTITION (ano=2020)
    VALUES (30, 'SP', 6000)

In [None]:
%sql
SELECT *
  FROM usuariosPart

In [None]:
%sql
SELECT *
  FROM usuariosPart
    WHERE ano=2020

## Carregando dados

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS tabela_vinhos

In [None]:
%sql
USE tabela_vinhos

In [None]:
dbutils.fs.head('/FileStore/tables/aula-databricks/vinhos/winequality-red.csv')

In [None]:
%sql
CREATE TABLE red_wine(
    fixed_acidity float,
    volatile_acidity float,
    citric_acid float,
    residual_sugar float,
    chlorides float,
    free_sulfur_dioxide int,
    total_sulfur_dioxide float,
    density float,
    pH float,
    sulphates float,
    alcohol float,
    quality float
    )
      USING CSV
        OPTIONS (
            path '/FileStore/tables/aula-databricks/vinhos/winequality-red.csv',
            header 'true',
            delimiter ';'
        )

In [None]:
%sql
SHOW TABLES

In [None]:
%sql
SELECT *
  FROM red_wine
    LIMIT 10

### Atividade - Faça como eu fiz

Repita o mesmo procedimento feito para o arquivo de vinho tinto com o arquivo de vinho branco
- Chame a tabela de white_wine

In [None]:
dbutils.fs.head('/FileStore/tables/aula-databricks/vinhos/winequality-white.csv')

In [None]:
%sql
CREATE TABLE white_wine(
    fixed_acidity float,
    volatile_acidity float,
    citric_acid float,
    residual_sugar float,
    chlorides float,
    free_sulfur_dioxide int,
    total_sulfur_dioxide float,
    density float,
    pH float,
    sulphates float,
    alcohol float,
    quality float
    )
    USING CSV
        OPTIONS (
            path '/FileStore/tables/aula-databricks/vinhos/winequality-white.csv',
            header 'true',
            delimiter ';'
        )

In [None]:
%sql
SHOW TABLES

In [None]:
%sql
SELECT *
  FROM white_wine
    LIMIT 10

## Explorando os dados

In [None]:
%sql
DESCRIBE red_wine

In [None]:
%sql
SELECT DISTINCT (quality)
  FROM red_wine
    ORDER BY quality DESC

In [None]:
%sql
SELECT quality, COUNT (quality) AS freq
  FROM red_wine
    GROUP BY quality
      ORDER BY quality DESC

In [None]:
%sql
SELECT quality, MIN (pH) AS pH_mimino, MAX (pH) AS pH_maximo
  FROM red_wine
    GROUP BY quality
      ORDER BY quality DESC

### Atividade - Faça como eu fiz

Faz as mesmas análise com os dados de vinho branco e compare os resultados obtidos.

In [None]:
%sql
DESCRIBE white_wine

In [None]:
%sql
SELECT DISTINCT (quality)
  FROM white_wine
    ORDER BY quality DESC

In [None]:
%sql
SELECT quality, COUNT (quality) AS freq
  FROM white_wine
    GROUP BY quality
      ORDER BY quality DESC

In [None]:
%sql
SELECT quality, MIN (pH) AS pH_mimino, MAX (pH) AS pH_maximo
  FROM white_wine
    GROUP BY quality
      ORDER BY quality DESC

## Juntando os dados

In [None]:
%sql
CREATE OR REPLACE TABLE new_red_wine
  AS SELECT *, 'red' AS wine_type
    FROM red_wine

In [None]:
%sql
SELECT *
  FROM new_red_wine

In [None]:
%sql
CREATE OR REPLACE TABLE new_white_wine
  AS select *, 'white' AS wine_type
    FROM white_wine

In [None]:
%sql
SELECT *
  FROM new_white_wine

In [None]:
%sql
CREATE OR REPLACE TABLE combined_wines
  AS SELECT *
    FROM new_red_wine
      UNION ALL SELECT *
        FROM new_white_wine

In [None]:
%sql
SELECT wine_type, AVG (pH) AS pH_medio
  FROM combined_wines
    GROUP BY wine_type
      ORDER BY wine_type

# Apache Spark

In [None]:
spark

## Comunicação Hive-Spark

In [None]:
%sql
SHOW TABLES

In [None]:
tabela = spark.table('tabela_vinhos.combined_wines')

In [None]:
tabela

In [None]:
tabela.show()

In [None]:
display(tabela)

### SQL com Spark

```spark.sql('query').show()```

ou

```display(spark.sql('query'))```

Se quisermos pular linhas na query temos que utilizar 3 aspas simples:
```
spark.sql( '''
  query
''' ).show()
```

In [None]:
spark.sql('''
    SELECT DISTINCT (quality)
        FROM combined_wines
            ORDER BY quality DESC
''').show()

In [None]:
spark.sql('SELECT AVG (pH) FROM combined_wines').show()

### Registrando uma tabela

In [None]:
resultado = spark.sql('''
    SELECT *
        FROM combined_wines
            WHERE pH < 3
''')

In [None]:
type(resultado)

In [None]:
resultado.createOrReplaceTempView('nova_tabela')

In [None]:
spark.sql('''
    SELECT quality, COUNT (quality) AS Freq
        FROM nova_tabela
            GROUP BY quality
''').show()

## PySpark

In [None]:
import pyspark
from pyspark.sql.functions import lit

In [None]:
display(dbutils.fs.ls('/databricks-datasets/wine-quality/'))

In [None]:
red_wine_df = spark.read.format('csv')\
    .option('inferSchema', 'true')\
    .option('sep', ';')\
    .option('header', 'true')\
    .load('/databricks-datasets/wine-quality/winequality-red.csv')

display(red_wine_df)

In [None]:
type(red_wine_df)

In [None]:
white_wine_df = (spark.read.format('csv')
    .option('inferSchema', 'true')
    .option('sep', ';')
    .option('header', 'true')
    .load('/databricks-datasets/wine-quality/winequality-white.csv')
)

display(white_wine_df)

In [None]:
red_wine_df = red_wine_df.withColumn('wine_type', lit('red'))
red_wine_df.show()

In [None]:
white_wine_df = white_wine_df.withColumn('wine_type', lit('white'))
white_wine_df.show()

In [None]:
combined_wines = red_wine_df.union(white_wine_df)
display(combined_wines)

In [None]:
combined_wines = combined_wines.withColumnRenamed('quality', 'nota')
display(combined_wines)

In [None]:
(
    combined_wines
        .select(['nota', 'wine_type'])
        .show()
)

In [None]:
(
    combined_wines
        .groupBy(['nota', 'wine_type'])
        .count()
        .show()
)

In [None]:
combined_wines.printSchema()

In [None]:
(
    combined_wines
        .write
        .option('header', True)
        .mode('overwrite')
        .csv('/FileStore/tables/aula-databricks/vinhos/pyspark')
)