In [55]:
!pip install pyspark



In [56]:
from pyspark.sql import SparkSession

In [57]:
spark = SparkSession.builder.appName('Lendo CSV').getOrCreate()

In [58]:
caminho_csv = "./base_de_dados.csv"

df = spark.read.csv(
    path=caminho_csv,
    sep=";",
    header=True
)

df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+----------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|  data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+----------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|18/02/2022 13:28|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|08/04/2022 01:47|
|  3|   57.58|    Arthur Goncalves|          

In [59]:
df.schema.fieldNames()

['id',
 'valor',
 'parte_debitada_nome',
 'parte_debitada_conta',
 'parte_debitada_banco',
 'parte_creditada_nome',
 'parte_creditada_conta',
 'parte_creditada_banco',
 'chave_pix_tipo',
 'chave_pix_valor',
 'data_transacao']

In [60]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

In [61]:
schema_pix = StructType([
    StructField("id", IntegerType()),
    StructField("valor", DoubleType()),

])

df = spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_pix
)

df.show()

+---+--------+
| id|   valor|
+---+--------+
|  1|    9.93|
|  2|   15.38|
|  3|   57.58|
|  4|53705.13|
|  5|25299.69|
|  6| 7165.06|
|  7|    6.16|
|  8|  136.36|
|  9|  574.39|
| 10|   42.88|
| 11|33629.97|
| 12| 4374.56|
| 13|  507.18|
| 14|67758.87|
| 15|  815.53|
| 16|    2.73|
| 17|    0.54|
| 18|49836.72|
| 19|    9.68|
| 20| 9837.22|
+---+--------+
only showing top 20 rows


In [62]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)



In [63]:
schema_pix = StructType([
    StructField("id", IntegerType()),
    StructField("valor", DoubleType()),
    StructField("parte_debitada_nome", StringType()),
    StructField("parte_debitada_cpf", StringType()),
    StructField("parte_creditada_nome", StringType()),
    StructField("parte_creditada_cpf", StringType()),
    StructField('chave_pix_tipo', StringType()),
])

df = spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_pix,
)

df.show()

+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
| id|   valor| parte_debitada_nome|parte_debitada_cpf|parte_creditada_nome| parte_creditada_cpf|chave_pix_tipo|
+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
|  1|    9.93|Dra. Ana Carolina...|          79470453|              Nubank|       Maysa da Cruz|      67162333|
|  2|   15.38|        Ana Caldeira|          19689668|                Itau|        Evelyn Sales|      60005091|
|  3|   57.58|    Arthur Goncalves|          18856899|            Bradesco|          Maria Melo|      13496303|
|  4|53705.13|  Ana Julia Caldeira|          22834741|                Itau|   Ana Livia Almeida|      44695116|
|  5|25299.69|  Srta. Nicole Pinto|           3715882|              Nubank|Srta. Ana Laura d...|      21409465|
|  6| 7165.06|   Gabriela Ferreira|           2243037|              Nubank|       Larissa Souza|      10

In [64]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_cpf: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_cpf: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)



In [65]:
from pyspark.sql.functions import col

df_cast = df.withColumn('id', col('id').cast('int')).withColumn('valor', col('valor').cast('double'))

In [66]:
df_cast.show()

+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
| id|   valor| parte_debitada_nome|parte_debitada_cpf|parte_creditada_nome| parte_creditada_cpf|chave_pix_tipo|
+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
|  1|    9.93|Dra. Ana Carolina...|          79470453|              Nubank|       Maysa da Cruz|      67162333|
|  2|   15.38|        Ana Caldeira|          19689668|                Itau|        Evelyn Sales|      60005091|
|  3|   57.58|    Arthur Goncalves|          18856899|            Bradesco|          Maria Melo|      13496303|
|  4|53705.13|  Ana Julia Caldeira|          22834741|                Itau|   Ana Livia Almeida|      44695116|
|  5|25299.69|  Srta. Nicole Pinto|           3715882|              Nubank|Srta. Ana Laura d...|      21409465|
|  6| 7165.06|   Gabriela Ferreira|           2243037|              Nubank|       Larissa Souza|      10

### Manipulação de dados II

In [67]:
df_cast.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_cpf: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_cpf: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)



In [68]:
df_cast.select('id', 'valor').show() # seleciona as colunas desejadas

+---+--------+
| id|   valor|
+---+--------+
|  1|    9.93|
|  2|   15.38|
|  3|   57.58|
|  4|53705.13|
|  5|25299.69|
|  6| 7165.06|
|  7|    6.16|
|  8|  136.36|
|  9|  574.39|
| 10|   42.88|
| 11|33629.97|
| 12| 4374.56|
| 13|  507.18|
| 14|67758.87|
| 15|  815.53|
| 16|    2.73|
| 17|    0.54|
| 18|49836.72|
| 19|    9.68|
| 20| 9837.22|
+---+--------+
only showing top 20 rows


