In [1]:
# Setar o ambiente para uso de Python no Spark
import support_functions as sf
sf.set_spark_python()

# Criar a sessao do Spark
from pyspark.sql import SparkSession
spark = SparkSession \
            .builder \
            .master("local[2]") \
            .appName("AndreDF") \
            .getOrCreate()

In [2]:
spark

In [3]:
sc = spark.sparkContext

Até agora vimos trabalhando com a estrutura base do Spark, as RDDs. Contudo, essas estruturas são trabalhosas e requerem o tratamento manual de tipos de dados bem como a interpretação constante dos dados posicionais na RDD.

É possível simplificar nosso trabalho com o uso de camadas novas e mais elevadas do Spark.


Existe uma série de funções para a criação e a criação e manipulação destas novas estruturas de dados. Vamos começar investigando os DataFrames, inspirados na biblioteca Pandas do Python.


## createDataFrame()
Para criar um DataFrame a partir de uma RDD, podemos utilizar a função `createDataFrame(RDD)`. É importante notar que o ponto de entrada das APIs de dados estruturados não é mais o `sparkContext`, mas diretamente a `SparkSession` acessível a partir da nossa variável `spark`.

In [5]:
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]

rdd = spark.sparkContext.parallelize(dept)

In [6]:
rdd.collect()

[('Finance', 10), ('Marketing', 20), ('Sales', 30), ('IT', 40)]

In [7]:
sdf = spark.createDataFrame(rdd)

In [8]:
sdf

DataFrame[_1: string, _2: bigint]

In [12]:
sdf

DataFrame[_1: string, _2: bigint]

Verificamos que a variável é do tipo DataFrame com colunas com nome `_1, _2,` e diferentes tipos, já __inferidos__ pelo Spark na criação do DataFrame a partir da RDD.

## take()
Podemos então utilizar a função take para verificar nosso DataFrame.

In [14]:
sdf.take(5)

[Row(_1='Finance', _2=10),
 Row(_1='Marketing', _2=20),
 Row(_1='Sales', _2=30),
 Row(_1='IT', _2=40)]

É interessante observar que a função nos mostra uma RDD e não um DataFrame. Essa RDD é composta de objetos do tipo Row, ou então linha. Isso é, um DataFrame nada mais é do que uma RDD onde cada elemento é uma linha de uma tabela com suas diferentes colunas. 

Diferentemente das RDDs padrões, contudo, os objetos Row grava consigo os nomes e os tipos das colunas e o Spark mantém a gestão garantindo que um mesmo DataFrame seja composto de linhas do mesmo tipo.

## Row()

Podemos manualmente criar uma linha, a partir da classe Row.

In [15]:
from pyspark.sql import Row

Para isso precisamos nomear as colunas e seus valores.

In [16]:
Row(name = 'Alice', age = 11)

Row(name='Alice', age=11)

Mais tarde entenderemos como concatenar diferentes objetos Row em um DataFrame.

## asDict()

Podemos transformar objetos do tipo Row em dicionários no python. 

In [17]:
sdf.take(2)

[Row(_1='Finance', _2=10), Row(_1='Marketing', _2=20)]

Para isso, vamos inicialmente guardar duas linhas de nosso DataFrame.

In [18]:
rows = sdf.take(2)

Na sequencia utilizamos a função asDict()

In [19]:
my_dict = rows[0].asDict()
my_dict

{'_1': 'Finance', '_2': 10}

Com isso podemos acessar os valores de cada coluna normalmente como um dicionário.

In [20]:
my_dict['_1'] 

'Finance'

## RDD
O atributo rdd do DataFrame também nos dá acesso direto à RDD fundamental que constrói o DataFrame.

In [21]:
sdf.rdd

MapPartitionsRDD[33] at javaToPython at NativeMethodAccessorImpl.java:0

Nessa RDD podemos aplicar todas as transformações e ações que aprendemos, desde que respeitando o tipo do elemento Row.

In [22]:
sdf.rdd.take(2)

[Row(_1='Finance', _2=10), Row(_1='Marketing', _2=20)]

# Acessando o DataFrame
A ideia de termos um DataFrame, contudo, é não precisarmos operar diretamente nas RDDs. Para isso existe um conjunto de funções de acesso direto ao DataFrame. Como estas funções operam intrinsecamente em RDDs, elas também são transformações e ações.


