# Dados utilizados: viagens à serviço do Governo dos anos 2018-2021 

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pyspark.sql.types
import pyspark.sql.functions

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/04 18:45:08 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/08/04 18:45:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/08/04 18:45:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/08/04 18:45:08 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [2]:
import time
from pyspark.sql.functions import col

# Lendo dados do Bucket do Cluster na Nuvem do Google

In [3]:
start = time.time()

df = spark.read.csv("gs://bucket-hadoop-daniel/tables/Pagamentos.csv/", sep = ';', header = True)
df.show(10)

end = time.time()
print(end - start)

[Stage 1:>                                                          (0 + 1) / 1]

+-----------------------------------+-------------------------+------------------------+----------------------+-----------------------+---------------------+----------------------------------+--------------------------------+-----------------+------+
|Identificador do processo de viagem|Numero da Proposta (PCDP)|Codigo do orgao superior|Nome do orgao superior|Codigo do orgao pagador|Nome do orgao pagador|Codigo da unidade gestora pagadora|Nome da unidade gestora pagadora|Tipo dE pagamEnto| Valor|
+-----------------------------------+-------------------------+------------------------+----------------------+-----------------------+---------------------+----------------------------------+--------------------------------+-----------------+------+
|                           14046485|           Sem informa�ao|                   26000|  Ministerio da edu...|                  26405| Instituto Federal...|                            158133|            INST.FeD.De eDUC....|          DIARIAS| 537

                                                                                

## Tratando os dados

In [4]:
new_columns = ['id_viagem', 'numero_proposta', 'codigo_orgao_superior', 'nome_orgao_superior',
               'codigo_orgao_pagador', 'nome_orgao_pagador', 'codigo_uni_gestora_pagadora', 'nome_uni_gestora_pagadora',
              'tipo_pagamento', 'valor']
df = df.toDF(*new_columns)
df = df.withColumn('valor',df['valor'].cast('int'))
df = df.withColumn('id_viagem',df['id_viagem'].cast('int'))


## Marcando apenas as colunas que serão utilizadas

In [6]:
df1 = df.select(df['id_viagem'], df['nome_orgao_superior'], df['nome_orgao_pagador'], df['tipo_pagamento'], df['valor'])
df1.show(10)

+---------+--------------------+--------------------+--------------+-----+
|id_viagem| nome_orgao_superior|  nome_orgao_pagador|tipo_pagamento|valor|
+---------+--------------------+--------------------+--------------+-----+
| 14046485|Ministerio da edu...|Instituto Federal...|       DIARIAS|  537|
| 14046485|Ministerio da edu...|Instituto Federal...|      PASSAGEM|   44|
| 14046485|Ministerio da edu...|Instituto Federal...|      PASSAGEM|   54|
| 14112304|Ministerio da edu...|Funda�ao Universi...|       DIARIAS|  487|
| 14166390|Ministerio da edu...|Universidade Tecn...|      PASSAGEM|  665|
| 14166390|Ministerio da edu...|Universidade Tecn...|      PASSAGEM|  518|
| 14201345|Ministerio da edu...|Universidade Fede...|      PASSAGEM|  170|
| 14201345|Ministerio da edu...|Universidade Fede...|      PASSAGEM|  369|
| 14223544|Ministerio das Re...|Ministerio das Re...|      PASSAGEM|  767|
| 14227023|Ministerio da edu...|Universidade Fede...|      PASSAGEM|  562|
+---------+--------------

In [7]:
df1.describe().show()

                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+------------------+
|summary|           id_viagem| nome_orgao_superior|  nome_orgao_pagador|      tipo_pagamento|             valor|
+-------+--------------------+--------------------+--------------------+--------------------+------------------+
|  count|             3327729|             3327729|             2783172|             3327729|           3327729|
|   mean|1.6981290817268774E7|                null|                null|                null|1000.0582322058076|
| stddev| 4.005533268818529E7|                null|                null|                null|1380.4023025000322|
|    min|            14046485|Advocacia-Geral d...|Advocacia-Geral d...|             DIARIAS|                 0|
|    max|          2022000007|            Sigiloso|empresa de Planej...|Taxa dE agEnciamEnto|            308388|
+-------+--------------------+--------------------+--------------------+--------------------+---

Podemos analisar que ao todo são 3.327.729 registros

# Primeira Query: Número de Viagens por Órgão Superior

In [8]:
from pyspark.sql.functions import col

start = time.time()
df_tmp = df1
df_tmp = df_tmp.where(df_tmp.tipo_pagamento == "PASSAGEM").select('*')
df_tmp = df_tmp.select('id_viagem', 'nome_orgao_superior').distinct()
df_tmp = df_tmp.groupby('nome_orgao_superior').count()
df_tmp = df_tmp.select(col('nome_orgao_superior'),col('count').alias('numero_viagens'))

num_viagens_total = df_tmp.sort(col('numero_viagens').desc())
df_tmp = df_tmp.sort(col('numero_viagens').desc()).show()


end= time.time()
print(end-start)



