<a href="https://colab.research.google.com/github/ReMeiradL/Pyspark_study/blob/main/DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instalação Pyspark

In [None]:
#INSTALAR O SPARK NO COLAB
!pip install pyspark py4j

#IMPORTAR SPARK NO COLAB
from pyspark.sql import SparkSession

#CRIAR SESSÃO SPARK
spark = SparkSession.builder.appName("ExemploPySpark").getOrCreate()

#OBTER CONTEXTO SPARK PARA A SESSÃO
sc = spark.sparkContext



# DataFrame

In [None]:
#CRIAR DATAFRAME
#(colocar um arquivo do excel na pasta, copiar caminho e colar depois do =)
df1 = spark.createDataFrame([("Pedro", 10), ("Maria", 30), ("José", 40)])

In [None]:
#VISUALIZAR O DATAFRAME
#(Tem outras forma de mostrar)
df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 30|
| José| 40|
+-----+---+



In [None]:
#CRIAR A COLUNA
coluna = "Id INT, Nome STRING"
dados = [[1,"Pedro"],[2,"Maria"]]
df2 = spark.createDataFrame(dados, coluna) #criando um novo dataframe
df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



In [None]:
df2.show(1) #não é o Id. É a primeira linha

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
+---+-----+
only showing top 1 row


In [None]:
#COMANDOS DE SPARK EM SEQUÊNCIA
from pyspark.sql.functions import sum

In [None]:
#CRIAR NOVO DATAFRAME
schema2 = "Produtos STRING, Vendas INT"
vendas = [['Caneta', 10],['Lápis', 20], ['Caneta', 40]]
df3 = spark.createDataFrame(vendas, schema2)
df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [None]:
df3.show(2)

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
+--------+------+
only showing top 2 rows


In [None]:
#AGREGAR TOTAL DE VENDAS POR PRODUTO
agrupado = df3.groupBy("Produtos").agg(sum("Vendas"))
agrupado.show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [None]:
agrupado = df3.groupBy("Produtos").agg(sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [None]:
#ORDENAR COLUNAS (trocando a ordem das colunas)
df3.select("Vendas", "Produtos").show()

+------+--------+
|Vendas|Produtos|
+------+--------+
|    10|  Caneta|
|    20|   Lápis|
|    40|  Caneta|
+------+--------+



In [None]:
df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [None]:
#CRIAR EXPRESSÕES OU CALCULOS
from pyspark.sql.functions import expr

In [None]:
df3.select("Produtos","Vendas", expr("Vendas * 0.2")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



# Schema_DF

In [None]:
#Criando o DataFrame
schema2 = "Produtos STRING, Vendas INT"
vendas = [['Caneta', 10],['Lápis', 20], ['Caneta', 40]]
df4 = spark.createDataFrame(vendas, schema2)
df4.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [None]:
#VERIFICAR COLUNA E TYPE
df4.schema

StructType([StructField('Produtos', StringType(), True), StructField('Vendas', IntegerType(), True)])

In [None]:
#VERIFICAR COLUNAS
df4.columns

['Produtos', 'Vendas']

In [None]:
#INGESTÃO DE DADOS (importanto tudo que tem dentro de types)
#(Se tiver recurso de computador limitado, não trazer tudo)
from pyspark.sql.types import *

In [None]:
arqschema = 'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING'

In [None]:
despachantes = spark.read.csv("/SPARK/despachantes.csv", header=False, schema=arqschema)
#arqschema = 'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING'
#(header=False --> Não colocar o cabeçalho que vem do csv, colocar o arqschema)
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
#PARA QUE ELE DETECTE O SCHEMA
desp_auto_schema = spark.read.load("/SPARK/despachantes.csv", header=False, format="csv", sep=",", inferSchema=True)
#("inferSchema=True" = cria o schema que o programa quiser; format="csv", sep="," --> formato bruto do csv, com colunas separadas por ,)
desp_auto_schema.show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



In [None]:
despachantes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- status: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- vendas: integer (nullable = true)
 |-- data: string (nullable = true)



In [None]:
#VAMOS VERIFICAR SE O SPARK ACERTOU O TIPO DOS DADOS
#VAMOS COMPARAR OS TIPOS DE SCHEMAS

In [None]:
despachantes.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

In [None]:
desp_auto_schema.schema

StructType([StructField('_c0', IntegerType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', IntegerType(), True), StructField('_c5', DateType(), True)])

# Manipulação_DF

In [None]:
#FILTRAR DADOS ATRAVÉS DE CONDIÇÃO LÓGICA
from pyspark.sql import functions as Func
#(as Func é uma boa prática muito usada)

In [None]:
#APENAS VENDAS ACIMA DE 20
despachantes.select("id", "nome", "vendas").where(Func.col("vendas")>20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [None]:
#VENDAS MAIORES QUE 20 E MENOR QUE 40
despachantes.select("id", "nome", "vendas").where((Func.col("vendas")>20)
& (Func.col("vendas") < 40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



In [None]:
#RENOMEAR NOME DE COLUNAS
novodf = despachantes.withColumnRenamed("nome", "nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [None]:
#MUDAR O TIPO (aqui foi criada nova coluna e não apenas modificada a mesma)
from pyspark.sql.functions import *
#("to_timestamp" é uma função)
despachantes2 = despachantes.withColumn("data2", to_timestamp(Func.col("data"),
                                                                   "yyyy-MM-dd"))
despachantes2.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True), StructField('data2', TimestampType(), True)])

In [None]:
#OPERAÇÕES SOBRE DATAS
despachantes2.select(year("data")).show()
despachantes2.select(year("data")).distinct().show()
despachantes2.select("nome",year("data")).orderBy("nome").show()
despachantes2.select("data").groupBy(year("data")).count().show()
despachantes2.select(Func.sum("vendas")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



In [None]:
#SALVAR EM DIRETÓRIOS
despachantes.write.format("parquet").save("/SPARK/dfimportparquet")
despachantes.write.format("csv").save("/SPARK/dfimportcsv")
despachantes.write.format("json").save("/SPARK/dfimportjson")
despachantes.write.format("orc").save("/SPARK/dfimportorc")