# Operações Básicas com PySpark e Spark DataFrames

Após darmos os primeiros passos com o PySpark no Objeto de Aprendizagem anterior, vamos agora aplicar algumas técnicas básicas de manipulação de dados para nos familiarizarmos mais com a tecnologia antes de lidar com maiores volumes de dados.

Vamos prosseguir com o exemplo de disciplinas da especialização e enriquecê-lo com mais conteúdo. Ao final pedirei que vocês complementem nosso DataFrame com mais dados coletados de outras disciplinas para uma pequena atividade. Peço também um pouco de paciência com relação aos dados. No próximo Objeto de Aprendizagem passaremos a utilizar fontes de dados externas!

## Preparativos

Assim como no notebook anterior, vamos iniciar carregando o módulo `pyspark.sql`, criando uma sessão local e montando nosso DataFrame.

In [4]:
# Uso do Spark Dataframes no PySpark
from pyspark.sql import *

# Vamos trabalhar com o Spark localmente, sem o uso de um cluster.
spark = SparkSession \
    .builder \
    .master("local[4]") \
    .appName("Operações Básicas") \
    .getOrCreate()

In [5]:
# Estrutura do nosso DataFrame
Disciplina = Row("nome", "carga_horaria")

# Cada uma das disciplinas da especialização é criada como uma instância do registro Disciplina.
d01 = Disciplina("Introdução a BigData e Analytics", 36)
d02 = Disciplina("Estatística aplicada", 24)
d03 = Disciplina("Visualização de dados e informação", 24)
d04 = Disciplina("Compartilhamento e segurança de dados", 24)
d05 = Disciplina("Introdução a Python e linguagem R", 36)
d06 = Disciplina("Machine Learning", 24)
d07 = Disciplina("Processamento de Alto Desempenho e Aplicações", 24)
d08 = Disciplina("Lidando com BigData: Apache Spark, Hadoop, MapReduce, Hive", 24)
d09 = Disciplina("Gerenciamento e Processamento de grande volume de dados", 24)
d10 = Disciplina("Internet das Coisas e Aplicações Distribuídas", 24)
d11 = Disciplina("Deep Learning", 24)
d12 = Disciplina("Business Intelligence e BigData", 24)
d13 = Disciplina("Atividades Integradoras", 12)
d14 = Disciplina("Preparação para Projeto Aplicado", 36)

especializacao_bigdata_datascience = [d01, d02, d03, d04, d05, d06, d07, d08, d09, d10, d11, d12, d13, d14]

df_especializacao = spark.createDataFrame(especializacao_bigdata_datascience)

## Seleção de Colunas

Como visto no texto base, há diferentes formas de selecionar colunas de um DataFrame. Hora de aplicar!

In [7]:
display(df_especializacao)


nome,carga_horaria
Introdução a BigData e Analytics,36
Estatística aplicada,24
Visualização de dados e informação,24
Compartilhamento e segurança de dados,24
Introdução a Python e linguagem R,36
Machine Learning,24
Processamento de Alto Desempenho e Aplicações,24
"Lidando com BigData: Apache Spark, Hadoop, MapReduce, Hive",24
Gerenciamento e Processamento de grande volume de dados,24
Internet das Coisas e Aplicações Distribuídas,24


### Seleção das colunas com retorno de valor

#### Por meio de Indexação (formato Python)

As 3 formas abaixo retornarão um DataFrame somente com a coluna `nome`.

##### Diretamente pelo nome da coluna

In [11]:
#df_especializacao.show()
df_especializacao[["nome"]].show()

##### Utilizando a função `col`

In [13]:
## Mais um módulo para nossa coleção de módulos dominados!!
from pyspark.sql.functions import *

df_especializacao[[col('nome')]].show()

##### Pela notação de ponto

In [15]:
df_especializacao[[df_especializacao.nome]].show()

#### Pelo método `select` (API Spark DataFrame)

As 3 formas abaixo retornarão um DataFrame somente com a coluna carga_horaria.

In [18]:
# Diretamente pelo nome da coluna
df_especializacao.select('carga_horaria').show()

In [19]:
# Utilizando a função col
df_especializacao.select(col('carga_horaria')).show()

In [20]:
# Pela notação de ponto
df_especializacao.select(df_especializacao.carga_horaria).show()

### Referência ao objeto `Column`

Como dito no texto base, a função `col` e a notação de ponto fazem referência ao objeto `Column` do DataFrame. Este objeto é necessário para uso em operações lógicas, para diferenciar os operadores lógicos da linguagem Python dos operadores lógicos existentes no Spark DataFrames/Spark SQL. Entraremos em maiores detalhes quando estudarmos a arquitetura do Apache Spark.