+--------------------+--------------+
| nome_orgao_superior|numero_viagens|
+--------------------+--------------+
|Ministerio da Defesa|        108563|
|            Sigiloso|         72882|
|Ministerio da Edu...|         69258|
|Ministerio da edu...|         57153|
| Ministerio da Saude|         39600|
|Ministerio da Eco...|         24552|
| Ministerio da Sa�de|         22572|
|Ministerio da Inf...|         22451|
|Ministerio do Tra...|         17324|
|Ministerio do Mei...|         15213|
|Ministerio da eco...|         14426|
|Ministerio da Agr...|         12758|
|Ministerio da Jus...|         11147|
|Ministerio da Cid...|          8038|
|Ministerio de Min...|          7858|
|Ministerio da Cie...|          7327|
|Ministerio das Re...|          6848|
|Ministerio do Des...|          6830|
|Presidencia da Re...|          6533|
|Ministerio do Tur...|          6214|
+--------------------+--------------+
only showing top 20 rows

11.19053864479065


                                                                                

# Segunda Query: Valor Gasto em Viagens por Órgão Superior

In [9]:
start = time.time()

df_tmp = df1
df_tmp = df_tmp.groupby('nome_orgao_superior').sum('valor')
df_tmp = df_tmp.select(col('nome_orgao_superior'), col('sum(valor)').alias('valor_viagens'))
valor_gasto = df_tmp.sort(col('valor_viagens').desc())
df_tmp = df_tmp.sort(col('valor_viagens').desc()).show()

# valor_gasto = df1.groupby('nome_orgao_superior').sum()
# valor_gasto = valor_gasto.select(col('nome_orgao_superior'),col('sum(valor)').alias('soma-em-viagens'))
# valor_gasto.sort(col('soma-em-viagens').desc()).show()
# valor_gasto.write.format("csv").save("gs://bucket-hadoop-daniel/")

end = time.time()
print(end-start)



+--------------------+-------------+
| nome_orgao_superior|valor_viagens|
+--------------------+-------------+
|            Sigiloso|    624457233|
|Ministerio da Jus...|    571788020|
|Ministerio da Defesa|    489489641|
|Ministerio da Edu...|    228069959|
|Ministerio da edu...|    183031809|
|Ministerio da Jus...|    159290060|
|Ministerio do Tra...|    124369555|
|Ministerio do Mei...|    109942101|
|Ministerio da Eco...|    103185373|
| Ministerio da Saude|     86289658|
|Ministerio da Inf...|     75119243|
|Ministerio da Agr...|     74980036|
|Ministerio da eco...|     61162319|
|Ministerio das Re...|     58276710|
| Ministerio da Sa�de|     52091275|
|Ministerio da Agr...|     40887076|
|Presidencia da Re...|     32228939|
|Ministerio da Cie...|     30596402|
|Ministerio de Min...|     30555178|
|Ministerio do Des...|     27963089|
+--------------------+-------------+
only showing top 20 rows

23.467045307159424


                                                                                

# Salvando os dados no Bucket 
### Há a opção de salvar dentro do cluster e no HDFS do cluster

In [21]:

valor_gasto.write.format("csv").save("gs://bucket-hadoop-daniel/valor-gasto-resultado")
num_viagens.write.format("csv").save("gs://bucket-hadoop-daniel/num-viagens-total")

                                                                                


# 


# Viagens mais caras

In [10]:
start = time.time()

df_viagem = df1.groupby('id_viagem').sum('valor')
print('============\nTabela - 10 Viagens mais caras\n==========')
df_viagem = df_viagem.sort(col('sum(valor)').desc()).show(10)

end = time.time()
print(end-start)

Tabela - 10 Viagens mais caras




+---------+----------+
|id_viagem|sum(valor)|
+---------+----------+
| 15127123|    308388|
| 15159961|    257222|
| 14621672|    209263|
| 17539937|    201060|
| 16854792|    185473|
| 16826937|    176976|
| 16154465|    166291|
| 17444615|    163919|
| 17275457|    156040|
| 16827160|    154244|
+---------+----------+
only showing top 10 rows

12.643808126449585


                                                                                

# Viagens Internacionais x Viagens Nacionais
## Utilizando a tabela Trechos

In [11]:
#Viagens internacionais vs viagens nacionas

df2 = spark.read.csv("gs://bucket-hadoop-daniel/tables/Trecho-Tables/", sep = ';', header = True)

new_columns = ['id_viagem', 'numero_proposta', 'sequencia_trecho', 'origem_data', 'origem_pais', 'origem_uf', 
               'origem_cidade', 'destino_data', 'destino_pais', 'destino_uf', 'destino_cidade', 
               'meio_transporte', 'numero_diarias', 'missao']

df2 = df2.toDF(*new_columns)
df2 = df2.withColumn('id_viagem',df2['id_viagem'].cast('int'))
df_tr = df2

df_tr = df_tr.select(df_tr['id_viagem'], df_tr['origem_pais'], df_tr['destino_pais'])
df_tr.show(10)


