In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
import os

In [5]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [6]:
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [7]:
!pip install -q findspark

In [8]:
import findspark

In [9]:
findspark.init()

In [10]:
from pyspark.sql import *

In [20]:
from pyspark.sql.functions import *

In [21]:
from pyspark.sql.types import *

In [11]:
sc = SparkSession.builder.master('local[*]').getOrCreate()

In [12]:
sc

In [145]:
dataFrame = sc.read.csv("/content/dados/pagamentos.csv", inferSchema=True)

In [146]:
dataFrame = dataFrame.withColumnRenamed("_c0", "Orgao") \
       .withColumnRenamed("_c1", "Orgao_Supervisor") \
       .withColumnRenamed("_c2", "Data") \
       .withColumnRenamed("_c3", "Nr_da_OB") \
       .withColumnRenamed("_c4", "Nr_do_Empenho") \
       .withColumnRenamed("_c5", "Codigo_Favorecido") \
       .withColumnRenamed("_c6", "Nome_Favorecido") \
       .withColumnRenamed("_c7", "Valor_Bruto") \
       .withColumnRenamed("_c8", "Valor_Retido") \
       .withColumnRenamed("_c9", "Valor_Pago")

In [147]:
dataFrame = dataFrame.withColumn("Data", to_date(col("Data"), "dd/MM/yyyy"))

In [148]:
dataFrame = dataFrame.withColumn("Valor_Bruto", regexp_replace(col("Valor_Bruto"), "\\.", "").cast("string")) \
.withColumn("Valor_Retido", regexp_replace(col("Valor_Retido"), "\\.", "").cast("string")) \
.withColumn("Valor_Pago", regexp_replace(col("Valor_Pago"), "\\.", "").cast("string")) \
.withColumn("Valor_Bruto", regexp_replace(col("Valor_Bruto"), ",", ".").cast("double")) \
.withColumn("Valor_Retido", regexp_replace(col("Valor_Retido"), ",", ".").cast("double")) \
.withColumn("Valor_Pago", regexp_replace(col("Valor_Pago"), ",", ".").cast("double"))

In [149]:
dataFrame.printSchema()

root
 |-- Orgao: string (nullable = true)
 |-- Orgao_Supervisor: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Nr_da_OB: integer (nullable = true)
 |-- Nr_do_Empenho: string (nullable = true)
 |-- Codigo_Favorecido: string (nullable = true)
 |-- Nome_Favorecido: string (nullable = true)
 |-- Valor_Bruto: double (nullable = true)
 |-- Valor_Retido: double (nullable = true)
 |-- Valor_Pago: double (nullable = true)



In [150]:
dataFrame.show(2)

+-----+----------------+----------+--------+-------------+-----------------+--------------------+-----------+------------+----------+
|Orgao|Orgao_Supervisor|      Data|Nr_da_OB|Nr_do_Empenho|Codigo_Favorecido|     Nome_Favorecido|Valor_Bruto|Valor_Retido|Valor_Pago|
+-----+----------------+----------+--------+-------------+-----------------+--------------------+-----------+------------+----------+
|SEFAZ|            null|2018-09-27|    4438| 2018NE000023|      XXX803615XX|ADAO JOSE DOS SANTOS|      100.0|         0.0|     100.0|
|SEFAZ|            null|2018-09-13|    4181| 2018NE000096|      XXX660555XX|ADEMARIO ALVES DE...|    1041.03|         0.0|   1041.03|
+-----+----------------+----------+--------+-------------+-----------------+--------------------+-----------+------------+----------+
only showing top 2 rows



In [18]:
print((dataFrame.count(), len(dataFrame.columns)))

(10032, 10)


In [151]:
dataFrame = dataFrame.orderBy(col("Data"))

In [158]:
dataFrame.show(2)

+-----+----------------+----------+--------+-------------+-----------------+--------------------+-----------+------------+----------+
|Orgao|Orgao_Supervisor|      Data|Nr_da_OB|Nr_do_Empenho|Codigo_Favorecido|     Nome_Favorecido|Valor_Bruto|Valor_Retido|Valor_Pago|
+-----+----------------+----------+--------+-------------+-----------------+--------------------+-----------+------------+----------+
|SEFAZ|            null|2015-01-05|    1352| 2014NE000027|   00394460009289|DELEGACIA DA REC....| 2208808.65|         0.0|2208808.65|
|SEFAZ|            null|2015-01-06|     321| 2014NE000592|   06938508000111|SERGIPE PARQUE TE...|  484217.36|         0.0| 484217.36|
+-----+----------------+----------+--------+-------------+-----------------+--------------------+-----------+------------+----------+
only showing top 2 rows



In [197]:
dataFrame.write.csv("/content/drive/MyDrive/pagamentos_output", mode="overwrite")

In [154]:
dataFrame.createOrReplaceTempView("pagamentos")

In [180]:
dataFrameSQL = sc.sql("SELECT * FROM pagamentos pg where pg.Data <> '2016-02-05' \
and pg.Valor_Bruto > 1041.03 \
and pg.Nome_Favorecido like '%SERGIPE%'")

In [186]:
dataFrameSQL = sc.sql("SELECT * FROM pagamentos pg WHERE pg.Valor_Pago <> (pg.Valor_Bruto - pg.Valor_Retido)")

In [195]:
dataFrameSQL = sc.sql("SELECT * FROM pagamentos WHERE Valor_Pago = (SELECT MAX(Valor_Pago) FROM pagamentos)")

In [193]:
dataFrameSQL = sc.sql("SELECT * FROM pagamentos WHERE Valor_Pago = (SELECT MIN(Valor_Pago) FROM pagamentos)")

In [196]:
dataFrameSQL.show()

+-----+----------------+----------+--------+-------------+-----------------+--------------------+-------------+------------+-------------+
|Orgao|Orgao_Supervisor|      Data|Nr_da_OB|Nr_do_Empenho|Codigo_Favorecido|     Nome_Favorecido|  Valor_Bruto|Valor_Retido|   Valor_Pago|
+-----+----------------+----------+--------+-------------+-----------------+--------------------+-------------+------------+-------------+
|SEFAZ|            null|2023-05-10|    2024| 2023NE000345|   13009717000146|BANCO DO ESTADO D...|3.262580576E7|         0.0|3.262580576E7|
+-----+----------------+----------+--------+-------------+-----------------+--------------------+-------------+------------+-------------+

