In [81]:
from pyspark.sql import SparkSession
from pyspark.sql import types as Types
from pyspark.sql import functions as F
from pyspark.sql.functions import col, unix_timestamp, to_date, date_format
import pandas as pd

# Criando a sessão Spark
spark = SparkSession \
        .builder \
        .appName("Getting Start") \
        .getOrCreate()

Leitura dos dados em formato CSV

In [82]:
# Dados obtidos em http://www.portaltransparencia.gov.br/download-de-dados/viagens
# Ano de 2018

df_viagens = spark.read.csv("data/Viagens 2018/2018_Viagem_UTF8.csv", header=True, sep=';')
df_pagamento = spark.read.csv("data/Viagens 2018/2018_Pagamento_UTF8.csv", header=True, sep=';')
df_trecho = spark.read.csv("data/Viagens 2018/2018_Trecho_UTF8.csv", header=True, sep=';')
df_passagem = spark.read.csv("data/Viagens 2018/2018_Passagem_UTF8.csv", header=True, sep=';')

## Tratamento dos dados
    - Transformar as colunas para os tipos adequados
    - Renomear as colunas para um fácil manuseio dos dados

In [83]:
# Função para converter data do formato "yyyyMMdd" para "dd/MM/yyyy"
def to_date(data_value):
    try:
        return date_format(unix_timestamp(data_value, "yyyyMMdd").cast("timestamp"), format="dd/MM/yyyy")
    except:
        return None

# Função para converter String para Float
def to_float(float_value):
    try:
        return float(float_value.replace(',','.'))
    except:
        return 0.0
    
udf_to_float = F.udf(to_float, Types.FloatType())

# Função para converter String para Boolean
def to_boolean(boolean_value):
    try:
        if boolean_value == 'Não':
            return False
        elif boolean_value == 'Sim':
            return True
    except:
        return None
    
udf_to_boolean = F.udf(to_boolean, Types.BooleanType())

# Função para renomear coluna do DataFrame Spark
def rename_column(df, from_column, to_column):
    df.withColumnRenamed(from_column, to_column)
    
# Remover coluna
def remove_column(df, coluna):
    df.drop(coluna)

## Viagens

In [84]:
df_viagens.limit(1).toPandas()

Unnamed: 0,Identificador do processo de viagem,Situação,Código do órgão superior,Nome do órgão superior,Código órgão solicitante,Nome órgão solicitante,CPF viajante,Nome,Cargo,Período - Data de início,Período - Data de fim,Destinos,Motivo,Valor diárias,Valor passagens,Valor outros gastos
0,13501576,Realizada,36000,MINISTERIO DA SAUDE,36201,FUNDACAO OSWALDO CRUZ,***.637.54*-**,ELIANE CALDAS DO NASCIMENTO OLIVEIRA,PESQUISADOR EM SAUDE PUBLICA,20180210,20180302,Coimbra/Portugal,PARTICIPAÇÃO EM PROJETOS DE PESQUISA DESENVOLV...,",00",",00",",00"


In [85]:
df_viagens.printSchema()

root
 |-- Identificador do processo de viagem: string (nullable = true)
 |-- Situação: string (nullable = true)
 |-- Código do órgão superior: string (nullable = true)
 |-- Nome do órgão superior: string (nullable = true)
 |-- Código órgão solicitante: string (nullable = true)
 |-- Nome órgão solicitante: string (nullable = true)
 |-- CPF viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Período - Data de início: string (nullable = true)
 |-- Período - Data de fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor diárias: string (nullable = true)
 |-- Valor passagens: string (nullable = true)
 |-- Valor outros gastos: string (nullable = true)



