# Capítulo 5 - Operações estruturadas - Parte 2
<br>
<div style="text-align: justify">Por definição, um DataFrame consiste em uma série de registros (como linhas em uma tabela), que são do tipo Linha e um número de colunas (como colunas em uma planilha) que representam uma expressão de cálculo que pode ser executada em cada registro individual do conjunto de dados. Os esquemas definem o nome e o tipo de dados em cada coluna. O particionamento do DataFrame define o layout da distribuição física do DataFrame ou do Conjunto de Dados no cluster. O esquema de particionamento define como isso é alocado. Você pode definir isso para ser baseado em valores em uma determinada coluna ou não-deterministicamente.</div>

### Sumário
 1. __Maiúsculas e minúsculas__ 
 2. __drop()__ - Removendo colunas
 3. __cast()__ - Mudando o tipo da coluna
 4. __filter() / where()__ - Filtrando linhas
 5. __distinc()__ - Acessando valores não duplicados
 6. __sample()__ - Amostras aleatórias
 7. __randomSplit()__ - Divisões aleatórias
 8. __union()__ - Concatenando e adicionando Linhas
 9. __sort() / orderBy()__ - Ordenando Linhas
 10. __limit()__ - Limitando extração
 11. __repartition() / coalesce()__ - Repartição e coalesce
 12. __collect() / take() / show() / toLocalIterator()__ - Coletando dados para o Driver

In [11]:
# Abrindo um Dataframe de trabalho e importando bilbiotecas necessárias
path = "file:///root/sparkcurso/Spark_Definitive_Guide/data/flight-data/json/2015-summary.json"
df = spark.read.format("json").load(path)
import pyspark.sql.functions as F

### Maiúsculas e minúsculas

Por padrão, o Spark não é sensível a maiúsculas e minúsculas. Para torná-lo sensível, deve-se mudar o seguinte parâmetro:

In [None]:
spark.sql("set spark.sql.caseSensitive=true")

### drop() - Removendo colunas

Por padrão, o Spark não é sensível a maiúsculas e minúsculas. Para torná-lo sensível, deve-se mudar o seguinte parâmetro:

In [None]:
df.drop("ORIGIN_COUNTRY_NAME").columns

In [None]:
# Eliminando múltiplas colunas 
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

### cast() - Mudando o tipo da coluna

Às vezes, precisamos converter de um tipo para outro. Por exemplo, se tivermos um conjunto de StringType que deve ser convertido para inteiro. Podemos converter colunas de um tipo para outro, lançando a coluna de um tipo para outro.

In [13]:
df

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [15]:
df.withColumn("count2", F.col("count").cast("string"))
# Equivalente em SQL:
# SELECT *, cast(count as long) AS count2 FROM dfTable

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]

### filter( ) / where( ) - Filtrando linhas

Para filtrar linhas, criamos uma expressão que é avaliada como verdadeira ou falsa. Em seguida, filtramos as linhas com uma expressão igual a _false_. A maneira mais comum de fazer isso com DataFrames é criar uma expressão como uma String ou criar uma expressão usando um conjunto de manipulações de coluna. Existem dois métodos para realizar esta operação: você pode usar __where()__ ou __filter()__ e ambos executarão a mesma operação e aceitarão os mesmos tipos de argumentos quando usados com DataFrames.

In [16]:
df.filter(col("count") < 2).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [17]:
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [None]:
# Equivalente em SQL:
# SELECT * FROM dfTable WHERE count < 2 LIMIT 2

Instintivamente, você pode querer colocar vários filtros na mesma expressão. Embora isso seja possível, nem sempre é útil, porque o Spark executa automaticamente todas as operações de filtragem ao mesmo tempo, independentemente da ordem do filtro. Isso significa que, se você quiser especificar vários filtros AND, encadeie-os sequencialmente e deixe o Spark manipular o restante.

In [18]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [None]:
# Equivalente em SQL:
# SELECT * FROM dfTable WHERE count < 2 AND ORIGIN_COUNTRY_NAME != "Croatia" LIMIT 2

### distinct( ) - Acessando valores não duplicados

Um caso de uso muito comum é extrair os valores exclusivos ou distintos em um DataFrame. Esses valores podem estar em uma ou mais colunas. A maneira como fazemos isso é usando o método __distinct()__, que nos permite deduplicar as linhas que estão nesse DataFrame. 