## show()
A ação show() nos permite olhar o DataFrame como uma tabela, muito mais próximo do Pandas DataFrame. Cuidado, contudo, com o desejo de olhar o DataFrame completo.

In [23]:
sdf.show()

+---------+---+
|       _1| _2|
+---------+---+
|  Finance| 10|
|Marketing| 20|
|    Sales| 30|
|       IT| 40|
+---------+---+



## printSchema()

A ação show() não nos mostra claramente qual é o tipo dos dados envolvidos (similar a dtypes). Para isso, podemos imprimir o Schema do DataFrame. 

O Schema é a definição do tipo de dado de cada coluna. É importante notar que o Spark inferiu esses dados a partir da RDD. O Spark infere a partir do primeiro elemento nos dados. Portanto, é importante que na leitura de arquivos, caso a inferência seja utilizada, garantir que a primeira linha do arquivo possua o tipo correto.

In [24]:
sdf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



## dtypes

Podemos também utilizar o atributo dtypes para mostrar o tipo de cada coluna.

In [25]:
sdf.dtypes

[('_1', 'string'), ('_2', 'bigint')]

# Definindo o Schema
Idealmente, quando operando em grandes massas de dados, a inferência do tipo pela primeira linha não é adequada e utilizamos como boa prática a definição manual do schema. Para isso precisamos utilizar os tipos de dados nativos do Spark que são convertidos para e de tipos python pela API pyspark.

Para isso importamos os tipos do spark.

In [26]:
from pyspark.sql.types import *

## StructType(), StructField(), IntegerType(), StringType(), FloatType()
Cada tipo de variável é na verdade um objeto de uma classe. Então acessamos estes objetos para instanciar e para identificar os tipos de cada variável. 

O Schema de um DataFrame é definido como um objeto StructType (similar a uma lista ou tupla) com campos do tipo StructField(). Cada campo (StructField) possui um nome, um tipo (objeto do tipo no spark) e um flag se é permitida a existência de nulos. 

Assim, podemos definir explicitametne um Schema para nosso DataFrame.

In [31]:
my_schema = StructType([ \
                       StructField('dept_name', StringType(), True), \
                       StructField('dept_id', IntegerType(), True), \
                       ])

Na sequencia, atribuímos o nosso schema ao DataFrame na sua criação.

In [32]:
sdf1 = spark.createDataFrame(rdd, schema = my_schema)

Podemos então olhar nosso DataFrame e verificar que as colunas agora vêm identificadas.

In [33]:
sdf1.show(5)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



Podemos também verificar o Schema do DataFrame com as características que definimos anteriormente.

In [35]:
sdf1.printSchema()

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: integer (nullable = true)



# Operações básicas com colunas

A exemplo do Pandas DataFrame, temos à nossa disposição uma série de transformações e ações que nos permite operar sobre o Spark DataFrame. Veremos a seguir as operações em colunas.

## columns
O atributo columns retorna, a exemplo do Pandas, uma lista com o nome das colunas. Contudo, diferente do Pandas, não é possível sobrescrever este atributo diretamente. 

In [36]:
sdf.columns

['_1', '_2']

## withColumnRenamed()
Para renomear uma coluna utilizamos a função withColumnRenamed(). Esta função opera em uma coluna por vez. Assim, podemos utilizar um laço para renomear todas as colunas.

In [37]:
for old_col, new_col in zip(sdf.columns, ['dept_name', 'dept_id']):
    sdf = sdf.withColumnRenamed(old_col, new_col)

In [38]:
sdf.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



## drop()
Podemos descartar colunas usando a função .drop()

In [39]:
sdf.drop('dept_id').show(5)

+---------+
|dept_name|
+---------+
|  Finance|
|Marketing|
|    Sales|
|       IT|
+---------+



## Acessando colunas
Para acessar as colunas de um DataFrame, podemos utilizar a mesma notação python. Observe, contudo, que as colunas são representadas como objetos e, assim, o acesso à coluna não retorna automaticamente os dados existentes nela.

In [40]:
sdf['dept_id']

Column<b'dept_id'>

In [41]:
sdf.dept_id

Column<b'dept_id'>

## select()
Para acessar os dados de uma coluna específica, precisamos utilizar a transformação select() seguida de uma ação show(). Esta transformação é similiar ao SELECT em SQL.

In [42]:
sdf.select('dept_name').show(5)

+---------+
|dept_name|
+---------+
|  Finance|
|Marketing|
|    Sales|
|       IT|
+---------+