In [86]:
# Tratando as colunas "Valor passagens", "Valor diárias", "Valor outros gastos", "Período - Data de início", "Período - Data de fim"
treated_df_viagens = df_viagens.withColumn("ValorPassagens", udf_to_float(df_viagens["Valor passagens"])) \
        .withColumn("ValorDiarias", udf_to_float(df_viagens["Valor diárias"])) \
        .withColumn("ValorOutros", udf_to_float(df_viagens["Valor outros gastos"])) \
        .withColumn("PeriodoDataInicio", to_date(df_viagens["Período - Data de início"])) \
        .withColumn("PeriodoDataFim", to_date(df_viagens["Período - Data de fim"]))

# Excluindo as colunas transformadas
treated_df_viagens = treated_df_viagens.drop("Valor passagens") \
                                       .drop("Valor diárias") \
                                       .drop("Valor outros gastos") \
                                       .drop("Período - Data de início") \
                                       .drop("Período - Data de fim")

# Renomeando as colunas
treated_df_viagens = treated_df_viagens.withColumnRenamed("CPF viajante", "CPFViajante")
treated_df_viagens = treated_df_viagens.withColumnRenamed("Identificador do processo de viagem", "CodViagem")
treated_df_viagens = treated_df_viagens.withColumnRenamed("Situação", "Situacao")
treated_df_viagens = treated_df_viagens.withColumnRenamed("Nome do órgão superior", "OrgaoSuperior")
treated_df_viagens = treated_df_viagens.withColumnRenamed("Código órgão solicitante", "CodOrgaoSolicitante")
treated_df_viagens = treated_df_viagens.withColumnRenamed("Nome órgão solicitante", "NomeOrgaoSolicitante")
treated_df_viagens = treated_df_viagens.withColumnRenamed("Código do órgão superior", "CodOrgaoSup")


treated_df_viagens.printSchema()

root
 |-- CodViagem: string (nullable = true)
 |-- Situacao: string (nullable = true)
 |-- CodOrgaoSup: string (nullable = true)
 |-- OrgaoSuperior: string (nullable = true)
 |-- CodOrgaoSolicitante: string (nullable = true)
 |-- NomeOrgaoSolicitante: string (nullable = true)
 |-- CPFViajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- ValorPassagens: float (nullable = true)
 |-- ValorDiarias: float (nullable = true)
 |-- ValorOutros: float (nullable = true)
 |-- PeriodoDataInicio: string (nullable = true)
 |-- PeriodoDataFim: string (nullable = true)



## Pagamento

In [88]:
df_pagamento.printSchema()

root
 |-- Identificador do processo de viagem: string (nullable = true)
 |-- Código do órgão superior: string (nullable = true)
 |-- Nome do órgão superior: string (nullable = true)
 |-- Codigo do órgão pagador: string (nullable = true)
 |-- Nome do órgao pagador: string (nullable = true)
 |-- Código da unidade gestora pagadora: string (nullable = true)
 |-- Nome da unidade gestora pagadora: string (nullable = true)
 |-- Tipo de pagamento: string (nullable = true)
 |-- Valor: string (nullable = true)



In [89]:
treated_df_pagamento = df_pagamento.withColumn("Valor", udf_to_float(df_pagamento["Valor"]))

# Renomeando as colunas
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Identificador do processo de viagem", "CodViagem")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Código do órgão superior", "CodOrgaoSup")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Nome do órgão superior", "OrgaoSuperior")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Codigo do órgão pagador", "CodOrgaoPagador")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Nome do órgao pagador", "OrgaoPagador")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Código da unidade gestora pagadora", "CodUnidPagadora")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Nome da unidade gestora pagadora", "NomeUnidPagadora")
treated_df_pagamento = treated_df_pagamento.withColumnRenamed("Tipo de pagamento", "TpPagamento")

treated_df_pagamento.printSchema()

root
 |-- CodViagem: string (nullable = true)
 |-- CodOrgaoSup: string (nullable = true)
 |-- OrgaoSuperior: string (nullable = true)
 |-- CodOrgaoPagador: string (nullable = true)
 |-- OrgaoPagador: string (nullable = true)
 |-- CodUnidPagadora: string (nullable = true)
 |-- NomeUnidPagadora: string (nullable = true)
 |-- TpPagamento: string (nullable = true)
 |-- Valor: float (nullable = true)



