In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession

In [2]:
spark = (SparkSession.builder
    .master("local[*]")
    .appName("estudando-dataframes")
    .getOrCreate())

22/06/28 15:34:18 WARN Utils: Your hostname, idea-Inspiron-15-3567 resolves to a loopback address: 127.0.1.1; using 192.168.1.24 instead (on interface enp2s0)
22/06/28 15:34:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/06/28 15:34:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:

dados = (
    ['1', 'José', 'Anápolis', 'São Paulo', '01-09-1900'],
    ['2', 'Igor', 'Anápolis', 'São Paulo', '11-09-1977'],
    ['3', 'Leonardo', 'Anápolis', 'São Paulo', '21-12-2000'],
    ['4', 'Humberto', 'Pato Branco', 'Rio Grande do Sul', '13-11-1964'],
    ['5', 'Isaias', 'Pato Branco', 'Rio Grande do Sul', '07-07-2002'],
    ['6', 'Lucas', 'Tauá', 'Ceará', '05-09-1984'],
)

In [4]:
# Estrutura do dataframe
esquema = (StructType([
    StructField('cod_cliente', StringType(), True),
    StructField('nome', StringType(), True),
    StructField('municipio', StringType(), True),
    StructField('estado', StringType(), True),
    StructField('data_nasc', StringType(), True),
]))

In [5]:
df = spark.createDataFrame(data=dados, schema=esquema) # Criando dataframe 
df.show()

                                                                                

+-----------+--------+-----------+-----------------+----------+
|cod_cliente|    nome|  municipio|           estado| data_nasc|
+-----------+--------+-----------+-----------------+----------+
|          1|    José|   Anápolis|        São Paulo|01-09-1900|
|          2|    Igor|   Anápolis|        São Paulo|11-09-1977|
|          3|Leonardo|   Anápolis|        São Paulo|21-12-2000|
|          4|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|
|          5|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|
|          6|   Lucas|       Tauá|            Ceará|05-09-1984|
+-----------+--------+-----------+-----------------+----------+



In [6]:
# Contador sequencial por municipio
df.groupBy('municipio').count().sort('municipio', ascending=True).show(truncate=False)

[Stage 2:>                                                          (0 + 4) / 4]

+-----------+-----+
|municipio  |count|
+-----------+-----+
|Anápolis   |3    |
|Pato Branco|2    |
|Tauá       |1    |
+-----------+-----+



                                                                                

In [7]:
# Ordenando a coluna estado
df = df.orderBy('estado') 
df.show()

+-----------+--------+-----------+-----------------+----------+
|cod_cliente|    nome|  municipio|           estado| data_nasc|
+-----------+--------+-----------+-----------------+----------+
|          6|   Lucas|       Tauá|            Ceará|05-09-1984|
|          5|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|
|          4|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|
|          2|    Igor|   Anápolis|        São Paulo|11-09-1977|
|          3|Leonardo|   Anápolis|        São Paulo|21-12-2000|
|          1|    José|   Anápolis|        São Paulo|01-09-1900|
+-----------+--------+-----------+-----------------+----------+



In [8]:
# Adicionando uma coluna nova de idade em anos                           cast converte p/inteiro
df = df.withColumn('idade', F.date_format(F.current_timestamp(), 'yyyy').cast('integer') \
                   - F.substring(F.col('data_nasc'), 7, 4).cast('integer')) #subtraindo para obter idade
df.show()

+-----------+--------+-----------+-----------------+----------+-----+
|cod_cliente|    nome|  municipio|           estado| data_nasc|idade|
+-----------+--------+-----------+-----------------+----------+-----+
|          6|   Lucas|       Tauá|            Ceará|05-09-1984|   38|
|          4|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|   58|
|          5|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|   20|
|          1|    José|   Anápolis|        São Paulo|01-09-1900|  122|
|          3|Leonardo|   Anápolis|        São Paulo|21-12-2000|   22|
|          2|    Igor|   Anápolis|        São Paulo|11-09-1977|   45|
+-----------+--------+-----------+-----------------+----------+-----+



In [9]:
# Adicionando 0000 a esquerda na coluna cod_cliente
df = df.withColumn('cod_cliente', F.lpad(df.cod_cliente, 4, '0')) 
df.show()

+-----------+--------+-----------+-----------------+----------+-----+
|cod_cliente|    nome|  municipio|           estado| data_nasc|idade|
+-----------+--------+-----------+-----------------+----------+-----+
|       0006|   Lucas|       Tauá|            Ceará|05-09-1984|   38|
|       0004|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|   58|
|       0005|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|   20|
|       0001|    José|   Anápolis|        São Paulo|01-09-1900|  122|
|       0003|Leonardo|   Anápolis|        São Paulo|21-12-2000|   22|
|       0002|    Igor|   Anápolis|        São Paulo|11-09-1977|   45|
+-----------+--------+-----------+-----------------+----------+-----+