## Case sensitivity
Observe que a exemplo do SQL, DataFrames em Spark não são naturalmente sensíveis ao caso.

In [43]:
sdf.select('DEPT_NAME').show(5)

+---------+
|DEPT_NAME|
+---------+
|  Finance|
|Marketing|
|    Sales|
|       IT|
+---------+



In [44]:
sdf.select('dept_NAME').show(5)

+---------+
|dept_NAME|
+---------+
|  Finance|
|Marketing|
|    Sales|
|       IT|
+---------+



# Operações básicas com linhas
Da mesma forma que temos operações com colunas, temos operações com linhas.

## limit()
A transformação limit(LIM) nos permite limitar um número de registros (linhas no DataFrame) a ser retornado. POdemos concatenar com .collect() e temos o DataFrame no formato de RDD. 


In [46]:
sdf.limit(3).collect()

[Row(dept_name='Finance', dept_id=10),
 Row(dept_name='Marketing', dept_id=20),
 Row(dept_name='Sales', dept_id=30)]

Com isso podemos reduzir o número de linhas do nosso DataFrame para aumentar a velocidade do nosso trabalho.

In [47]:
sdf = sdf.limit(500)

## filter() - filtrando linhas
A transformação filter() nos permite filtrar linhas com base em condições especificadas sobre colunas.

In [49]:
sdf.filter(sdf.dept_id == 20).show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Marketing|     20|
+---------+-------+



## where() - filtrando linhas

Para manter a similaridade como o SQL, a transformação filter() também pode ser acessada através de seu apelido (alias) where().

In [52]:
sdf.where(sdf.dept_id == 10).show(5)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
+---------+-------+



## distinct() - filtrando valores distintos
Utilizamos a transformação distinct() para selecionar apenas os valores distintos de uma coluna específica.

In [53]:
sdf.select('dept_id').distinct().show()

+-------+
|dept_id|
+-------+
|     10|
|     20|
|     30|
|     40|
+-------+



## union() - concatenando linhas
Para concatenar linhas no DataFrame, precisamos criar uma RDD com objetos do tipo Row e Schema idêntico.

In [54]:
new_rows = [Row(dept_name = 'Production', dept_id = 50), 
            Row(dept_name = 'Human Resources', dept_id = 60)]

In [55]:
new_rows

[Row(dept_name='Production', dept_id=50),
 Row(dept_name='Human Resources', dept_id=60)]

Transformamos essa lista em uma RDD, usando parallelize(), da mesma forma que fizemos anteriormente.

In [56]:
parallelizedRows = spark.sparkContext.parallelize(new_rows)

Por fim, criamos o DataFrame, definindo o Schema.

In [57]:
newDF = spark.createDataFrame(parallelizedRows, my_schema)

In [58]:
newDF.show()

+---------------+-------+
|      dept_name|dept_id|
+---------------+-------+
|     Production|     50|
|Human Resources|     60|
+---------------+-------+



Podemos então concatenar nosso novo DataFrame ao DataFrame antigo.

In [59]:
sdf1 = sdf.union(newDF)

In [60]:
sdf1.show()

+---------------+-------+
|      dept_name|dept_id|
+---------------+-------+
|        Finance|     10|
|      Marketing|     20|
|          Sales|     30|
|             IT|     40|
|     Production|     50|
|Human Resources|     60|
+---------------+-------+



## orderBy() - ordenando linhas
Com a transformação orderBy() podemos ordenar as linhas do DataFrame. Para isso, precisamos utilizar os métodos .asc() ou .desc() na coluna chave, indicando a ordem desejada.

In [62]:
sdf1.orderBy(sdf1.dept_id.desc()).show(5)

+---------------+-------+
|      dept_name|dept_id|
+---------------+-------+
|Human Resources|     60|
|     Production|     50|
|             IT|     40|
|          Sales|     30|
|      Marketing|     20|
+---------------+-------+
only showing top 5 rows



# pyspark.sql.functions
Existe uma série de funções que facilitam a manipulação de DataFrames. Elas estão disponíveis no pacote pyspark.sql.functions e podem ser investigadas na referência abaixo:

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

Podemos importá-las todas através da linha de comando abaixo.

In [63]:
from pyspark.sql import functions as sf


## lit() - criando colunas
Como visto anteriormente, o Spark possui tipos próprios de dados com correspondência com o Python. Se quisermos criar uma coluna constante, podemos evitar o trabalho de criar a coluna em Python e transformá-la, utilizando a função lit(val) que cria uma coluna de literais (constantes) val.