## Trecho

In [115]:
treated_df_trecho = df_trecho.withColumn("DataDestino", to_date(df_trecho["Destino - Data"])) \
                             .withColumn("DataOrigem", to_date(df_trecho["Origem - Data"])) \
                             .withColumn("NumDiarias", udf_to_float(df_trecho["Número Diárias"])) \
                             .withColumn("Missao?", udf_to_boolean(df_trecho["Missao?"]))

# Renomeando as colunas
treated_df_trecho = treated_df_trecho.withColumnRenamed("Identificador do processo de viagem ", "CodViagem")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Sequência Trecho", "SeqTrecho")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Origem - País", "PaisOrigem")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Origem - UF", "UFOrigem")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Origem - Cidade", "CidadeOrigem")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Destino - Cidade", "CidadeDestino")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Destino - País", "PaisDestino")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Destino - UF", "UFDestino")
treated_df_trecho = treated_df_trecho.withColumnRenamed("Meio de transporte", "MeioTransporte")

# Excluindo as colunas transformadas
treated_df_trecho = treated_df_trecho.drop("Destino - Data") \
                                     .drop("Origem - Data") \
                                     .drop("Número Diárias") \
                                     .drop("Identificador do processo de viagem")  

treated_df_trecho.printSchema()

root
 |-- CodViagem: string (nullable = true)
 |-- SeqTrecho: string (nullable = true)
 |-- PaisOrigem: string (nullable = true)
 |-- UFOrigem: string (nullable = true)
 |-- CidadeOrigem: string (nullable = true)
 |-- PaisDestino: string (nullable = true)
 |-- UFDestino: string (nullable = true)
 |-- CidadeDestino: string (nullable = true)
 |-- MeioTransporte: string (nullable = true)
 |-- Missao?: boolean (nullable = true)
 |-- DataDestino: string (nullable = true)
 |-- DataOrigem: string (nullable = true)
 |-- NumDiarias: float (nullable = true)



## Passagem

In [116]:
treated_df_passagem = df_passagem.withColumn("ValorPassagem", udf_to_float(df_passagem["Valor da passagem"])) \
                                 .withColumn("TaxaServico", udf_to_float(df_passagem["Taxa de serviço"]))

# Renomeando as colunas
treated_df_passagem = treated_df_passagem.withColumnRenamed("Identificador do processo de viagem", "CodViagem")
treated_df_passagem = treated_df_passagem.withColumnRenamed("Meio de transporte", "MeioTransporte")
treated_df_passagem = treated_df_passagem.withColumnRenamed("País - Origem ida", "PaisOrigemIda")
treated_df_passagem = treated_df_passagem.withColumnRenamed("UF - Origem ida", "UFOrigemIda")
treated_df_passagem = treated_df_passagem.withColumnRenamed("Cidade - Origem ida", "CidadeOrigemIda")
treated_df_passagem = treated_df_passagem.withColumnRenamed("País - Destino ida", "PaisDestinoIda")
treated_df_passagem = treated_df_passagem.withColumnRenamed("UF - Destino ida", "UFDestinoIda")
treated_df_passagem = treated_df_passagem.withColumnRenamed("Cidade - Destino ida", "CidadeDestinoIda")
treated_df_passagem = treated_df_passagem.withColumnRenamed("País - Origem volta", "PaisOrigemVolta")
treated_df_passagem = treated_df_passagem.withColumnRenamed("UF - Origem volta", "UFOrigemVolta")
treated_df_passagem = treated_df_passagem.withColumnRenamed("Cidade - Origem volta", "CidadeOrigemVolta")
treated_df_passagem = treated_df_passagem.withColumnRenamed("Pais - Destino volta", "PaisDestinoVolta")
treated_df_passagem = treated_df_passagem.withColumnRenamed("UF - Destino volta", "UFDestinoVolta")
treated_df_passagem = treated_df_passagem.withColumnRenamed("Cidade - Destino volta", "CidadeDestinoVolta")