In [22]:
type(df_especializacao.nome)

In [23]:
type(col('nome'))

Para fixar a diferença, vamos tentar usar o método `show` numa abordagem que retorna valor e numa abordagem que retorna o objeto `Column`:

In [25]:
# .show() em retorno de valor
df_especializacao[[df_especializacao.carga_horaria]].show(5)

In [26]:
# .show() em objeto Column. 
# Ocorre um erro do tipo TypeError alertando que não é possível chamar este método no objeto Column.

try:
    df_especializacao.carga_horaria.show(5)

except TypeError as err:
    print("Erro: {}".format(err))


## Filtros

Vimos no texto base que os filtros removem registros que não correspondem aos critérios especificados. Nada melhor que um exemplo pra mostrar na prática o que isso significa. Das nossas experimentações anteriores já sabemos que as disciplinas possuem durações de 12, 24 ou 36 horas. 

O que fazer se quisermos visualizar somente aquelas com 36 horas? Aí que entram os filtros. Basta especificarmos um critério de carga horária igual a 36 horas, como demonstrado abaixo.

In [29]:
df_especializacao[df_especializacao["carga_horaria"] == 36].show(truncate=60)

Falta só mais uma com essa carga horária 🙂

E quantas possuem duração **menor** que 36 horas?

In [31]:
df_especializacao[df_especializacao["carga_horaria"] < 36].show(truncate=60)

## Agregação

Aqui exploraremos algumas formas simples de agregação, que aprofundaremos nos materiais seguintes. Este primeiro exemplo abaixo parece bem auto-descritivo né? Significa agrupar por carga horária - criar nossos subconjuntos onde cada subconjunto concentra registros de mesma carga horária - e então contar a quantidade de registros por subconjunto.

In [34]:
df_especializacao.groupBy(df_especializacao.carga_horaria).count().show()

Uma forma diferente de agrupamento é colocar todo o DataFrame em um único subconjunto. Isso se faz necessário pois não temos como aplicar as operações de subconjuntos sem agrupamento. Abaixo vemos como calcular uma média de carga horária para toda a especialização.

In [36]:
df_especializacao.groupBy().mean('carga_horaria').show()

Para aplicar mais de uma operação de subconjuntos podemos listar todas elas dentro de uma operação de agregação (**`agg`**). Neste caso estamos tirando proveito do grupo criado para aplicar as três operações: média, mínimo e máximo. Isto é interessante pois operações de agregação em grandes volumes de dados podem ser computacionalmente custosas (e demoradas), então calcular tudo de uma vez faz mais sentido.

In [38]:
df_especializacao.groupBy().agg(mean('carga_horaria'), min('carga_horaria'), max('carga_horaria')).show()

Finalizando... vocês devem ter percebido que os nomes das colunas após agregação ficam muito extensos e são ruins para referenciar em próximas operações. No exemplo abaixo mostro como atribuir novos nomes para o resultado de cada operação de agregação por meio do método **`alias`**.

In [40]:
import pyspark.sql.functions as F

df_especializacao.groupBy() \
    .agg(mean('carga_horaria').alias('media'), \
         min('carga_horaria').alias('minimo'), \
         max('carga_horaria').alias('maximo')).show()

#### Atividade 1

