In [5]:
#Criando ambiente Spark para analise de dados
#Instalando bibliotecas necerssarias:
!pip install pyspark
!pip install findspark

#Importando bibliotecas
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

#Criando (instanciando) objeto spark
#app.Name(nome na memoria) getOrCreate cria ambiente cas não exista
spark = SparkSession.builder.appName('SENAI').getOrCreate()



In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
gastos = spark.read.options(header = True, encoding = 'ISO-8859-1').csv(
    "/content/drive/MyDrive/BigData/202301_Transferencias.csv",
    sep = ";",
    inferSchema=True
)

In [7]:
from IPython.utils.sysinfo import encoding
recebidos = spark.read.options(header = True, encoding = 'ISO-8859-1').csv(
    '/content/drive/MyDrive/BigData/202301_RecebimentosRecursosPorFavorecido.csv',
    sep = ";",
    inferSchema=True
)

In [8]:
gastos.printSchema()

root
 |-- ANO / MÊS: integer (nullable = true)
 |-- TIPO TRANSFERÊNCIA: string (nullable = true)
 |-- TIPO FAVORECIDO: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- CÓDIGO MUNICÍPIO SIAFI: integer (nullable = true)
 |-- NOME MUNICÍPIO: string (nullable = true)
 |-- CÓDIGO ÓRGÃO SIAFI: integer (nullable = true)
 |-- NOME ÓRGÃO: string (nullable = true)
 |-- CÓDIGO UNIDADE GESTORA: integer (nullable = true)
 |-- NOME UNIDADE GESTORA: string (nullable = true)
 |-- CÓDIGO FUNÇÃO: string (nullable = true)
 |-- NOME FUNÇÃO: string (nullable = true)
 |-- CÓDIGO SUBFUNÇÃO: string (nullable = true)
 |-- NOME SUBFUNÇÃO: string (nullable = true)
 |-- CÓDIGO PROGRAMA: string (nullable = true)
 |-- NOME PROGRAMA: string (nullable = true)
 |-- AÇÃO: string (nullable = true)
 |-- NOME AÇÃO: string (nullable = true)
 |-- LINGUAGEM CIDADÃ: string (nullable = true)
 |-- CÓDIGO GRUPO DESPESA: string (nullable = true)
 |-- NOME GRUPO DESPESA: string (nullable = true)
 |-- CÓDIGO MODALIDA

In [9]:
#RENOMEANDO COLUNAS DO DATAFRAME
gastos= gastos.withColumnsRenamed({
    'UF':'ESTADO',
    'NOME MUNICÍPIO':'MUNICIPIO',
    'NOME RECEBEDOR':'RECEBEDOR',
    'VALOR TRANSFERIDO':'VALOR',
    'TIPO FAVORECIDO':'TIPO_FAVORECIDO',
    'NOME FUNÇÃO':'NOME_FUNCAO',
    'NOME ÓRGÃO':'NOME_ORGAO',
    'NOME FAVORECIDO':'NOME_FAVORECIDO',
    'CÓDIGO FAVORECIDO':'COD_FAVORECIDO'
})

In [10]:
#RENOMEANDO COLUNAS DO DATAFRAME
recebidos = recebidos.withColumnsRenamed({
    'Código Favorecido':'COD_FAVORECIDO',
    'Valor Recebido':'Valor_Recebido'
})

In [11]:
gastos = gastos.na.fill(0)
gastos = gastos.na.fill('N/A')

In [12]:
recebidos = recebidos.na.fill(0)
recebidos = recebidos.na.fill('N/A')

In [13]:
recebidos = recebidos.withColumn('Valor_Recebido',f.regexp_replace('Valor_Recebido',',','.'))

In [14]:
recebidos.withColumn('Valor_Recebido',recebidos.Valor_Recebido.cast('float'))

