### # Databricks notebook source

In [None]:
df = spark.read.json("/Volumes/workspace/default/arquivos-aula/Anac/V_OCORRENCIA_AMPLA.json")
display(df)


orders_df = spark.read.\
    format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/Volumes/workspace/default/arquivos-aula/Bike Store/orders.csv")
display(orders_df)

### # Verificar os tipos de dados aceitos nas colunas.

In [None]:
df.printSchema() # nullable = 'true' significa que a coluna aceita valores nulos.

### # Ver nome das colunas (forma simples) na vertical

df.columns

### # Outra forma de ver as columas (em loop)

for Loop in df.columns:
    print(Loop)

### # Selecionar apenas algumas colunas do Data Frame


display(df['Tipo_de_Ocorrencia', 'UF'])

### # Criando uma variável com as colunas (lista)


In [None]:
ColunasSelecionadas = [
    'Categoria_da_Aeronave',
    'Classificacao_da_Ocorrência',
    'Danos_a_Aeronave',
    'Data_da_Ocorrencia',
    'Descricao_do_Tipo',
    'Fase_da_Operacao'
]

display(df[ColunasSelecionadas])


# Também daria "no mesmo" fazer assim:

#  ColunasSelecionadas = df['Categoria_da_Aeronave',
#    'Classificacao_da_Ocorrência',
#    'Danos_a_Aeronave',
#    'Data_da_Ocorrencia',
#    'Descricao_do_Tipo',
#    'Fase_da_Operacao'
#    ]

#  display(ColunasSelecionadas)

### # Método select (para selecionar colunas)

display(df.select('Danos_a_Aeronave', 'Data_da_Ocorrencia', 'Operador', 'UF'))

### # Criando novas colunas


In [None]:
# Fazendo a leitura do arquivo JSON
nDF = spark.read.json("/Volumes/workspace/default/arquivos-aula/Anac/V_OCORRENCIA_AMPLA.json")
display(nDF)

### # Criando nova coluna no arquivo JSON

teste = nDF['Classificacao_da_Ocorrência', 'UF', 'Municipio']

from pyspark.sql.functions import concat, lit

display(teste.withColumn('Municipio - UF', concat(nDF.Municipio, lit(" - "), nDF["UF"])))

# withColumn -> Cria uma coluna (caso não exista) ou substitui uma coluna (caso exista) / Nome da coluna.

# concat -> Concatena os valores das colunas (junta o valor filtrado 'nDF.Municipio' com o valor do 'lit' e com o outro valor filtrado 'nDF["UF"]').

# lit -> Insere um valor 'literal' nas colunas. Por exemplo, se eu alterar o ' - ' para 'city of', entre a cidade e o estado, resultará em 'Curitiba city of PR', sendo replicado para todos os valores das células na coluna recém criada.


### # Salvando o tratamento acima em um novo Data Frame


In [None]:
from pyspark.sql.functions import concat, lit

dfTratado = teste.withColumn("Municipio - UF", concat(nDF.Municipio, lit(" - "), nDF["UF"]))

display(dfTratado)

### # Inserindo mais de uma coluna no mesmo comando e deixando tudo em letra minúscula

from pyspark.sql.functions import concat, lit, lower, upper

novaDfTratada = teste\
    .withColumn("Municipio - UF", concat(nDF.Municipio, lit(" - "), nDF["UF"]))\
    .withColumn("País", lit("Brasil"))\
    .withColumn("minúsculo", lower(teste.Municipio))

# .withColumn("Municipio - UF", concat(nDF.Municipio, lit(" - "), nDF["UF]") -> Cria uma nova coluna chamada 'Municipio - UF' e concatena os valores das colunas 'Municipio' e 'UF' inserindo o literal ' - ' (que pode ser substituído por qualquer outro nome).

# .withColumn("País", lit("Brasil")) -> Cria uma nova coluna chamada país e insere o valor Brasil para todas as células (linhas).

# .withColumn("minúsculo"), lower(teste.Municipio) -> Cria uma nova coluna chamada 'minúsculo' e converte todos os valores da coluna para minúsculo.

display(novaDfTratada)

### # Criando uma nova coluna chamada 'Regiao' e populando-a de acordo com as células da coluna 'UF'. Caso UF seja igual 'PR' as células na coluna Região receberão o valor 'Sul', senão (se a coluna UF for diferente de 'PR') receberão o valor 'Outra região do Brasil.'


In [None]:
from pyspark.sql.functions import when

# When -> Quando (semelhante ao 'if' do Python).
# Otherwise -> Senão (semelhante ao 'else' do Python).


Regiao = novaDfTratada.withColumn("Regiao", when(novaDfTratada.UF == "PR", "Sul").otherwise("Outra região do Brasil"))
display(Regiao)

### # Incluíndo mais de uma condição (mais de uma comparação de valor):

from pyspark.sql.functions import when, col

# ISIN (isin) -> Verifica se o valor é um dos valores passados como parâmetro e retorna outro valor. Exemplo: Verifica se na coluna UF possue o valor PR, SC ou RS. Se sim (se corresponde com um dos valores informados como parâmetro, será incluso na nova coluna chamada região o valor 'Região Sul do Brasil'. Senão (caso não corresponda com um dos valores informados como parâmetro) retorna 'Outras regiões do Brasil' por conta do 'otherwise'. A difereça está na comparação simultânea com mais de um único valor. Ao invés de somente 'UF == PR', com 'isin' é possível verificar a correspondência com outros valores simultaneamente.

Regiao = Regiao.withColumn("Regiao",\
     when(col("UF").isin("PR", "SC", "RS"), "Sul")
    .when(col("UF").isin("SP", "RJ", "MG", "ES"), "Sudeste")
    .otherwise("Outras regiões do Brasil")
)

display(Regiao)