# Excluindo as colunas transformadas
treated_df_passagem = treated_df_passagem.drop("Valor da passagem") \
                                         .drop("Taxa de serviço")
treated_df_passagem.printSchema()

root
 |-- CodViagem: string (nullable = true)
 |-- MeioTransporte: string (nullable = true)
 |-- PaisOrigemIda: string (nullable = true)
 |-- UFOrigemIda: string (nullable = true)
 |-- CidadeOrigemIda: string (nullable = true)
 |-- PaisDestinoIda: string (nullable = true)
 |-- UFDestinoIda: string (nullable = true)
 |-- CidadeDestinoIda: string (nullable = true)
 |-- PaisOrigemVolta: string (nullable = true)
 |-- UFOrigemVolta: string (nullable = true)
 |-- CidadeOrigemVolta: string (nullable = true)
 |-- PaisDestinoVolta: string (nullable = true)
 |-- UFDestinoVolta: string (nullable = true)
 |-- CidadeDestinoVolta: string (nullable = true)
 |-- ValorPassagem: float (nullable = true)
 |-- TaxaServico: float (nullable = true)



## Spark SQL

In [117]:
# Viagens
treated_df_viagens.createOrReplaceTempView("Viagens")

# Pagamento
treated_df_pagamento.createOrReplaceTempView("Pagamento")

# Trecho
treated_df_trecho.createOrReplaceTempView("Trecho")

# Passagem
treated_df_passagem.createOrReplaceTempView("Passagem")

In [118]:
treated_df_viagens.limit(2).toPandas()

Unnamed: 0,CodViagem,Situacao,CodOrgaoSup,OrgaoSuperior,CodOrgaoSolicitante,NomeOrgaoSolicitante,CPFViajante,Nome,Cargo,Destinos,Motivo,ValorPassagens,ValorDiarias,ValorOutros,PeriodoDataInicio,PeriodoDataFim
0,13501576,Realizada,36000,MINISTERIO DA SAUDE,36201,FUNDACAO OSWALDO CRUZ,***.637.54*-**,ELIANE CALDAS DO NASCIMENTO OLIVEIRA,PESQUISADOR EM SAUDE PUBLICA,Coimbra/Portugal,PARTICIPAÇÃO EM PROJETOS DE PESQUISA DESENVOLV...,0.0,0.0,0.0,10/02/2018,02/03/2018
1,14026421,Realizada,26000,MINISTERIO DA EDUCACAO,26352,FUNDACAO UNIVERSIDADE FEDERAL DO ABC,***.199.59*-**,ALYSSON FABIO FERRARI,PROFESSOR DO MAGISTERIO SUPERIOR,Bloomington - Indiana/Estados Unidos da América,Estágio pós-doutoral no exterior.,0.0,0.0,0.0,09/01/2018,19/11/2018


In [119]:
treated_df_pagamento.limit(2).toPandas()

Unnamed: 0,CodViagem,CodOrgaoSup,OrgaoSuperior,CodOrgaoPagador,OrgaoPagador,CodUnidPagadora,NomeUnidPagadora,TpPagamento,Valor
0,14046485,26000,MINISTERIO DA EDUCACAO,26405,"INST.FED.DE EDUC.,CIENC.E TEC.DO CEARA",158133,"INST.FED.DE EDUC.,CIENC.E TEC.DO CEARA",DIÁRIAS,537.5
1,14046485,26000,MINISTERIO DA EDUCACAO,26405,"INST.FED.DE EDUC.,CIENC.E TEC.DO CEARA",158133,"INST.FED.DE EDUC.,CIENC.E TEC.DO CEARA",PASSAGEM,54.459999


In [120]:
treated_df_trecho.limit(2).toPandas()