DataFrame[COD_FAVORECIDO: string, Nome Favorecido: string, Sigla UF: string, Nome Município: string, Código Órgão Superior: int, Nome Órgão Superior: string, Código Órgão: int, Nome Órgão: string, Código Unidade Gestora: int, Nome Unidade Gestora: string, Ano e mês do lançamento: string, Valor_Recebido: float]

In [15]:
gastos = gastos.withColumn('VALOR', f.regexp_replace('VALOR',',','.') )

In [16]:
gastos.withColumn('VALOR',gastos.VALOR.cast('float'))

DataFrame[ANO / MÊS: int, TIPO TRANSFERÊNCIA: string, TIPO_FAVORECIDO: string, ESTADO: string, CÓDIGO MUNICÍPIO SIAFI: int, MUNICIPIO: string, CÓDIGO ÓRGÃO SIAFI: int, NOME_ORGAO: string, CÓDIGO UNIDADE GESTORA: int, NOME UNIDADE GESTORA: string, CÓDIGO FUNÇÃO: string, NOME_FUNCAO: string, CÓDIGO SUBFUNÇÃO: string, NOME SUBFUNÇÃO: string, CÓDIGO PROGRAMA: string, NOME PROGRAMA: string, AÇÃO: string, NOME AÇÃO: string, LINGUAGEM CIDADÃ: string, CÓDIGO GRUPO DESPESA: string, NOME GRUPO DESPESA: string, CÓDIGO MODALIDADE APLICAÇÃO DESPESA: string, NOME MODALIDADE APLICAÇÃO DESPESA: string, CÓDIGO ELEMENTO DESPESA: string, NOME ELEMENTO DESPESA: string, CÓDIGO PLANO ORÇAMENTÁRIO: string, NOME PLANO ORÇAMENTÁRIO: string, CÓDIGO SUBTÍTULO: string, NOME SUBTÍTULO: string, CÓDIGO LOCALIZADOR: int, NOME LOCALIZADOR: string, SIGLA LOCALIZADOR: string, DESCRIÇÃO COMPLEMENTAR LOCALIZADOR: string, COD_FAVORECIDO: string, NOME_FAVORECIDO: string, VALOR: float]

In [17]:
gastos.createOrReplaceTempView('V_GASTOS')

In [18]:
recebidos.createOrReplaceTempView('V_RECEBIDOS')

In [19]:
#CONSULTA DE VALOR TRANSFERIDO POR ESTADO NO MES 01/2023
Valor_Estado = spark.sql(
"""
SELECT  G.ESTADO,
        SUM(G.VALOR) AS SOMA_TRANSFERENCIA
FROM V_GASTOS AS G
GROUP BY G.ESTADO
ORDER BY SOMA_TRANSFERENCIA DESC
"""
)

In [20]:
#CONSULTA DE VALOR TRANSFERIDO POR MUNICIPIO NO MES 01/2023
transf_municipio = spark.sql(
"""
SELECT  G.MUNICIPIO,
        G.ESTADO,
        AVG(G.VALOR),
        SUM(G.VALOR) AS SOMA_TRANSFERENCIA
FROM V_GASTOS AS G
WHERE G.MUNICIPIO != 'N/A'
GROUP BY G.ESTADO, G.MUNICIPIO
ORDER BY SOMA_TRANSFERENCIA DESC
"""
)

In [21]:
#CONSULTA DE VALOR TRANSFERIDO POR TIPO DE FAVORECIDO NO MES 01/2023
spark.sql(
"""
SELECT  G.TIPO_FAVORECIDO,
        SUM(G.VALOR) AS SOMA_TRANSFERENCIA
FROM V_GASTOS AS G
GROUP BY G.TIPO_FAVORECIDO
ORDER BY SOMA_TRANSFERENCIA DESC
"""
).show(truncate=False)