In [64]:
sf.lit(10)

Column<b'10'>

## withColumn() - adicionando colunas

Para isso, precisamos concatenar uma coluna de literais. Isso é feito com a transformação withColumn(nome, nova_col).

In [67]:
sdf1.withColumn('ones', sf.lit(1)).show(3)

+---------+-------+----+
|dept_name|dept_id|ones|
+---------+-------+----+
|  Finance|     10|   1|
|Marketing|     20|   1|
|    Sales|     30|   1|
+---------+-------+----+
only showing top 3 rows



## col() - acessando colunas

Em alguns casos precisamos explicitar que desejamos acessar um objeto do tipo Column. Para isso temos à nossa disposição a função .col(nome).

In [68]:
sdf1.select(sf.col('dept_id')).show(3)

+-------+
|dept_id|
+-------+
|     10|
|     20|
|     30|
+-------+
only showing top 3 rows



Observe que temos, com isso, diferentes formas de acessar colunas no select. Em alguns casos específicos será necessário optar pela menção explícita ao tipo do objeto usando .col()

In [69]:
sdf1.select(sdf1.dept_id, sf.col('dept_name'), 'dept_id').show(3)

+-------+---------+-------+
|dept_id|dept_name|dept_id|
+-------+---------+-------+
|     10|  Finance|     10|
|     20|Marketing|     20|
|     30|    Sales|     30|
+-------+---------+-------+
only showing top 3 rows



Podemos ainda acessar a RDD resultante da transformação.

In [71]:
sdf1.select(sdf1.dept_id, sf.col('dept_id'), 'dept_id').rdd.take(3)

[Row(dept_id=10, dept_id=10, dept_id=10),
 Row(dept_id=20, dept_id=20, dept_id=20),
 Row(dept_id=30, dept_id=30, dept_id=30)]

## alias()

A exemplo do SQL, podemos utilizar o método .alias() de uma coluna específica para renomear colunas rapidamente.

In [73]:
sdf.select('*', sf.col('dept_id').alias('identificador')).show(5)

+---------+-------+-------------+
|dept_name|dept_id|identificador|
+---------+-------+-------------+
|  Finance|     10|           10|
|Marketing|     20|           20|
|    Sales|     30|           30|
|       IT|     40|           40|
+---------+-------+-------------+



# Expressões
DataFrames são equivalentes de tabelas no Pandas e também de tabelas no SQL. Buscando manter compatibilidade com o SQL, expressões permitem que escrevamos algumas expressões simplificadas de SQL para operar no DataFrame.


## expr()
Para isso, utilizamos a função expr(). Podemos, por exemplo, renomear colunas.

In [74]:
sdf1.select(sf.expr('dept_name as departamento')).show(5)

+------------+
|departamento|
+------------+
|     Finance|
|   Marketing|
|       Sales|
|          IT|
|  Production|
+------------+
only showing top 5 rows



Podemos também operar sobre colunas diretamente, a exemplo do SQL.

In [75]:
sdf1.select('dept_id', 'dept_name', sf.expr('dept_id / 10')).show(3)

+-------+---------+--------------+
|dept_id|dept_name|(dept_id / 10)|
+-------+---------+--------------+
|     10|  Finance|           1.0|
|     20|Marketing|           2.0|
|     30|    Sales|           3.0|
+-------+---------+--------------+
only showing top 3 rows



## selectExpr()

Como a combinação de .select() e .expr() é muito comum, o spark já nos fornece um atalho: selectExpr().

In [76]:
sdf.selectExpr('*', 'dept_id / 10 as `id by 10`').show(5)

+---------+-------+--------+
|dept_name|dept_id|id by 10|
+---------+-------+--------+
|  Finance|     10|     1.0|
|Marketing|     20|     2.0|
|    Sales|     30|     3.0|
|       IT|     40|     4.0|
+---------+-------+--------+



# Abrindo arquivos CSV
Até agora trabalhamos com DataFrames criados manualmente a partir de uma RDD prévia. Podemos utilizar as funções do pacote .read do Spark para ler diretamente fontes de dados. Para carregar arquivos .csv usamos a função .csv.


In [77]:
df = spark.read.csv('../10_dados/movie_lens/ratings.csv', header=True)

In [5]:
df

DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

In [7]:
df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
+------+-------+------+----------+
only showing top 5 rows