+---------+--------------------+--------------------+
|id_viagem|         origem_pais|        destino_pais|
+---------+--------------------+--------------------+
| 13501576|            Portugal|              Brasil|
| 13501576|              Brasil|            Portugal|
| 14026421|Estados Unidos da...|              Brasil|
| 14026421|              Brasil|Estados Unidos da...|
| 14046485|              Brasil|              Brasil|
| 14046485|              Brasil|              Brasil|
| 14046485|              Brasil|              Brasil|
| 14046485|              Brasil|              Brasil|
| 14108743|              Brasil|            Portugal|
| 14108743|            Portugal|              Brasil|
+---------+--------------------+--------------------+
only showing top 10 rows



# Número de Viagens Nacionais

In [12]:
from pyspark.sql.functions import countDistinct

start = time.time()

df_tr_viagem_nacional = df_tr.where("origem_pais == 'Brasil' AND destino_pais == 'Brasil'").select('*')
#df_tr_viagem_nacional = df_tr_viagem_nacional.groupby('id_viagem').count()
df_tr_viagem_nacional = df_tr_viagem_nacional.select(countDistinct('id_viagem').alias('viagens_nacionais'))
print('==============\nNúmero de viagens Nacionais\n=============')
df_tr_viagem_nacional.show()

end = time.time()
print('Tempo (em segundos): ', end-start)
#df_tr_viagem_nacional.show()

Número de viagens Nacionais




+-----------------+
|viagens_nacionais|
+-----------------+
|          2137809|
+-----------------+

Tempo (em segundos):  11.548349857330322


                                                                                

# Número Viagens Internacionais

In [13]:
start = time.time()


df_tr_viagem_internacional = df_tr.where("origem_pais == 'Brasil' AND destino_pais != 'Brasil'").select('*')
df_tr_viagem_internacional = df_tr_viagem_internacional.select(countDistinct('id_viagem'))
print('==============\nNúmero de viagens Internacionais\n=============')
df_tr_viagem_internacional.show()

end = time.time()
print('Tempo (em segundos): ', end-start)

Número de viagens Internacionais




+-------------------------+
|count(DISTINCT id_viagem)|
+-------------------------+
|                    50574|
+-------------------------+

Tempo (em segundos):  7.292317867279053


                                                                                

# Viagem mais caras e seus destinos
### Join entre as tabelas

df1 = tabela pagamentos com id_viagem, valor

df2 = tabela trechos com id_viagem, origem_pais, destino_pais

In [16]:

start= time.time()

df1.createOrReplaceTempView('pagamentos')
df2.createOrReplaceTempView('trechos')

joinDF12= spark.sql("SELECT p.id_viagem, t.origem_pais, t.destino_pais, SUM(p.valor) AS valor_viagem \
FROM pagamentos p \
INNER JOIN trechos AS t ON (p.id_viagem == t.id_viagem) \
GROUP BY p.id_viagem, t.origem_pais, t.destino_pais ORDER BY valor_viagem DESC LIMIT 10")

print('===============\nTabela Viagem mais caras e seus destinos\n==========')
joinDF12.show(10)


end = time.time()
print('Tempo (em segundos): ', end-start)

Tabela Viagem mais caras e seus destinos


22/08/04 18:57:24 ERROR org.apache.spark.network.client.TransportClient: Failed to send RPC RPC 7441298458465679021 to /10.158.0.7:48134: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadE

+---------+-----------+------------+------------+
|id_viagem|origem_pais|destino_pais|valor_viagem|
+---------+-----------+------------+------------+
| 16826937|     Brasil|      Brasil|     9025776|
| 16827301|     Brasil|      Brasil|     5245584|
| 15819057|     Brasil|      Brasil|     3820080|
| 17274460|     Brasil|      Brasil|     3713588|
| 16827837|     Brasil|      Brasil|     3296232|
| 17275580|     Brasil|      Brasil|     3267720|
| 14587561|     Brasil|      Brasil|     3153064|
| 17265604|     Brasil|      Brasil|     3108160|
| 16816698|     Brasil|      Brasil|     3081351|
| 17265270|     Brasil|      Brasil|     3034584|
+---------+-----------+------------+------------+

Tempo (em segundos):  78.20963644981384


22/08/04 18:58:22 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1659636213380_0001_01_000012 on host: cluster-hadoop-w-1.southamerica-east1-c.c.river-runner-355919.internal. Exit status: 137. Diagnostics: [2022-08-04 18:58:22.122]Container killed on request. Exit code is 137
[2022-08-04 18:58:22.122]Container exited with a non-zero exit code 137. 
[2022-08-04 18:58:22.123]Killed by external signal
.
22/08/04 18:58:22 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 12 for reason Container from a bad node: container_1659636213380_0001_01_000012 on host: cluster-hadoop-w-1.southamerica-east1-c.c.river-runner-355919.internal. Exit status: 137. Diagnostics: [2022-08-04 18:58:22.122]Container killed on request. Exit code is 137
[2022-08-04 18:58:22.122]Container exited with a non-zero exit code 137. 
[2022-08-04 18:58:22.123]Killed by external signal
.
22/08/04 18:58:22 ERROR org.