# Dataframe

- Tabelas com linhas e colunas
- Imutaveis
- Com schema conhecido
- Linhagem preservada
- Colunas podem ter tipos diferentes
- Existem analises comuns: Agrupar, ordenar,filtrar
- Spark pode otimizar estas analises atraves de planos de execucao

### Lazy Evaluetion
- O processamento de transformacao de fato so ocorre quando ha uma açao

### Schema
- Voce pode deixar para o Spark inferir a partir de parte dos dados
- Ou voce pode definir o schema

Definir tem vantagens:
- Tipo correto
- Sem overhead

In [2]:
#importando o pyspark e outros modulos necessarios
from pyspark.sql import SparkSession

In [4]:
#iniciando uma Spark Session
spark = SparkSession.builder \
    .master("local") \
        .appName("RDD") \
            .config("spark.executer.memory","1gb") \
                .getOrCreate()

In [5]:
#criando um dataframe passando um conjunto de dados sem schema
df1 = spark.createDataFrame([("Pedro",10),("Maria",20),("Jose", 40)])

In [6]:
df1.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| Jose| 40|
+-----+---+



                                                                                

In [7]:
#criando um dataframe com schema
schema = "Id INT, Nome STRING"
dados = [[1,"Pedro"],[2,"Maria"]]

In [8]:
df2 = spark.createDataFrame(dados,schema)

In [9]:
df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



### Transformacoes iniciais com Spark

In [11]:
from pyspark.sql.functions import *

In [12]:
schema2 = "Produtos STRING, Vendas INT"
vendas = [["Caneta",10],["Lapis",20],["Caneta",40]]

In [14]:
df3 = spark.createDataFrame(vendas,schema2)
df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lapis|    20|
|  Caneta|    40|
+--------+------+



In [16]:
#agregacao
agrupado = df3.groupBy("Produtos").agg(sum("Vendas"))
agrupado.show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lapis|         20|
+--------+-----------+



In [17]:
#selecionando quais colunas mostrar
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lapis|
|  Caneta|
+--------+



In [18]:
#criando expressoes (calculo)
df3.select("Produtos","Vendas",expr("Vendas * 0.2")).show()


+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lapis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



In [19]:
#visualizando o schema do dataframe
df3.schema

StructType(List(StructField(Produtos,StringType,true),StructField(Vendas,IntegerType,true)))

In [20]:
#visualizando as colunas do dataframe
df3.columns

['Produtos', 'Vendas']

#### Ingerindo dados atraves de um arquivo CSV

In [21]:
from pyspark.sql.types import *

In [22]:
#criando o schema
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

In [24]:
#lendo o arquivo csv e inferindo o schema criado
despachantes = spark.read.csv("/home/douglas/download/despachantes.csv",header=False, schema=arqschema)

In [25]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [26]:
#carregando o arquivo csv com outra funcao, a load
#nessa funcao temos que informar o tipo do arquivo
#nessa carga deixaremos o spark avaliar os dados e inferir o schema
desp_autoschema = spark.read.load("/home/douglas/download/despachantes.csv",header=False,format="csv", sep=",", inferSchema=True)

In [27]:
desp_autoschema.show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



In [29]:
#temos que verificar se o spark fez a inferencia do esquema corretamente
despachantes.schema

StructType(List(StructField(id,IntegerType,true),StructField(nome,StringType,true),StructField(status,StringType,true),StructField(cidade,StringType,true),StructField(vendas,IntegerType,true),StructField(data,StringType,true)))

In [30]:
desp_autoschema.schema

StructType(List(StructField(_c0,IntegerType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true),StructField(_c4,IntegerType,true),StructField(_c5,StringType,true)))

#### Aplicando fitros com clausula WHERE

In [38]:
from pyspark.sql import functions as Func 
from pyspark.sql.functions import *

In [33]:
#passando um filtro com clausula WHERE aplicada na coluna Vendas
despachantes.select("id","nome","vendas").where(Func.col("Vendas") > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



##### Operadores logicos:
- & = AND
- | = OR
- ~ = NOT

In [35]:
#fazendo uma consulta com clausula WHERE e operador logico AND
despachantes.select("id","nome","vendas").\
    where((Func.col("Vendas") > 20) & (Func.col("Vendas") < 40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



##### Manipulando colunas

In [37]:
#alterando o nome da coluna
novodf = despachantes.withColumnRenamed("nome","nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [39]:
#alterando o tipo da coluna
# necessario crirar uma nova coluna informando qual o tipo e qual a coluna que ela ira se basear
despachantes2 = despachantes.withColumn("data2",to_timestamp(Func.col("data"),"yyyy-MM-dd"))

In [40]:
despachantes2.schema

StructType(List(StructField(id,IntegerType,true),StructField(nome,StringType,true),StructField(status,StringType,true),StructField(cidade,StringType,true),StructField(vendas,IntegerType,true),StructField(data,StringType,true),StructField(data2,TimestampType,true)))

In [42]:
#usando funcao de data
despachantes2.select(year("data"))\
    .distinct()\
        .show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [43]:
#concatenando comandos
despachantes2.select("nome",year("data"))\
    .orderBy("nome")\
        .show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



In [44]:
despachantes2.select("data").groupBy(year("data")).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



In [45]:
despachantes2.select(Func.sum("vendas")).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+