Acesse a página de especializações da Unisinos [neste link](http://www.unisinos.br/pos#especializacao) e escolha uma especialização diferente desta de Big Data, Data Science e Data Analytics. Construa a sua estrutura curricular da mesma forma que fizemos no início deste Objeto de Aprendizagem até o ponto em que o DataFrame no Spark é criado. O nome do Spark DataFrame deve ser **`df_outra_especializacao`**.

Utilize o bloco de código abaixo para criação deste DataFrame:

In [42]:
# Estrutura do nosso DataFrame
Disciplina = Row("nome", "carga_horaria")

# Cada uma das disciplinas da especialização é criada como uma instância do registro Disciplina.
d01 = Disciplina("Fundamentos e Metodologia da Bioética e da Ética Aplicada", 36)
d02 = Disciplina("Princípios e Filosofia da Bioética e da Ética Aplicada", 24)
d03 = Disciplina("Bioética e Saúde", 24)
d04 = Disciplina("Bioética: Sociedade, Meio Ambiente e Animais", 24)
d05 = Disciplina("História do surgimento e do desenvolvimento da bioética no contexto da ética", 36)
d06 = Disciplina("Lógica e metodologia científica aplicadas à ética e à bioética", 24)
d07 = Disciplina("Tópicos especiais em Ética em Pesquisa", 24)
d08 = Disciplina("Ética e pesquisa qualitativa e ética em pesquisa nas humanidades", 24)
d09 = Disciplina("Melhoramento humano, Neuroética e genética", 24)
d10 = Disciplina("Princípio e responsabilidade e o problema das futuras gerações", 24)
d11 = Disciplina("Comitês de Ética em Pesquisa e o consentimento informado em pesquisa", 24)
d12 = Disciplina("Seminário Bioética Clínica", 24)
d13 = Disciplina("Atividades Integradoras", 12)
d14 = Disciplina("Seminário de Bioética e Saúde Coletiva", 31)

df_bioetica_especializacao = [d01, d02, d03, d04, d05, d06, d07, d08, d09, d10, d11, d12, d13, d14]

df_outra_especializacao = spark.createDataFrame(df_bioetica_especializacao)

df_outra_especializacao.show()


#### Atividade 2

Agora que temos dois DataFrames representando duas especializações diferentes será importante diferenciar um do outro. Vamos utilizar o nome de cada especialização para possibilitar esta diferenciação.

O código abaixo aplica uma operação de criação de nova coluna no nosso DataFrame da especialização em Big Data, Data Science e Data Analytics. Aproveitei para modificar o nome da coluna de nome da disciplina, pois agora temos dois nomes diferentes: da especialização e da disciplina.

In [44]:
# A função lit() é necessária para indicar ao Spark que o conteúdo passado para ela será o valor da coluna criada. Você deve achar isso um exagero, e concordo. Más é assim mesmo =/

df_bd_ds_da = df_especializacao \
      .withColumn("nome_especializacao", lit("Big Data, Data Science e Data Analytics")) \
      .withColumnRenamed("nome", "nome_disciplina")

df_bd_ds_da.show()

Sua atividade é criar a coluna no DataFrame da outra especialização a partir do **`df_outra_especializacao`** e aproveitar para renomear a coluna do nome da disciplina. O DataFrame resultante deve se chamar **`df_especializacao_criada`**:

In [46]:
#df_especializacao_criada = ...
df_outra_especializacao_bioetica = df_outra_especializacao \
      .withColumn("nome_especializacao", lit("Bioetica e Etica Aplicada")) \
      .withColumnRenamed("nome", "nome_disciplina")

df_outra_especializacao_bioetica.show()

#### Atividade 3

Com as duas especializações batizadas, vamos fazer algo mais interessante: criar um DataFrame que combina as duas especializações! O Spark DataFrames fornece uma operação para isso, chamada de `unionAll`.

In [48]:
df_especializacoes = df_bd_ds_da.unionAll(df_outra_especializacao_bioetica)
df_especializacoes.show()

Utilizando nosso novo DataFrame de duas especializações, responda às seguintes questões usando as operações de filtro e agregações estudadas até aqui. Não esqueça de utilizar a operação `show()` ou a função `display()` para mostrar o resultado!

1- Quantas disciplinas em cada especialização?

In [51]:
## seu código aqui:
df_especializacoes.groupBy(df_especializacoes.nome_especializacao).count().show()

2- Qual das especializações possui mais disciplinas de 36h?

In [53]:
## seu código aqui:
##df_especializacao[[col('nome')]]
df_count_esp = df_especializacoes[df_especializacoes["carga_horaria"] == 36].groupBy(df_especializacoes.nome_especializacao).count()
df_count_esp.show()
df_res = df_count_esp[['nome_especializacao','count']].groupBy().max('count')
#df_res[['max(count)']].show()

v = df_count_esp[['nome_especializacao','count']].groupBy().max('count').collect()

print(v)


3- Qual a média de carga horária de cada especialização?

In [55]:
## seu código aqui:

#df_especializacoes.show()

df_especializacoes.groupBy(df_especializacoes.nome_especializacao).agg(mean('carga_horaria')).show()

4- Existe alguma disciplina com mesmo nome nas duas especializações? Liste o nome das disciplinas que se repetirem. Mesmo que não exista repetição, o DataFrame resultante deve retornar a coluna `nome_disciplina`.

**Dica**: Use count(), filter() e select() para isso.

In [57]:
## seu código aqui:
#df_especializacoes.select('nome_disciplina').show()
df_refine = df_especializacoes.groupBy(df_especializacoes.nome_disciplina).count()
df_refine2 = df_refine[df_refine["count"] >1]
df_refine2.show()