In [19]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
# Equivalente em SQL:
# SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable

256

In [20]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()
# Equivalente em SQL:
# SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) FROM dfTable

125

### sample() - Amostras aleatórias

Às vezes, você pode querer apenas colher amostras aleatórias de registros do seu DataFrame. Você pode fazer isso usando o método __sample()__ em um DataFrame, o que possibilita que você especifique uma fração de linhas para extrair de um DataFrame e, se deseja amostrar com ou sem substituição.

In [24]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

126

### randomSplit() - Divisões aleatórias

Pode ser útil dividir seu DataFrame em "divisões" aleatórias do DataFrame original. Isso é frequentemente usado com algoritmos de aprendizado de máquina para criar conjuntos de treinamento, validação e teste. Neste próximo exemplo, dividiremos nosso DataFrame em dois DataFrames diferentes, definindo os pesos pelos quais dividiremos o DataFrame (esses são os argumentos para a função). Como esse método foi desenvolvido para ser randomizado, também especificaremos uma semente(seed) (basta substituir a semente por um número de sua escolha no bloco de código).

In [27]:
dataFrames = df.randomSplit([0.25, 0.75], seed) # Dividindo as amostras em 25% e 75% do dataframe
dataFrames[0].count() > dataFrames[1].count()   # Verificando se a 1ª amostra é maior que a 2ª

False

### union() - Concatenando e adicionando Linhas

DataFrames são imutáveis, isso significa que os usuários não podem adicionar elementos a um DataFrame porque isso seria alterá-lo. Para adicionar elementos a um DataFrame, você deve unir o DataFrame original com o novo DataFrame. Isso apenas concatena os dois DataFrames. Para unir dois DataFrames, você deve ter certeza de que eles têm o mesmo esquema e número de colunas; caso contrário, a união falhará.

In [28]:
from pyspark.sql import Row

# Salvando o schema do DataFrame original em uma variável
schema = df.schema

# Criando novas linhas
newRows = [Row("New Country", "Other Country", 5L), Row("New Country 2", "Other Country 3", 1L)]

# Criando os RDDs com os dados da variável
parallelizedRows = spark.sparkContext.parallelize(newRows)

# Criando o DataFrame e aplicando o schema do DataFrame original
newDF = spark.createDataFrame(parallelizedRows, schema)

# Unindo os DataFrames com as regras de negócio dentro das clausulas where
df.union(newDF).where("count = 1").where(col("ORIGIN_COUNTRY_NAME") != "United States").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



### sort() / orderBy() - Ordenando Linhas

Quando classificamos os valores em um DataFrame, sempre queremos classificar os valores maiores ou menores no topo de um DataFrame. Existem duas operações equivalentes para fazer esse tipo e ordenar por que funcionam exatamente da mesma maneira. Eles aceitam expressões e strings de colunas, além de várias colunas.