In [8]:
df.take(5)

[Row(userId='1', movieId='296', rating='5.0', timestamp='1147880044'),
 Row(userId='1', movieId='306', rating='3.5', timestamp='1147868817'),
 Row(userId='1', movieId='307', rating='5.0', timestamp='1147868828'),
 Row(userId='1', movieId='665', rating='5.0', timestamp='1147878820'),
 Row(userId='1', movieId='899', rating='3.5', timestamp='1147868510')]

In [15]:
df.columns

['userId', 'movieId', 'rating', 'timestamp']

In [16]:
df.dtypes

[('userId', 'string'),
 ('movieId', 'string'),
 ('rating', 'string'),
 ('timestamp', 'string')]

In [17]:
df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [21]:
from pyspark.sql.types import StringType, DoubleType, TimestampType, IntegerType, StructType, StructField

In [29]:
labels = (('userId', IntegerType()),
          ('movieId', IntegerType()),
          ('rating', DoubleType()),
          ('timestamp', StringType())
         )

In [30]:
schema = StructType([StructField(x[0], x[1], True) for x in labels])

In [31]:
print(schema)

StructType(List(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,DoubleType,true),StructField(timestamp,StringType,true)))


In [32]:
df = spark.read.csv('../10_dados/movie_lens/ratings.csv', header=True, schema = schema )

In [33]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [34]:
df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
+------+-------+------+----------+
only showing top 5 rows



In [36]:
df.select('userId').show(5)

+------+
|userId|
+------+
|     1|
|     1|
|     1|
|     1|
|     1|
+------+
only showing top 5 rows



In [38]:
df.select(['movieId', 'rating']).show(5)

+-------+------+
|movieId|rating|
+-------+------+
|    296|   5.0|
|    306|   3.5|
|    307|   5.0|
|    665|   5.0|
|    899|   3.5|
+-------+------+
only showing top 5 rows



In [39]:
from pyspark.sql.functions import lit

In [42]:
df = df.withColumn('new_column', lit(1))

In [43]:
df.show(5)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|new_column|
+------+-------+------+----------+----------+
|     1|    296|   5.0|1147880044|         1|
|     1|    306|   3.5|1147868817|         1|
|     1|    307|   5.0|1147868828|         1|
|     1|    665|   5.0|1147878820|         1|
|     1|    899|   3.5|1147868510|         1|
+------+-------+------+----------+----------+
only showing top 5 rows



In [45]:
from pyspark.sql.functions import col, concat
df = df.withColumn('concat', concat(col('movieId'), lit('-'), col('rating')))

In [46]:
df.show(5)

+------+-------+------+----------+----------+-------+
|userId|movieId|rating| timestamp|new_column| concat|
+------+-------+------+----------+----------+-------+
|     1|    296|   5.0|1147880044|         1|296-5.0|
|     1|    306|   3.5|1147868817|         1|306-3.5|
|     1|    307|   5.0|1147868828|         1|307-5.0|
|     1|    665|   5.0|1147878820|         1|665-5.0|
|     1|    899|   3.5|1147868510|         1|899-3.5|
+------+-------+------+----------+----------+-------+
only showing top 5 rows



In [47]:
df = df.withColumnRenamed('new_column', 'constant')

In [48]:
df.show(5)

+------+-------+------+----------+--------+-------+
|userId|movieId|rating| timestamp|constant| concat|
+------+-------+------+----------+--------+-------+
|     1|    296|   5.0|1147880044|       1|296-5.0|
|     1|    306|   3.5|1147868817|       1|306-3.5|
|     1|    307|   5.0|1147868828|       1|307-5.0|
|     1|    665|   5.0|1147878820|       1|665-5.0|
|     1|    899|   3.5|1147868510|       1|899-3.5|
+------+-------+------+----------+--------+-------+
only showing top 5 rows



In [49]:
df.groupBy('movieId').count().show(5)

+-------+-----+
|movieId|count|
+-------+-----+
|   1088|11935|
|   1580|40308|
|   3175|14659|
|  44022| 4833|
| 175197|  610|
+-------+-----+
only showing top 5 rows



In [52]:
df.count()

25000095

In [53]:
df.createOrReplaceTempView('movielens')

In [55]:
query = spark.sql('select * from movielens limit 5')

In [61]:
query.explain(mode="formatted")

== Physical Plan ==
CollectLimit (3)
+- * Project (2)
   +- Scan csv  (1)