In [69]:
from pyspark.sql.functions import round
df_dolar = df_cast.select('id', 'valor').withColumn('valor_dolar', round(col('valor') * 5, 2))

In [70]:
df_dolar.show()

+---+--------+-----------+
| id|   valor|valor_dolar|
+---+--------+-----------+
|  1|    9.93|      49.65|
|  2|   15.38|       76.9|
|  3|   57.58|      287.9|
|  4|53705.13|  268525.65|
|  5|25299.69|  126498.45|
|  6| 7165.06|    35825.3|
|  7|    6.16|       30.8|
|  8|  136.36|      681.8|
|  9|  574.39|    2871.95|
| 10|   42.88|      214.4|
| 11|33629.97|  168149.85|
| 12| 4374.56|    21872.8|
| 13|  507.18|     2535.9|
| 14|67758.87|  338794.35|
| 15|  815.53|    4077.65|
| 16|    2.73|      13.65|
| 17|    0.54|        2.7|
| 18|49836.72|   249183.6|
| 19|    9.68|       48.4|
| 20| 9837.22|    49186.1|
+---+--------+-----------+
only showing top 20 rows


In [71]:
# Dropando colunas
df_dolar.drop('valor_dolar').show()

+---+--------+
| id|   valor|
+---+--------+
|  1|    9.93|
|  2|   15.38|
|  3|   57.58|
|  4|53705.13|
|  5|25299.69|
|  6| 7165.06|
|  7|    6.16|
|  8|  136.36|
|  9|  574.39|
| 10|   42.88|
| 11|33629.97|
| 12| 4374.56|
| 13|  507.18|
| 14|67758.87|
| 15|  815.53|
| 16|    2.73|
| 17|    0.54|
| 18|49836.72|
| 19|    9.68|
| 20| 9837.22|
+---+--------+
only showing top 20 rows


In [72]:
df_dolar.withColumnRenamed('valor_dolar', 'dolar_valor').show()

+---+--------+-----------+
| id|   valor|dolar_valor|
+---+--------+-----------+
|  1|    9.93|      49.65|
|  2|   15.38|       76.9|
|  3|   57.58|      287.9|
|  4|53705.13|  268525.65|
|  5|25299.69|  126498.45|
|  6| 7165.06|    35825.3|
|  7|    6.16|       30.8|
|  8|  136.36|      681.8|
|  9|  574.39|    2871.95|
| 10|   42.88|      214.4|
| 11|33629.97|  168149.85|
| 12| 4374.56|    21872.8|
| 13|  507.18|     2535.9|
| 14|67758.87|  338794.35|
| 15|  815.53|    4077.65|
| 16|    2.73|      13.65|
| 17|    0.54|        2.7|
| 18|49836.72|   249183.6|
| 19|    9.68|       48.4|
| 20| 9837.22|    49186.1|
+---+--------+-----------+
only showing top 20 rows


In [73]:
df_cast.select('id', 'valor').filter(col('valor') > 30000).show()

+---+--------+
| id|   valor|
+---+--------+
|  4|53705.13|
| 11|33629.97|
| 14|67758.87|
| 18|49836.72|
| 27|35859.11|
| 34|58083.62|
| 36|48714.95|
| 47|38219.08|
| 52|60139.23|
| 54|95977.62|
| 55|35409.61|
| 62|57433.69|
| 71|80083.34|
| 78|81977.98|
| 79| 78559.4|
| 83|35095.43|
| 86|94736.79|
| 88|78347.58|
| 97|94586.45|
+---+--------+



### Tratando dados nulos

In [74]:
df_cast.show()

+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
| id|   valor| parte_debitada_nome|parte_debitada_cpf|parte_creditada_nome| parte_creditada_cpf|chave_pix_tipo|
+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
|  1|    9.93|Dra. Ana Carolina...|          79470453|              Nubank|       Maysa da Cruz|      67162333|
|  2|   15.38|        Ana Caldeira|          19689668|                Itau|        Evelyn Sales|      60005091|
|  3|   57.58|    Arthur Goncalves|          18856899|            Bradesco|          Maria Melo|      13496303|
|  4|53705.13|  Ana Julia Caldeira|          22834741|                Itau|   Ana Livia Almeida|      44695116|
|  5|25299.69|  Srta. Nicole Pinto|           3715882|              Nubank|Srta. Ana Laura d...|      21409465|
|  6| 7165.06|   Gabriela Ferreira|           2243037|              Nubank|       Larissa Souza|      10

In [75]:
df_cast.na.drop('all', subset=['id', 'valor']).show() # Exclui a linha toda se tiver um valor nulo

+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
| id|   valor| parte_debitada_nome|parte_debitada_cpf|parte_creditada_nome| parte_creditada_cpf|chave_pix_tipo|
+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
|  1|    9.93|Dra. Ana Carolina...|          79470453|              Nubank|       Maysa da Cruz|      67162333|
|  2|   15.38|        Ana Caldeira|          19689668|                Itau|        Evelyn Sales|      60005091|
|  3|   57.58|    Arthur Goncalves|          18856899|            Bradesco|          Maria Melo|      13496303|
|  4|53705.13|  Ana Julia Caldeira|          22834741|                Itau|   Ana Livia Almeida|      44695116|
|  5|25299.69|  Srta. Nicole Pinto|           3715882|              Nubank|Srta. Ana Laura d...|      21409465|
|  6| 7165.06|   Gabriela Ferreira|           2243037|              Nubank|       Larissa Souza|      10