Unnamed: 0,CodViagem,SeqTrecho,PaisOrigem,UFOrigem,CidadeOrigem,PaisDestino,UFDestino,CidadeDestino,MeioTransporte,Missao?,DataDestino,DataOrigem,NumDiarias
0,13501576,2,Portugal,,Coimbra,Brasil,Rio de Janeiro,Rio de Janeiro,Aéreo,False,02/03/2018,02/03/2018,0.0
1,13501576,1,Brasil,Rio de Janeiro,Rio de Janeiro,Portugal,,Coimbra,Aéreo,True,02/03/2018,10/02/2018,0.0


In [121]:
treated_df_passagem.limit(2).toPandas()

Unnamed: 0,CodViagem,MeioTransporte,PaisOrigemIda,UFOrigemIda,CidadeOrigemIda,PaisDestinoIda,UFDestinoIda,CidadeDestinoIda,PaisOrigemVolta,UFOrigemVolta,CidadeOrigemVolta,PaisDestinoVolta,UFDestinoVolta,CidadeDestinoVolta,ValorPassagem,TaxaServico
0,14046485,Rodoviário,Brasil,Ceará,Fortaleza,Brasil,Ceará,Sobral,Sem Informação,Sem Informação,Sem Informação,Sem Informação,Sem Informação,Sem Informação,40.950001,13.51
1,14046485,Rodoviário,Brasil,Ceará,Sobral,Brasil,Ceará,Fortaleza,Sem Informação,Sem Informação,Sem Informação,Sem Informação,Sem Informação,Sem Informação,33.25,10.97


## Separando os dados por data

In [123]:
sqlDF = spark.sql("SELECT * FROM VIAGENS V JOIN PAGAMENTO P ON V.CODVIAGEM = P.CODVIAGEM LIMIT 3")

sqlDF.toPandas()

Unnamed: 0,CodViagem,Situacao,CodOrgaoSup,OrgaoSuperior,CodOrgaoSolicitante,NomeOrgaoSolicitante,CPFViajante,Nome,Cargo,Destinos,...,PeriodoDataFim,CodViagem.1,CodOrgaoSup.1,OrgaoSuperior.1,CodOrgaoPagador,OrgaoPagador,CodUnidPagadora,NomeUnidPagadora,TpPagamento,Valor
0,14467333,Realizada,26000,MINISTERIO DA EDUCACAO,26249,UNIVERSIDADE FEDERAL RURAL DO RIO DE JANEIRO,***.574.51*-**,PAULO CEZAR DA CUNHA JUNIOR,,Rio Branco/AC,...,11/02/2018,14467333,26000,MINISTERIO DA EDUCACAO,26249,UNIVERSIDADE FEDERAL RURAL DO RIO DE JANEIRO,153166,UNIVERSIDADE FEDERAL RURAL DO RIO DE JANEIRO,PASSAGEM,631.580017
1,14467333,Realizada,26000,MINISTERIO DA EDUCACAO,26249,UNIVERSIDADE FEDERAL RURAL DO RIO DE JANEIRO,***.574.51*-**,PAULO CEZAR DA CUNHA JUNIOR,,Rio Branco/AC,...,11/02/2018,14467333,26000,MINISTERIO DA EDUCACAO,26249,UNIVERSIDADE FEDERAL RURAL DO RIO DE JANEIRO,153166,UNIVERSIDADE FEDERAL RURAL DO RIO DE JANEIRO,PASSAGEM,864.309998
2,14515854,Realizada,24000,"MINIST.DA CIENCIA,TECNOL.,INOV.E COMUNICACOES",24000,"MINIST.DA CIENCIA,TECNOL.,INOV.E COMUNICACOES ...",***.232.32*-**,MARCEL FRAJBLAT,PROFESSOR DO MAGISTERIO SUPERIOR,Brasília/DF,...,01/03/2018,14515854,24000,"MINIST.DA CIENCIA,TECNOL.,INOV.E COMUNICACOES",24000,"MINIST.DA CIENCIA,TECNOL.,INOV.E COMUNICACOES ...",240101,COORDENACAO-GERAL DE RECURSOS LOGISTICOS,PASSAGEM,220.929993
