In [1]:
#PROJETO FINAL – Análise de dados com PySpark

#Com o arquivo csv que você pesquisou, execute as seguintes atividades:
#1 – Altere o nome de pelo menos cinco colunas

#2 – Converta o tipo de dados de alguma coluna de sua escolha: Pode ser uma conversão para float, int, DoubleType() ou date.

#3 – Realize pelo menos 5 consultas através da função pyspark.sql ou através de métodos pyspark. Pelo menos duas das consultas deve conter alguma função de agregação: contagem, média, mediana, soma, etc.

#4 – Com alguma das consultas da atividade 3 crie um novo DataFrame e salve-o em formato .csv e em formato .orc.

#Vocês irão desenvolver a atividade em um novo notebook do Colab e depois irão me enviar.

In [2]:
# 01. Importando funções e bibliotecas necessárias.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

In [3]:
# 02. Criando objeto spark.
spark = SparkSession.builder.appName("tcc_senai1").getOrCreate()

In [4]:
spark

In [5]:
# 03. Importando data frame de suicidios (2010 a 2019) fonte (kaggle).

url = "/content/drive/MyDrive/Colab Notebooks/Trabalho_Nathan/suicidios_2010_a_2019.csv"

suic = spark.read.csv(
    url,
    sep=",",
    header=True,
    inferSchema=True
)

In [6]:
#Analisando a estrutura do dataframe (suic).
suic.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- estado: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- DTOBITO: date (nullable = true)
 |-- DTNASC: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- RACACOR: string (nullable = true)
 |-- ASSISTMED: string (nullable = true)
 |-- ESCMAE: string (nullable = true)
 |-- ESTCIV: string (nullable = true)
 |-- ESC: string (nullable = true)
 |-- OCUP: string (nullable = true)
 |-- CODMUNRES: string (nullable = true)
 |-- CAUSABAS: string (nullable = true)
 |-- CAUSABAS_O: string (nullable = true)
 |-- LOCOCOR: string (nullable = true)
 |-- CIRURGIA: string (nullable = true)



In [7]:
# 04. Alterando o tipo de dado da coluna DTNASC para "int" e depois para "date"
suic = suic.withColumn("DTNASC", suic['DTNASC'].cast("int"))