(1) Scan csv 
Output [4]: [userId#191, movieId#192, rating#193, timestamp#194]
Batched: false
Location: InMemoryFileIndex [file:/data/aluno/shared/10_dados/movie_lens/ratings.csv]
ReadSchema: struct<userId:int,movieId:int,rating:double,timestamp:string>

(2) Project [codegen id : 1]
Output [6]: [userId#191, movieId#192, rating#193, timestamp#194, 1 AS constant#334, concat(cast(movieId#192 as string), -, cast(rating#193 as string)) AS concat#298]
Input [4]: [userId#191, movieId#192, rating#193, timestamp#194]

(3) CollectLimit
Input [6]: [userId#191, movieId#192, rating#193, timestamp#194, constant#334, concat#298]
Arguments: 5




In [62]:
query.show(5)

+------+-------+------+----------+--------+-------+
|userId|movieId|rating| timestamp|constant| concat|
+------+-------+------+----------+--------+-------+
|     1|    296|   5.0|1147880044|       1|296-5.0|
|     1|    306|   3.5|1147868817|       1|306-3.5|
|     1|    307|   5.0|1147868828|       1|307-5.0|
|     1|    665|   5.0|1147878820|       1|665-5.0|
|     1|    899|   3.5|1147868510|       1|899-3.5|
+------+-------+------+----------+--------+-------+



In [69]:
query = spark.sql("""select movieId, count(*)
                     from movielens 
                     where rating = 5
                     group by movieId 
                     order by 2 desc 
                     limit 5""")

In [70]:
query.explain(mode="formatted")

== Physical Plan ==
TakeOrderedAndProject (7)
+- * HashAggregate (6)
   +- Exchange (5)
      +- * HashAggregate (4)
         +- * Project (3)
            +- * Filter (2)
               +- Scan csv  (1)


(1) Scan csv 
Output [2]: [movieId#192, rating#193]
Batched: false
Location: InMemoryFileIndex [file:/data/aluno/shared/10_dados/movie_lens/ratings.csv]
PushedFilters: [IsNotNull(rating), EqualTo(rating,5.0)]
ReadSchema: struct<movieId:int,rating:double>

(2) Filter [codegen id : 1]
Input [2]: [movieId#192, rating#193]
Condition : (isnotnull(rating#193) AND (rating#193 = 5.0))

(3) Project [codegen id : 1]
Output [1]: [movieId#192]
Input [2]: [movieId#192, rating#193]

(4) HashAggregate [codegen id : 1]
Input [1]: [movieId#192]
Keys [1]: [movieId#192]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#463L]
Results [2]: [movieId#192, count#464L]

(5) Exchange
Input [2]: [movieId#192, count#464L]
Arguments: hashpartitioning(movieId#192, 200), true, [id=#334]

(6) HashAg

In [71]:
query.show(5)

+-------+--------+
|movieId|count(1)|
+-------+--------+
|    318|   39553|
|    296|   32169|
|    356|   25918|
|    260|   25804|
|   2571|   25482|
+-------+--------+



In [72]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [75]:
query.cache()

movieId,count(1)
318,39553
296,32169
356,25918
260,25804
2571,25482


In [76]:
query

movieId,count(1)
318,39553
296,32169
356,25918
260,25804
2571,25482


In [77]:
query.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[count(1)#459L DESC NULLS LAST], output=[movieId#192,count(1)#459L])
+- *(2) HashAggregate(keys=[movieId#192], functions=[count(1)])
   +- Exchange hashpartitioning(movieId#192, 200), true, [id=#334]
      +- *(1) HashAggregate(keys=[movieId#192], functions=[partial_count(1)])
         +- *(1) Project [movieId#192]
            +- *(1) Filter (isnotnull(rating#193) AND (rating#193 = 5.0))
               +- FileScan csv [movieId#192,rating#193] Batched: false, DataFilters: [isnotnull(rating#193), (rating#193 = 5.0)], Format: CSV, Location: InMemoryFileIndex[file:/data/aluno/shared/10_dados/movie_lens/ratings.csv], PartitionFilters: [], PushedFilters: [IsNotNull(rating), EqualTo(rating,5.0)], ReadSchema: struct<movieId:int,rating:double>




In [81]:
query.unpersist()

movieId,count(1)
318,39553
296,32169
356,25918
260,25804
2571,25482


In [79]:
query

movieId,count(1)
318,39553
296,32169
356,25918
260,25804
2571,25482