In [29]:
# Ordenando pela coluna "count" e exibindo todo o conteúdo do dataframe
df.sort("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [30]:
# Ordenando pela coluna "count" e depois pela "DEST_COUNTRY_NAME" e exibindo todo o conteúdo do dataframe
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [31]:
# Ordenando com a mesma regra da celula anterior através da função col()
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



Para especificar explicitamente a direção de classificação, você precisa usar as funções __asc( )__ e __desc( )__ se estiver operando em uma coluna.

In [34]:
from pyspark.sql.functions import desc, asc, expr

In [35]:
df.orderBy(expr("count desc")).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [36]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [None]:
# Equivalente em SQL:
# SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2

Uma dica avançada é usar __asc_nulls_first__, __desc_nulls_first__, __asc_nulls_last__ ou __desc_nulls_last__ para especificar onde você gostaria que seus valores nulos aparecessem em um DataFrame ordenado. Para fins de otimização, às vezes é aconselhável classificar em cada partição antes de outro conjunto de transformações. Você pode usar o método __sortWithinPartitions()__ para fazer isso.

In [37]:
path = "file:///root/sparkcurso/Spark_Definitive_Guide/data/flight-data/json/*-summary.json"
spark.read.format("json").load(path).sortWithinPartitions("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### limit() - Limitando extração

Muitas vezes, você pode querer restringir o que você extrair de um DataFrame; por exemplo, você pode querer apenas os dez primeiros registros de algum DataFrame. Você pode fazer isso usando o método __limit( )__.

In [38]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [39]:
df.orderBy(expr("count desc")).limit(6).show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



In [40]:
# Equivalente em SQL:
# SELECT * FROM dfTable ORDER BY count desc LIMIT 6

### repartition() / coalesce() - Repartição e coalesce

Outra importante oportunidade de otimização é particionar os dados de acordo com algumas colunas filtradas com freqüência, que controlam o layout físico dos dados no cluster, incluindo o esquema de particionamento e o número de partições. A repartição incorrerá em uma mistura completa dos dados, independentemente de ser necessário. Isso significa que você normalmente só deve reparticionar quando o número futuro de partições for maior que o número atual de partições ou quando você estiver procurando particionar por um conjunto de colunas.

In [41]:
df.rdd.getNumPartitions()

1

In [42]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [43]:
# Se você sabe que vai filtrar uma determinada coluna com frequência, pode valer a pena reparticionar 
# com base nessa coluna.
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [44]:
# Especificando manualmente o número de partições 
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [45]:
# Coalesce, por outro lado, não incorrerá em um shuffle completo e tentará combinar partições. 
# Esta operação embaralha os seus dados em cinco partições com base no nome do país de destino e, 
# em seguida, une-as (sem um shuffle completo).
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### collect() / take() / show() / toLocalIterator() - Coletando dados para o Driver

O Spark mantém o estado do cluster no driver. Há momentos em que você deseja coletar alguns dos seus dados no driver para manipulá-los em sua máquina local. Até agora, não definimos explicitamente essa operação. No entanto, usamos vários métodos diferentes para fazer isso que são efetivamente todos iguais. __collect( )__ obtém todos os dados de todo o DataFrame, seleciona as primeiras __N__ linhas e mostra as várias linhas impressas.

In [47]:
collectDF = df.limit(10)

In [48]:
# take funciona com a passagem de um inteiro como parãmetro de contagem
collectDF.take(5)

[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Croatia', count=1),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Ireland', count=344),
 Row(DEST_COUNTRY_NAME=u'Egypt', ORIGIN_COUNTRY_NAME=u'United States', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'India', count=62)]

In [49]:
# Exibe o Dataframe na tela
collectDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [50]:
# truncate = False, exibe na íntegra o conteúdo do DataFrame
collectDF.show(5, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



In [51]:
# Coleta os valores presentes nos nós do cluster para o programa driver
# *** Este comando deve ser usado com muito cuidado, pois se o tamanho do Dataframe que for coletado ***
# *** para o driver for maior que a memória disponível, o problema driver ira "crashar". ***
collectDF.collect()

[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Croatia', count=1),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Ireland', count=344),
 Row(DEST_COUNTRY_NAME=u'Egypt', ORIGIN_COUNTRY_NAME=u'United States', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'India', count=62),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Singapore', count=1),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Grenada', count=62),
 Row(DEST_COUNTRY_NAME=u'Costa Rica', ORIGIN_COUNTRY_NAME=u'United States', count=588),
 Row(DEST_COUNTRY_NAME=u'Senegal', ORIGIN_COUNTRY_NAME=u'United States', count=40),
 Row(DEST_COUNTRY_NAME=u'Moldova', ORIGIN_COUNTRY_NAME=u'United States', count=1)]

Há uma maneira adicional de coletar linhas para o driver para iterar todo o conjunto de dados. O método __toLocalIterator()__ coleta partições para o driver como um iterador. Esse método permite iterar sobre todo o conjunto de dados, partição por partição, de maneira serial.

In [52]:
collectDF.toLocalIterator()

<itertools.chain at 0x7fbdcf155dd0>

### !!! Qualquer coleta de dados para o driver pode ser uma operação muito cara! Se você tiver um conjunto de dados grande e usar o collect( ), poderá interromper o driver. Se você usar toLocalIterator( ) e tiver partições muito grandes, poderá facilmente travar o nó do driver e perder o estado do seu aplicativo. Isso também é caro porque podemos operar em uma base um a um, em vez de executar o cálculo em paralelo !!!

Material baseado em exemplos do livro __Spark - The Definitive Guide. Bill Chambers e Matei Zaharia__