In [10]:
# Adicionando 0 a esquerda na coluna cod_cliente
df = df.withColumn('cod_cliente', F.lpad(df.cod_cliente, 4, '0')) 
df.show()

+-----------+--------+-----------+-----------------+----------+-----+
|cod_cliente|    nome|  municipio|           estado| data_nasc|idade|
+-----------+--------+-----------+-----------------+----------+-----+
|       0006|   Lucas|       Tauá|            Ceará|05-09-1984|   38|
|       0005|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|   20|
|       0004|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|   58|
|       0002|    Igor|   Anápolis|        São Paulo|11-09-1977|   45|
|       0003|Leonardo|   Anápolis|        São Paulo|21-12-2000|   22|
|       0001|    José|   Anápolis|        São Paulo|01-09-1900|  122|
+-----------+--------+-----------+-----------------+----------+-----+



In [12]:
df = df.withColumn('data_atualizacao', F.date_format(F.current_timestamp(), 'dd-MM-yyyy | HH:mm:ss')) 
df.show()

+-----------+--------+-----------+-----------------+----------+-----+--------------------+
|cod_cliente|    nome|  municipio|           estado| data_nasc|idade|    data_atualizacao|
+-----------+--------+-----------+-----------------+----------+-----+--------------------+
|       0006|   Lucas|       Tauá|            Ceará|05-09-1984|   38|28-06-2022 | 15:3...|
|       0005|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|   20|28-06-2022 | 15:3...|
|       0004|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|   58|28-06-2022 | 15:3...|
|       0002|    Igor|   Anápolis|        São Paulo|11-09-1977|   45|28-06-2022 | 15:3...|
|       0003|Leonardo|   Anápolis|        São Paulo|21-12-2000|   22|28-06-2022 | 15:3...|
|       0001|    José|   Anápolis|        São Paulo|01-09-1900|  122|28-06-2022 | 15:3...|
+-----------+--------+-----------+-----------------+----------+-----+--------------------+



In [13]:

# data de execução
df = df.withColumn('data_execucao', F.date_format(F.current_timestamp(), 'dd-MM-yyyy')) # Cria ou manipula
df.show()

+-----------+--------+-----------+-----------------+----------+-----+--------------------+-------------+
|cod_cliente|    nome|  municipio|           estado| data_nasc|idade|    data_atualizacao|data_execucao|
+-----------+--------+-----------+-----------------+----------+-----+--------------------+-------------+
|       0006|   Lucas|       Tauá|            Ceará|05-09-1984|   38|28-06-2022 | 15:3...|   28-06-2022|
|       0004|Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|   58|28-06-2022 | 15:3...|   28-06-2022|
|       0005|  Isaias|Pato Branco|Rio Grande do Sul|07-07-2002|   20|28-06-2022 | 15:3...|   28-06-2022|
|       0001|    José|   Anápolis|        São Paulo|01-09-1900|  122|28-06-2022 | 15:3...|   28-06-2022|
|       0002|    Igor|   Anápolis|        São Paulo|11-09-1977|   45|28-06-2022 | 15:3...|   28-06-2022|
|       0003|Leonardo|   Anápolis|        São Paulo|21-12-2000|   22|28-06-2022 | 15:3...|   28-06-2022|
+-----------+--------+-----------+-----------------+---

In [14]:
# removendo caracteres especiais da coluna estado
df = df.withColumn('estado', F.regexp_replace('estado', 'á', 'a') \
    ).withColumn('estado', F.regexp_replace('estado', 'ã', 'a'))

In [15]:
"""
Convert PySpark Dataframe to Pandas DataFrame
pandasdf = df.toPandas()
print(pandasdf)
"""

'\nConvert PySpark Dataframe to Pandas DataFrame\npandasdf = df.toPandas()\nprint(pandasdf)\n'

In [16]:
df.show(truncate=False)

+-----------+--------+-----------+-----------------+----------+-----+---------------------+-------------+
|cod_cliente|nome    |municipio  |estado           |data_nasc |idade|data_atualizacao     |data_execucao|
+-----------+--------+-----------+-----------------+----------+-----+---------------------+-------------+
|0006       |Lucas   |Tauá       |Ceara            |05-09-1984|38   |28-06-2022 | 15:35:23|28-06-2022   |
|0005       |Isaias  |Pato Branco|Rio Grande do Sul|07-07-2002|20   |28-06-2022 | 15:35:23|28-06-2022   |
|0004       |Humberto|Pato Branco|Rio Grande do Sul|13-11-1964|58   |28-06-2022 | 15:35:23|28-06-2022   |
|0002       |Igor    |Anápolis   |Sao Paulo        |11-09-1977|45   |28-06-2022 | 15:35:23|28-06-2022   |
|0003       |Leonardo|Anápolis   |Sao Paulo        |21-12-2000|22   |28-06-2022 | 15:35:23|28-06-2022   |
|0001       |José    |Anápolis   |Sao Paulo        |01-09-1900|122  |28-06-2022 | 15:35:23|28-06-2022   |
+-----------+--------+-----------+------------