## Substituindo pelo valor 0

In [76]:
df_cast.na.fill(0, subset=['id', 'valor']).show()

+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
| id|   valor| parte_debitada_nome|parte_debitada_cpf|parte_creditada_nome| parte_creditada_cpf|chave_pix_tipo|
+---+--------+--------------------+------------------+--------------------+--------------------+--------------+
|  1|    9.93|Dra. Ana Carolina...|          79470453|              Nubank|       Maysa da Cruz|      67162333|
|  2|   15.38|        Ana Caldeira|          19689668|                Itau|        Evelyn Sales|      60005091|
|  3|   57.58|    Arthur Goncalves|          18856899|            Bradesco|          Maria Melo|      13496303|
|  4|53705.13|  Ana Julia Caldeira|          22834741|                Itau|   Ana Livia Almeida|      44695116|
|  5|25299.69|  Srta. Nicole Pinto|           3715882|              Nubank|Srta. Ana Laura d...|      21409465|
|  6| 7165.06|   Gabriela Ferreira|           2243037|              Nubank|       Larissa Souza|      10

In [90]:
from pyspark.sql.types import TimestampType
schema_pix = StructType([
    StructField("id", IntegerType()),
    StructField("valor", DoubleType()),
    StructField("parte_debitada_nome", StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField("parte_debitada_banco", StringType()),
    StructField("parte_creditada_nome", StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType()),
])

df = spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_pix,
    timestampFormat='dd/MM/yyyy HH:mm'
)

df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [95]:
df_not_null = df.na.fill(0, subset=['valor'])

In [96]:
df_not_null.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [97]:
from pyspark.sql.functions import year
df_not_null.select(year(col('data_transacao'))).show()

+--------------------+
|year(data_transacao)|
+--------------------+
|                2022|
|                2022|
|                2022|
|                2022|
|                2022|
|                2022|
|                2021|
|                2021|
|                2021|
|                2022|
|                2022|
|                2022|
|                2021|
|                2021|
|                2022|
|                2021|
|                2022|
|                2022|
|                2022|
|                2021|
+--------------------+
only showing top 20 rows


In [98]:
from pyspark.sql.functions import year
df_2022 = df_not_null.filter(year(col('data_transacao')) == 2022)

In [99]:
df_2022.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [101]:
df_2022.withColumn('data_transacao', year(col('data_transacao'))).groupBy('data_transacao').count().show()

+--------------+-----+
|data_transacao|count|
+--------------+-----+
|          2022|   48|
+--------------+-----+



In [102]:
df_2022.write.csv('output/base_2022.csv')

### Particionamento

In [103]:
df.rdd.getNumPartitions()

1

In [104]:
!lscpu

Architecture:                x86_64
  CPU op-mode(s):            32-bit, 64-bit
  Address sizes:             46 bits physical, 48 bits virtual
  Byte Order:                Little Endian
CPU(s):                      2
  On-line CPU(s) list:       0,1
Vendor ID:                   GenuineIntel
  Model name:                Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:              6
    Model:                   79
    Thread(s) per core:      2
    Core(s) per socket:      1
    Socket(s):               1
    Stepping:                0
    BogoMIPS:                4399.99
    Flags:                   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pg
                             e mca cmov pat pse36 clflush mmx fxsr sse sse2 ss h
                             t syscall nx pdpe1gb rdtscp lm constant_tsc rep_goo
                             d nopl xtopology nonstop_tsc cpuid tsc_known_freq p
                             ni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2ap
                   

In [106]:
df_2022.write.mode('overwrite').partitionBy('chave_pix_tipo').parquet('output/parquet/base')

In [111]:
from pyspark.sql.functions import date_format
df_final = df.withColumn('data_particao', date_format(col('data_transacao'), 'yyyy-mm-dd'))

In [112]:
df_final.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+-------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|data_particao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+-------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|   2022-28-18|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    2714538

In [114]:
df_final.write.mode('overwrite').partitionBy('data_particao').parquet('output/parquet/base')

In [116]:
df_parquet = spark.read.parquet('./output/parquet/base')

In [118]:
df_parquet.rdd.getNumPartitions()

4

In [119]:
df_parquet_coalesce = df_parquet.coalesce(2)

In [120]:
df_parquet_coalesce.rdd.getNumPartitions()

2

In [124]:
df_final_ano_mes = df.withColumn('data_particao', date_format(col('data_transacao'), 'yyyy-MM'))

In [125]:
df_final_ano_mes.write.mode('overwrite').partitionBy('data_particao').parquet('output/parquet/base')