<a href="https://colab.research.google.com/github/cruz-marco/pyspark_course/blob/main/pyspark_DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instalação e configuração Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
!tar xf spark-3.2.3-bin-hadoop3.2.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
os.environ["SPARK_HOME"] = '/content/spark-3.2.3-bin-hadoop3.2'

!pip install -q findspark

import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# DataFrames

- Tabelas com linhas e colunas;
- Imutáveis;
- Schema conhecido;
- Linhagem Preservada;
- Colunas podem ter tipos diferentes;
- Podemos agrupar, ordenar e filtrar;
- Spark otimiza análises usando planos de execução (DAG's)

## Lazy Evaluation
> O processamento da transformação só ocorre quando há uma ação: 

## Ações:
> (reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTestFile, saveAsSequenceFile, saveAsObjectFile, countByKey, foreach)

## Transformações:
> (map, filter, flatMap, mapPartitions, mapPartitionsWithIndex, sample, union, intersection, distinct, groupByKey, reduceByKey, aggregateByKey, sortByKey, join, cogroup, cartesian, pipe, coalesce, repartition, repartitionAndSortWithinPartitions)


### Criando um [DataFrame](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame) de exemplo:
- [spark.createDataFrame()](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.createDataFrame.html)

In [None]:
data = [("Pedro", 10), ("Maria", 20), ("José", 40)] #Dados a serem inseridos na tabela
df1 = spark.createDataFrame(data) #instanciando o DataFrame
df1.show() #Comando para mostar o frame, pode recer um parâmetro com o número.

In [None]:
schema = "Id INT, Nome STRING" #definindo um schema a ser usado no DataFrame
data_2 = [(1, "Pedro"), (2, "Maria")]

df2 = spark.createDataFrame(data_2, schema=schema)

df2.show()

### Pacote de [funções](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html):

In [None]:
from pyspark.sql import functions as f #Importando a biblioteca de funções
schema_2 = "produtos STRING, vendas INT"
vendas = [("Caneta", 10), ("Lápis", 20), ("Caneta", 40)]

df3 = spark.createDataFrame(vendas, schema_2)

df3.show()

In [None]:
"""
Diferente do Pandas, as funções ficam em um pacote a parte, 
logo, temos que fazer o import ou do pacote, ou da função 
específica. Por exemplo, a sum() utilizada com o método de 
agregação, conforme mostrado abaixo.
"""

agrupado = df3.groupBy("produtos")\
            .agg(f.sum("vendas"))

agrupado.show()

In [None]:
df3.select("produtos").show() # Selecionando uma única coluna no DataFrame, por exemplo.

In [None]:
df3.select("produtos", "vendas", f.expr("vendas * 0.2")).show() 
"""
A Função EXPR cria uma expressão que pode ser usada para criar mais uma linha
no dataframe, enriquecendo a nossa análise. Ela recebe uma string com a
expressão a ser processada.
"""

In [None]:
display(
    df3.schema, # ver a estrutura das colunas
    df3.columns # ver os nomes das colunas
)

### Carregando dados de duas maneiras distintas no Spark (DataFrames)

Usando o spark.read.csv:

In [None]:
from pyspark.sql.types import *

arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"
# esquema de colunas, colocando seu nome e tipo, em sequência, para ser usado
# como referência na leitura dos dados em CSV, para podermos escolher
# os tipos de dados e o nome das colunas, visto que o CSV em questão não possui
# cabeçalho.

despachantes = spark.read.csv("/content/drive/MyDrive/Datasets/pyspark_course/despachantes.csv", 
                              header=False, schema=arqschema)


In [None]:
despachantes.show()

In [None]:
despachantes.groupBy("cidade")\
.agg(f.sum("vendas")).show()
# Exemplo de groupBy

Usando o spark.read.load:

In [None]:
desp_autoschema = spark.read.load("/content/drive/MyDrive/Datasets/pyspark_course/despachantes.csv",
                                  header=False, format="csv", sep=",",
                                  inferSchema=True)
# forma um tanto mais sucinta, é necessário informar o formato e permitir o 
# parâmetro inferSchema (valor True) para que o próprio spark deduza o tipo
# de dados sozinho. É uma boa prática informar qual o tipo de separador SEP,
# pois no Brasil, usamos a vírgula para declarar a parte decimal de um número
# não inteiro.

In [None]:
desp_autoschema.show()
# como o arquivo não tem cabeçalho, o spark nomeia as colunas automaticamente.

In [None]:
display(
    despachantes.schema, # comparação dos schemas declarados e inferidos.
    "------------------",
    desp_autoschema.schema
)


### Fazendo consultas

> Select, Where, OrderBy, Distinct são cláusulas SQL que no PySpark são métodos do objeto DataFrame, diferentemente do pandas, temos que usar o SELECT para selecionar as colunas que nos interessam, no lugar de exibirmos todas.

In [None]:
despachantes.select("id", "nome", "vendas")\
        .where(f.col("vendas") > 20)\
        .show()

> A cláusula WHERE deve ser usada juntamente com a função COL do pacote de funções SQL do PySpark. As condições podem ser conectadas usando "&" e "|", e uma expressão pode ser negada usando "~".

In [None]:
despachantes.select("id", "nome", "vendas")\
        .where((f.col("vendas") > 20) & (f.col("vendas") < 40))\
        .show()

### Renomeando colunas:
> Diferente do pandas, onde podemos renomear diversas colunas passando um dicionário como parâmetro para o método da classe DataFrame; as colunas do DataFrame do PySpark devem ser renomeadas uma a uma. Portanto, faz-se necessário o uso de um loop, caso queiramos renomear mais de uma de forma ágil, por exemplo.

In [None]:
#renomeando todas as colunas de uma só vez numa nova variável
# novo_desp
novo_desp = desp_autoschema
for i in list(zip(desp_autoschema.columns, despachantes.columns)):
  novo_desp = novo_desp.withColumnRenamed(*i)

novo_desp.show()

### Criando uma nova coluna com os dados da coluna data com o tipo timestamp

In [None]:
novo_desp = novo_desp.withColumn("data2", f.to_timestamp(f.col("data"), 
                                "yyyy-MM-dd"))
novo_desp.show()

In [None]:
novo_desp.schema

> Selecionando os anos das datas (str e timestamp) e os nomes dos despachantes, ordenando por nome.

In [None]:
novo_desp.select(f.year("data"), f.year("data2"), "nome")\
                .distinct()\
                .orderBy("nome")\
                .show()

> Usando o ALIAS, podemos dar apelidos para todas as colunas, logo, podemos criar um agrupamento usando as funções do pacote functions e as apelidando e usando a referência do apelido em outros métodos do DataFrame como o groupBy e o orderBy.

> Para ordenar crescente o decrescente, usamos .asc() ou .desc(), sendo estes métodos para a função COL do pacote de funções SQL.|

In [None]:
novo_desp.select(f.year("data").alias("anos"))\
                .groupBy("anos")\
                .agg(f.count(f.col("anos")).alias("ocorr"))\
                .orderBy(f.col("ocorr").desc())\
                .show()#Só funciona se dermos um apelido para a coluna

> Total de vendas

In [None]:
novo_desp.select(f.sum("vendas")).show()

# Principais Transformações e Ações:

In [None]:
despachantes.take(5)

In [None]:
despachantes.collect() #ação herdada do RDD

In [None]:
despachantes.count() #sem nenhum argumento, retorna o número de linhas.

> Podemos ordenar por duas colunas usando a função col com o método asc ou desc

In [None]:
despachantes.orderBy(f.col("cidade").asc(),
                     f.col("vendas").desc())\
                     .show()

> Tabela de agrupamento retornando a quantidade de vendas das cidades da maior para a menor.

In [None]:
despachantes.groupBy("cidade")\
            .agg(f.sum("vendas").alias("vendas_totais"))\
            .orderBy(f.col("vendas_totais").desc())\
            .show()

> Filtrando o DataFrame inteiro a partir de uma coluna.

In [None]:
despachantes.filter(f.col("cidade") != "Porto Alegre").show()

## Escrita e Carregamento de Arquivos

> Salvando o novo_desp em arquivo parquet no Drive. O Arquivo Parquet salva o cabeçalho e os tipos de dados automaticamente.

In [None]:
novo_desp.write.format("parquet").save("/content/drive/MyDrive/Datasets/pyspark_course/dfpqt")

> Importando e testando o arquivo criado:

In [None]:
imported_df = spark.read.load("/content/drive/MyDrive/Datasets/pyspark_course/dfpqt/part-00000-c8a74511-ecb8-4fc4-91ba-77fe818d7065-c000.snappy.parquet",
                              format="parquet")

In [None]:
imported_df.show()

In [None]:
imported_df.schema