+-----------------------------------------------------+---------------------+
|TIPO_FAVORECIDO                                      |SOMA_TRANSFERENCIA   |
+-----------------------------------------------------+---------------------+
|Administração Pública Municipal                      |2.1359576920899853E10|
|Administração Pública Estadual ou do Distrito Federal|1.361686762907E10    |
|Entidades Empresariais Privadas                      |6.823324857300006E9  |
|Fundo Público                                        |6.329090797489968E9  |
|Entidades Sem Fins Lucrativos                        |7.208534231499996E8  |
|Agentes Intermediários                               |4.296433734E8        |
|Organizações Internacionais                          |5.5467075E7          |
|Administração Pública                                |9090742.26           |
|Sem Informação                                       |3661643.48           |
+-----------------------------------------------------+---------

In [22]:
#CONSULTA DE ORGÃO QUE MAIS RECEBEU TRANSFERÊNCIA ENTRE AS "Entidades Empresariais Privadas" NO MES 01/2023
Empresas_Mais_Recebido = spark.sql(
"""
SELECT  G.TIPO_FAVORECIDO,
        G.NOME_FAVORECIDO,
        SUM(G.VALOR) AS SOMA_TRANSFERENCIA
FROM V_GASTOS AS G
WHERE G.TIPO_FAVORECIDO LIKE "Entidades Empresariais Privadas"
GROUP BY G.NOME_FAVORECIDO, G.TIPO_FAVORECIDO
ORDER BY SOMA_TRANSFERENCIA DESC
"""
)

In [23]:
#CONSULTA FAVORECIDOS DE MAIOR VALOR RECEBIDO NO MES 01/2023
spark.sql(
"""
SELECT  G.NOME_FAVORECIDO,
        G.NOME_FUNCAO,
        G.TIPO_FAVORECIDO,
        SUM(RECEB.Valor_Recebido) AS SOMA_RECEBIDO
FROM V_GASTOS AS G
INNER JOIN V_RECEBIDOS AS RECEB ON (RECEB.COD_FAVORECIDO = G.COD_FAVORECIDO)
GROUP BY G.NOME_FAVORECIDO,G.NOME_FUNCAO,G.TIPO_FAVORECIDO
ORDER BY SOMA_RECEBIDO DESC
"""
).toPandas()

Unnamed: 0,NOME_FAVORECIDO,NOME_FUNCAO,TIPO_FAVORECIDO,SOMA_RECEBIDO
0,BANCO DO BRASIL SA,Encargos especiais,Entidades Empresariais Privadas,1.519535e+11
1,COORDENACAO-GERAL DE TESOURARIA - CGTES,Encargos especiais,Agentes Intermediários,1.344653e+11
2,BANCO DO BRASIL SA,Educação,Entidades Empresariais Privadas,1.095602e+11
3,FUNDO MUNICIPAL DE SAUDE,Saúde,Fundo Público,1.931977e+10
4,BANCO DO BRASIL SA,Energia,Entidades Empresariais Privadas,1.381395e+10
...,...,...,...,...
11764,IRMANDADE DA SANTA CASA DE MISERICORDIA DE SAO...,Saúde,Entidades Sem Fins Lucrativos,0.000000e+00
11765,ESTADO DO RIO GRANDE DO SUL,Encargos especiais,Administração Pública Estadual ou do Distrito ...,-1.410000e+03
11766,MUNICIPIO DE DESTERRO,Educação,Administração Pública Municipal,-1.802911e+04
11767,MUNICIPIO DE DESTERRO,Encargos especiais,Administração Pública Municipal,-9.014555e+04


In [24]:
#Salvando DataSet em formato ORC:
transf_municipio.coalesce(1).write.orc(
    '/content/drive/MyDrive/Colab Notebooks/Saidas',
    mode='overwrite'
)

In [25]:
#Salvando DataSet em formato CSV:
Valor_Estado.coalesce(1).write.mode("overwrite").csv(
    '/content/drive/MyDrive/Colab Notebooks/Saidas'
    )

In [26]:
#Salvando DataSet em formato ORC:
Empresas_Mais_Recebido.coalesce(1).write.orc(
    '/content/drive/MyDrive/Colab Notebooks/Saidas',
    mode ='overwrite'
)