In [8]:
# Analisando a alteração realizada
suic.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- estado: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- DTOBITO: date (nullable = true)
 |-- DTNASC: integer (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- RACACOR: string (nullable = true)
 |-- ASSISTMED: string (nullable = true)
 |-- ESCMAE: string (nullable = true)
 |-- ESTCIV: string (nullable = true)
 |-- ESC: string (nullable = true)
 |-- OCUP: string (nullable = true)
 |-- CODMUNRES: string (nullable = true)
 |-- CAUSABAS: string (nullable = true)
 |-- CAUSABAS_O: string (nullable = true)
 |-- LOCOCOR: string (nullable = true)
 |-- CIRURGIA: string (nullable = true)



In [9]:
# 05.Alterando o tipo de dado da coluna DTNASC, de "int" para "date"
suic = suic.withColumn("DTNASC",
                f.to_date("DTNASC", "yyyyMMdd")
)

In [10]:
# Analisando a alteração realizada
suic.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- estado: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- DTOBITO: date (nullable = true)
 |-- DTNASC: date (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- RACACOR: string (nullable = true)
 |-- ASSISTMED: string (nullable = true)
 |-- ESCMAE: string (nullable = true)
 |-- ESTCIV: string (nullable = true)
 |-- ESC: string (nullable = true)
 |-- OCUP: string (nullable = true)
 |-- CODMUNRES: string (nullable = true)
 |-- CAUSABAS: string (nullable = true)
 |-- CAUSABAS_O: string (nullable = true)
 |-- LOCOCOR: string (nullable = true)
 |-- CIRURGIA: string (nullable = true)



In [11]:
# Analisando o dataframe antes de alterar retirar colunas desnecessárias.
suic.show(20, truncate=False)

+---+------+----+---+----------+------+---------+-------+---------+------+----------+-----------+----------------------+--------------+--------+----------+-----------+--------+
|_c0|estado|ano |mes|DTOBITO   |DTNASC|SEXO     |RACACOR|ASSISTMED|ESCMAE|ESTCIV    |ESC        |OCUP                  |CODMUNRES     |CAUSABAS|CAUSABAS_O|LOCOCOR    |CIRURGIA|
+---+------+----+---+----------+------+---------+-------+---------+------+----------+-----------+----------------------+--------------+--------+----------+-----------+--------+
|1  |AC    |2010|1  |2010-01-31|NULL  |Masculino|Parda  |NA       |NA    |Solteiro/a|1 a 3 anos |ESTUDANTE             |Tarauacá      |X780    |X780      |Via pública|Não     |
|2  |AC    |2010|2  |2010-02-07|NULL  |Masculino|Parda  |NA       |NA    |Solteiro/a|1 a 3 anos |ESTUDANTE             |Tarauacá      |X780    |X780      |Hospital   |Não     |
|3  |AC    |2010|2  |2010-02-19|NULL  |Masculino|Parda  |NA       |NA    |Solteiro/a|1 a 3 anos |ESTUDANTE         

In [12]:
# Retirando colunas e analisando o resultado
suic.drop("ASSISTMED", "ESCMAE", "CAUSABAS", "CAUSABAS_O").show(10, truncate=False)

+---+------+----+---+----------+------+---------+-------+----------+----------+------------+--------------+-----------+--------+
|_c0|estado|ano |mes|DTOBITO   |DTNASC|SEXO     |RACACOR|ESTCIV    |ESC       |OCUP        |CODMUNRES     |LOCOCOR    |CIRURGIA|
+---+------+----+---+----------+------+---------+-------+----------+----------+------------+--------------+-----------+--------+
|1  |AC    |2010|1  |2010-01-31|NULL  |Masculino|Parda  |Solteiro/a|1 a 3 anos|ESTUDANTE   |Tarauacá      |Via pública|Não     |
|2  |AC    |2010|2  |2010-02-07|NULL  |Masculino|Parda  |Solteiro/a|1 a 3 anos|ESTUDANTE   |Tarauacá      |Hospital   |Não     |
|3  |AC    |2010|2  |2010-02-19|NULL  |Masculino|Parda  |Solteiro/a|1 a 3 anos|ESTUDANTE   |Tarauacá      |Hospital   |Não     |
|4  |AC    |2010|4  |2010-04-20|NULL  |Feminino |Parda  |Casado/a  |1 a 3 anos|DONA DE CASA|Tarauacá      |Domicílio  |Não     |
|5  |AC    |2010|7  |2010-07-24|NULL  |Masculino|Parda  |Casado/a  |NA        |DONA DE CASA|Tarau

In [13]:
# Realizando a alteração (exclusão das colunas = ("ASSISTMED", "ESCMAE", "CAUSABAS", "CAUSABAS_O"))
suic = suic.drop("ASSISTMED", "ESCMAE", "CAUSABAS", "CAUSABAS_O", "CIRURGIA")

In [14]:
# Verificando alteração realizada
suic.show(10, truncate=False)

+---+------+----+---+----------+------+---------+-------+----------+----------+------------+--------------+-----------+
|_c0|estado|ano |mes|DTOBITO   |DTNASC|SEXO     |RACACOR|ESTCIV    |ESC       |OCUP        |CODMUNRES     |LOCOCOR    |
+---+------+----+---+----------+------+---------+-------+----------+----------+------------+--------------+-----------+
|1  |AC    |2010|1  |2010-01-31|NULL  |Masculino|Parda  |Solteiro/a|1 a 3 anos|ESTUDANTE   |Tarauacá      |Via pública|
|2  |AC    |2010|2  |2010-02-07|NULL  |Masculino|Parda  |Solteiro/a|1 a 3 anos|ESTUDANTE   |Tarauacá      |Hospital   |
|3  |AC    |2010|2  |2010-02-19|NULL  |Masculino|Parda  |Solteiro/a|1 a 3 anos|ESTUDANTE   |Tarauacá      |Hospital   |
|4  |AC    |2010|4  |2010-04-20|NULL  |Feminino |Parda  |Casado/a  |1 a 3 anos|DONA DE CASA|Tarauacá      |Domicílio  |
|5  |AC    |2010|7  |2010-07-24|NULL  |Masculino|Parda  |Casado/a  |NA        |DONA DE CASA|Tarauacá      |Domicílio  |
|6  |AC    |2010|4  |2010-04-30|NULL  |F

In [15]:
# Alterando o nome das colunas, analisando o resultado
suic.withColumnsRenamed({
    "_c0" : "codigo",
    "DTOBITO" : "data_do_obito",
    "RACACOR" : "raca_cor",
    "ESTCIV" : "estado_civil",
    "ESC" : "escolaridade",
    "OCUP" : "ocupacao",
    "CODMUNRES" : "municipio",
    "LOCOCOR" : "local_do_obito",
    "SEXO" : "sexo"
}).show(10, truncate=False)

+------+------+----+---+-------------+------+---------+--------+------------+------------+------------+--------------+--------------+
|codigo|estado|ano |mes|data_do_obito|DTNASC|sexo     |raca_cor|estado_civil|escolaridade|ocupacao    |municipio     |local_do_obito|
+------+------+----+---+-------------+------+---------+--------+------------+------------+------------+--------------+--------------+
|1     |AC    |2010|1  |2010-01-31   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Via pública   |
|2     |AC    |2010|2  |2010-02-07   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Hospital      |
|3     |AC    |2010|2  |2010-02-19   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Hospital      |
|4     |AC    |2010|4  |2010-04-20   |NULL  |Feminino |Parda   |Casado/a    |1 a 3 anos  |DONA DE CASA|Tarauacá      |Domicílio     |
|5     |AC    |2010|7  |2010-07-24   |NULL  |Masculino|Parda  

In [16]:
#Realizando a alteração
suic = suic.withColumnsRenamed({
    "_c0" : "codigo",
    "DTOBITO" : "data_do_obito",
    "RACACOR" : "raca_cor",
    "ESTCIV" : "estado_civil",
    "ESC" : "escolaridade",
    "OCUP" : "ocupacao",
    "CODMUNRES" : "municipio",
    "LOCOCOR" : "local_do_obito",
    "SEXO" : "sexo"
})

In [17]:
# Verificando a alteração
suic.show(10, truncate=False)

+------+------+----+---+-------------+------+---------+--------+------------+------------+------------+--------------+--------------+
|codigo|estado|ano |mes|data_do_obito|DTNASC|sexo     |raca_cor|estado_civil|escolaridade|ocupacao    |municipio     |local_do_obito|
+------+------+----+---+-------------+------+---------+--------+------------+------------+------------+--------------+--------------+
|1     |AC    |2010|1  |2010-01-31   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Via pública   |
|2     |AC    |2010|2  |2010-02-07   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Hospital      |
|3     |AC    |2010|2  |2010-02-19   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Hospital      |
|4     |AC    |2010|4  |2010-04-20   |NULL  |Feminino |Parda   |Casado/a    |1 a 3 anos  |DONA DE CASA|Tarauacá      |Domicílio     |
|5     |AC    |2010|7  |2010-07-24   |NULL  |Masculino|Parda  

In [18]:
#Incluindo um segundo dataframe para união com Join

url = "/content/drive/MyDrive/Colab Notebooks/Trabalho_Nathan/suicidios_por_estados.csv"

mort_est = spark.read.csv(
    url,
    sep=",",
    header=True,
    inferSchema=True
)

In [19]:
mort_est.show(10, truncate=False)

+---+------------+---------+--------+----+--------+
|_c0|abrev_estado|estado   |regiao  |n   |pop_ibge|
+---+------------+---------+--------+----+--------+
|1  |RO          |Rondônia |Norte   |995 |1562409 |
|2  |AC          |Acre     |Norte   |508 |733559  |
|3  |AM          |Amazônas |Norte   |2145|3483985 |
|4  |RR          |Roraima  |Norte   |402 |450479  |
|5  |PA          |Pará     |Norte   |2640|7581051 |
|6  |AP          |Amapá    |Norte   |429 |669526  |
|7  |TO          |Tocantins|Norte   |1030|1383445 |
|8  |MA          |Maranhão |Nordeste|2676|6574789 |
|9  |PI          |Piauí    |Nordeste|2710|3118360 |
|10 |CE          |Ceará    |Nordeste|5805|8452381 |
+---+------------+---------+--------+----+--------+
only showing top 10 rows



In [20]:
mort_est = mort_est.drop("estado")

In [21]:
# Unindo os dois dataframes
df_result = suic.join(mort_est, suic.estado == mort_est.abrev_estado, how="inner")

In [22]:
#Verificando o novo dataframe (df_result)
df_result.show(10, truncate=False)

+------+------+----+---+-------------+------+---------+--------+------------+------------+------------+--------------+--------------+---+------------+------+---+--------+
|codigo|estado|ano |mes|data_do_obito|DTNASC|sexo     |raca_cor|estado_civil|escolaridade|ocupacao    |municipio     |local_do_obito|_c0|abrev_estado|regiao|n  |pop_ibge|
+------+------+----+---+-------------+------+---------+--------+------------+------------+------------+--------------+--------------+---+------------+------+---+--------+
|1     |AC    |2010|1  |2010-01-31   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Via pública   |2  |AC          |Norte |508|733559  |
|2     |AC    |2010|2  |2010-02-07   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Hospital      |2  |AC          |Norte |508|733559  |
|3     |AC    |2010|2  |2010-02-19   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE   |Tarauacá      |Hospital      |2  |AC      

In [23]:
df_result = df_result.drop("abrev_estado")

In [24]:
from os import truncate
df_result.show(20, truncate=False)

+------+------+----+---+-------------+------+---------+--------+------------+------------+----------------------+--------------+--------------+---+------+---+--------+
|codigo|estado|ano |mes|data_do_obito|DTNASC|sexo     |raca_cor|estado_civil|escolaridade|ocupacao              |municipio     |local_do_obito|_c0|regiao|n  |pop_ibge|
+------+------+----+---+-------------+------+---------+--------+------------+------------+----------------------+--------------+--------------+---+------+---+--------+
|1     |AC    |2010|1  |2010-01-31   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE             |Tarauacá      |Via pública   |2  |Norte |508|733559  |
|2     |AC    |2010|2  |2010-02-07   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE             |Tarauacá      |Hospital      |2  |Norte |508|733559  |
|3     |AC    |2010|2  |2010-02-19   |NULL  |Masculino|Parda   |Solteiro/a  |1 a 3 anos  |ESTUDANTE             |Tarauacá      |Hospital      |2  |Norte |508|73

In [35]:
#CRIANDO DADOS ESTATISTICOS
df_result\
        .groupBy("estado", "regiao")\
        .agg(
            f.format_number(f.sum("n"), 2).alias("total_obitos"),
            f.median("n").cast("int").alias("media_mensal"),
            f.count_distinct("data_do_obito"). alias("qtd_de_datas"),
            f.stddev("n").alias("desvio_padrao"),
            f.variance("n").alias("Variancia")




        )\
       .orderBy("estado", ascending=False)\
       .toPandas()

Unnamed: 0,estado,regiao,total_obitos,media_mensal,qtd_de_datas,desvio_padrao,Variancia
0,TO,Norte,1060900.0,1030,892,0.0,0.0
1,SP,Sudeste,482680900.0,21970,3645,0.0,0.0
2,SE,Nordeste,1471369.0,1213,1033,0.0,0.0
3,SC,Sul,40309801.0,6349,2961,0.0,0.0
4,RS,Sul,140659600.0,11860,3506,0.0,0.0
5,RR,Norte,161604.0,402,383,0.0,0.0
6,RO,Norte,990025.0,995,866,0.0,0.0
7,RN,Nordeste,3003289.0,1733,1368,0.0,0.0
8,RJ,Sudeste,28869129.0,5373,2774,0.0,0.0
9,PR,Sul,52591504.0,7252,3063,0.0,0.0


In [36]:
# Criando views
suic.createOrReplaceTempView("s_sui")
mort_est.createOrReplaceTempView("s_m_est")

In [47]:
# Criando novo dataframe:

tab_suicidios = spark.sql(
"""
SELECT s.municipio, s.estado, SUM(sme.n) AS total_obitos
FROM s_sui s
INNER JOIN s_m_est sme ON sme.abrev_estado = s.estado
GROUP BY s.municipio, s.estado
"""
)

In [49]:
tab_suicidios.show(50, truncate=False)

+--------------------------+------+------------+
|municipio                 |estado|total_obitos|
+--------------------------+------+------------+
|Russas                    |CE    |336690      |
|Valparaíso de Goiás       |GO    |280539      |
|Itaquiraí                 |MS    |32060       |
|Mar de Espanha            |MG    |81246       |
|Alagoa Nova               |PB    |26117       |
|Cafezal do Sul            |PR    |14504       |
|Santa Maria da Boa Vista  |PE    |78826       |
|Jaguarão                  |RS    |438820      |
|Buritirana                |MA    |16056       |
|Vista Serrana             |PB    |8036        |
|Rondinha                  |RS    |142320      |
|Guajará                   |AM    |10725       |
|Cezarina                  |GO    |8906        |
|Pequi                     |MG    |81246       |
|Senador José Bento        |MG    |27082       |
|Amaral Ferrador           |RS    |106740      |
|Cajati                    |SP    |241670      |
|Dom Basílio        

In [50]:
# Salvando data frame em disco no formato orc.
url = "/content/drive/MyDrive/Colab Notebooks/Saidas"
tab_suicidios.coalesce(1).write.orc(
    url,
    mode = "overwrite"
)

In [51]:
#Salvando data frame em disco no formato csv.
url = "/content/drive/MyDrive/Colab Notebooks/Saidas"
tab_suicidios.coalesce(1).write.csv(
    url,
    mode = "overwrite"
)