<a href="https://colab.research.google.com/github/KurlenMurlen/BigDataAssignment2/blob/main/TDE3_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TDE 3 - Spark Implementation

You and your team were hired to process data using Apache Spark. The team have to answer the 10 (ten) questions (tasks) defined in the activity TDE 2, 3, and 4 - Dataset Definition.

Given the aforementioned context, you are in charge of developing a set of solutions that allow the company to answer the following demands:

- (Easy) The first task evolving simple aggregations without the requirement of custom data types;

- (Easy) The second task Evolving simple aggregations without the requirement of custom data types;

- (Easy) The third task Evolving simple aggregations without the requirement of custom data types;

- (Easy) The fourth task Evolving simple aggregations without the requirement of custom data types;

- (Medium) The first task Containing both Custom Keys, Custom Values, and Combining;

- (Medium) The second task Containing both Custom Keys, Custom Values, and Combining;

- (Medium) The third task Containing both Custom Keys, Custom Values, and Combining;

- (Medium) The fourth task Containing both Custom Keys, Custom Values, and Combining;

- (Hard) The first task Containing the use of multiple jobs, Custom Keys, Custom Values, Combining to solve the tasks;

- (Hard) The second task Containing the use of multiple jobs, Custom Keys, Custom Values, Combining to solve the tasks;

Given your knowledge and skills in Python and Apache Spark, for each item above, provide:

- The source code for solving the problem using Apache Spark programming

- The result of your code run in a separate text file (.txt). If more than 5 rows of results are available, you must report only the 5 first rows of such result.

Important:

- The grading of this activity is conditioned to the audit test.

# Group:


- Murilo Chandelier Pedrazzani
- Ricardo Ryu Magalhães Makino
- Tarso Betolini Rodriguês

# Dataset


- Suicidios entre 2010 a 2019

# Columns

0. " "
1. "estado"
2. "ano"
3. "mes"
4. "DTOBITO"
5. "DTNASC"
6. "SEXO"
7. "RACACOR"
8. "ASSISTMED"
9. "ESCMAE"
10. "ESTCIV"
11. "ESC"
12. "OCUP"
13. "CODMUNRES"
14. "CAUSABAS"
15. "CAUSABAS_O"
16. "LOCOCOR"
17. "CIRURGIA"

In [None]:
!pip install -q findspark
!pip install pyspark



In [None]:
import os
import shutil
import findspark
findspark.init()
import csv
from io import StringIO

from pyspark import SparkContext, SparkConf

In [None]:
# Arquivo csv para o dataset
dataset_file = "suicidios_2010_a_2019.csv"

# Função para remover diretórios existentes
def remove_dir(output_dir):
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

# Função para dividir cada linha usando o leitor de CSV do Python
def split_csv_line(line):
    # Usa o csv.reader para dividir a linha, respeitando as aspas
    reader = csv.reader(StringIO(line), delimiter=",")
    return next(reader)

# Configura e cria o contexto do Spark
def create_spark_context(app_name):
    conf = SparkConf().setAppName(app_name).setMaster("local")
    return SparkContext.getOrCreate(conf)

# Carrega o arquivo e remove o cabeçalho
def load_data(dataset_file):
    rdd = sc.textFile(dataset_file)
    header = rdd.first()
    return rdd.filter(lambda line: line != header and line.strip() != "")

# Função reducer para somar contagens
def reducer_counts(count1, count2):
    return count1 + count2

#**Basics**

In [None]:
# Basics - outputs
output_basic_raceColor = "output_basic_raceColor"
output_basic_gender = "output_basic_gender"
output_basic_states = "output_basic_states"
output_basic_occupationsDistributions = "output_basic_occupationsDistributions"

### Record count by state

In [None]:
# Função mapper para extrair o valor de ESTADO e retornar um par (chave, valor)
def mapper(line):
    columns = line.split(",")
    if len(columns) >= 2:
        estado = columns[1].strip('"')
        return (estado, 1)
    else:
        return ("Invalid", 0)

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = create_spark_context("ContagemEstado")  # Configura o Spark

    filtered_rdd = load_data(dataset_file)  # Carrega os dados

    # Mapeamento, filtragem e redução
    mapper_rdd = filtered_rdd.map(mapper).filter(lambda x: x[0] != "Invalid")
    reducer_rdd = mapper_rdd.reduceByKey(reducer_counts)

    # Ordena os resultados por valor crescente
    sorted_rdd = reducer_rdd.sortBy(lambda x: x[1], ascending=True)

    # Salva o resultado no diretório de saída
    remove_dir(output_dir)
    sorted_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_basic_states)

### Record count by Race/Color

In [None]:
# Função mapper para extrair o valor de RACACOR e retornar um par (chave, valor)
def mapper(line):
    # Divide a linha em colunas
    columns = line.split(",")
    if len(columns) >= 7:
        racacor = columns[7].strip('"')
        return (racacor, 1)
    else:
      return ("Invalid", 0)

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = create_spark_context("CountRaceColor")  # Configura o Spark

    filtered_rdd = load_data(dataset_file)  # Carrega os dados

    mapper_rdd = filtered_rdd.map(mapper).filter(lambda x: x[0] not in [("Invalid", 0), ("Error", 0)])
    reducer_rdd = mapper_rdd.reduceByKey(reducer_counts)

    # Ordena os resultados por valor decrescente
    sorted_rdd = reducer_rdd.sortBy(lambda x: x[1], ascending=False)

    remove_dir(output_dir)
    sorted_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_basic_raceColor)

### Record count by gender

In [None]:
# Função mapper para extrair o valor de GÊNERO e retornar um par (chave, valor)
def mapper(line):
    columns = line.split(",")
    if len(columns) >= 7:
        gender = columns[6].strip('"')
        return (gender, 1)
    else:
        return ("Invalid", 0)

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = SparkContext(appName="ContagemSexo")  # Configura o Spark

    # Carrega os dados
    filtered_rdd = sc.textFile(dataset_file)

    # Mapeamento, filtragem e redução
    mapper_rdd = filtered_rdd.map(mapper).filter(lambda x: x[0] != "Invalid")
    reducer_rdd = mapper_rdd.reduceByKey(reducer_counts)

    # Ordena os resultados por valor decrescente
    sorted_rdd = reducer_rdd.sortBy(lambda x: x[1], ascending=False)

    # Salva o resultado no diretório de saída
    remove_dir(output_dir)
    sorted_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_basic_gender)


### Record count by the distribution of occupations

In [None]:
# Função mapper para extrair o valor de OCUPAÇÃO e retornar um par (chave, valor)
def mapper(line):
    columns = line.split(",")
    if len(columns) >= 12:
        ocup = columns[12].strip('"')
        return (ocup, 12)
    else:
        return ("Invalid", 0)

# Função reducer para somar as ocorrências de cada estado
    sorted_rdd = reducer_rdd.sortBy(lambda x: x[1], ascending=False)

    # Salva o resultado no diretório de saída
    remove_dir(output_dir)
    sorted_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_basic_occupationsDistributions)

#**Intermediaries**

In [None]:
# intermediaries - outputs
output_intermediary_genderYear = "output_intermediary_genderYear"
output_intermediary_genderState = "output_intermediary_genderState"
output_intermediary_occupationsDistributionsState = "output_intermediary_occupationsDistributionsState"
output_intermediary_dateOfBirthState = "output_intermediary_dateOfBirthState"

### Record count gender by year

In [None]:
# Função mapper para processar cada linha
def mapper(line):
    columns = split_csv_line(line)  # Divide a linha em colunas
    if len(columns) >= 6:  # Verifica se há colunas suficientes
        state = columns[2].strip('"')
        date_of_birth = columns[6].strip('"')
        return ((state, date_of_birth), 1)  # Retorna a chave (estado, data) e contagem 1
    else:
        return (("Invalid", "Invalid"), 0)  # Retorna um par inválido se não houver colunas suficientes

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = create_spark_context("Count_GenderYea")  # Configura o Spark

    filtered_rdd = load_data(dataset_file)  # Carrega os dados

    mapper_rdd = filtered_rdd.map(mapper).filter(lambda x: x[0] not in [("Invalid", "Invalid"), ("Error", "Error")])
    reducer_rdd = mapper_rdd.reduceByKey(reducer_counts)

    remove_dir(output_dir)
    reducer_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_intermediary_genderYear)


### Record count gender by state

In [None]:
# Função mapper para processar cada linha
def mapper(line):
    columns = split_csv_line(line)  # Divide a linha em colunas
    if len(columns) >= 6:  # Verifica se há colunas suficientes
        state = columns[1].strip('"')
        date_of_birth = columns[6].strip('"')
        return ((state, date_of_birth), 1)  # Retorna a chave (estado, data) e contagem 1
    else:
        return (("Invalid", "Invalid"), 0)  # Retorna um par inválido se não houver colunas suficientes

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = create_spark_context("Count_DateOfBirthState")  # Configura o Spark

    filtered_rdd = load_data(dataset_file)  # Carrega os dados

    mapper_rdd = filtered_rdd.map(mapper).filter(lambda x: x[0] not in [("Invalid", "Invalid"), ("Error", "Error")])
    reducer_rdd = mapper_rdd.reduceByKey(reducer_counts)

    remove_dir(output_dir)
    reducer_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_intermediary_genderState)

### Record count occupations distribuitions by state

In [None]:
# Função auxiliar para dividir a linha em colunas
def split_csv_line(line):
    return line.split(",")

# Função mapper para processar cada linha
def mapper(line):
    columns = split_csv_line(line)  # Divide a linha em colunas
    if len(columns) >= 13:  # Verifica se há colunas suficientes
        estado = columns[1].strip('"')
        ocupacao = columns[12].strip('"')
        return ((estado, ocupacao), 1)  # Retorna a chave (estado, ocupacao) e contagem 1
    else:
        return (("Invalid", "Invalid"), 0)  # Retorna um par inválido se não houver colunas suficientes

# Função principal para processar o arquivo
def process_file(dataset_file, output_dir):
    sc = SparkContext.getOrCreate()

    # Carrega o arquivo, aplica o mapper e reduz por chave
    data = sc.textFile(dataset_file)
    reducer_rdd = (
        data.map(mapper)
        .filter(lambda x: x[0] != ("Invalid", "Invalid"))  # Filtra entradas inválidas
        .reduceByKey(lambda a, b: a + b)
    )

    # Remove o diretório de saída, se necessário
    def remove_dir(directory):
        import shutil
        try:
            shutil.rmtree(directory)
        except FileNotFoundError:
            pass

    remove_dir(output_dir)
    reducer_rdd.saveAsTextFile(output_dir)
    sc.stop()

process_file(dataset_file, output_intermediary_ocupacaoEstado)

### Record count date of birth by state

In [None]:
# Função mapper para processar cada linha
def mapper(line):
    columns = split_csv_line(line)  # Divide a linha em colunas
    if len(columns) >= 6:  # Verifica se há colunas suficientes
        state = columns[1].strip('"')
        date_of_birth = columns[5].strip('"')
        return ((state, date_of_birth), 1)  # Retorna a chave (estado, data) e contagem 1
    else:
        return (("Invalid", "Invalid"), 0)  # Retorna um par inválido se não houver colunas suficientes

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = create_spark_context("Count_DateOfBirthState")  # Configura o Spark

    filtered_rdd = load_data(dataset_file)  # Carrega os dados

    mapper_rdd = filtered_rdd.map(mapper).filter(lambda x: x[0] not in [("Invalid", "Invalid"), ("Error", "Error")])
    reducer_rdd = mapper_rdd.reduceByKey(reducer_counts)

    remove_dir(output_dir)
    reducer_rdd.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_intermediary_dateOfBirthState)


#**Advanced**

In [None]:
# Advanced - outputs
output_advanced_assistanceCorrelation = "output_advanced_assistanceCorrelation"
output_advanced_occupationCause = "output_advanced_occupationCause"

### Assistance correlation

In [None]:
# Função de mapeamento (corrigido para tratar campos CSV corretamente)
def mapper(line):
    columns = split_csv_line(line)
    # Verifica se o número de colunas é suficiente
    if len(columns) >= 14:
        # Limpa e organiza as colunas necessárias
        codmunres = columns[13].strip('"')
        ano = columns[1].strip('"')
        estciv = columns[10].strip('"')
        assistmed = columns[8].strip('"')

        # Verifica se a assistência médica está presente
        assistance = 1 if assistmed.lower() == "sim" else 0
        return ((codmunres, ano, estciv), (assistance, 1))
    else:
        return (("Invalid", "Invalid", "Invalid"), (0, 0))  # Ignora linhas mal formatadas

# Função de redução
def reducer(value1, value2):
    total_assistencias = value1[0] + value2[0]
    total_ocorrencias = value1[1] + value2[1]
    return (total_assistencias, total_ocorrencias)

# Função para calcular o percentual de assistência
def calculate_percentage(record):
    key, (total_assistencias, total_ocorrencias) = record
    if total_ocorrencias > 0:
        percentual = (total_assistencias / total_ocorrencias) * 100
    else:
        percentual = 0.0
    return (key, f"{percentual:.2f}%")

# Processa o arquivo de entrada e salva o resultado no diretório de saída
def process_file(dataset_file, output_dir):
    global sc  # Para usar o contexto do Spark globalmente
    sc = create_spark_context("AssistenciaMedicaPercentage")  # Configura o Spark

    filtered_rdd = load_data(dataset_file)  # Carrega os dados

    job_mapper = filtered_rdd.map(mapper).filter(lambda x: x[0] not in [("Invalid", "Invalid", "Invalid"), ("Error", "Error", "Error")])
    job_reducer = job_mapper.reduceByKey(reducer)

    # Calcular o percentual de assistência
    percentages = job_reducer.map(calculate_percentage)

    # Remover o diretório de saída, se já existir, e salvar o resultado
    remove_dir(output_dir)
    percentages.saveAsTextFile(output_dir)

    sc.stop()

# Chamada da função com os parâmetros necessários
process_file(dataset_file, output_advanced_assistanceCorrelation)


### Occupation and cause

In [None]:
# Função auxiliar para dividir a linha em colunas
def split_csv_line(line):
    return line.split(",")

# Função de mapeamento para extrair estado, ocupacao e causabas
def mapper(line):
    columns = split_csv_line(line)
    # Verifica se o número de colunas é suficiente
    if len(columns) >= 15:
        # Limpa e organiza as colunas necessárias
        estado = columns[1].strip('"')
        ocupacao = columns[12].strip('"')
        causabas = columns[14].strip('"')
        return ((estado, ocupacao, causabas), 1)  # Retorna a chave (estado, ocupacao, causabas) e contagem 1
    else:
        return (("Invalid", "Invalid", "Invalid"), 0)  # Par inválido se não houver colunas suficientes

# Função principal para processar o arquivo
def process_file(dataset_file, output_dir):
    sc = SparkContext.getOrCreate()

    # Carrega o arquivo, aplica o mapper e reduz por chave
    data = sc.textFile(dataset_file)
    reducer_rdd = (
        data.map(mapper)
        .filter(lambda x: x[0] != ("Invalid", "Invalid", "Invalid"))  # Filtra entradas inválidas
        .reduceByKey(lambda a, b: a + b)
    )

    # Remove o diretório de saída, se necessário
    def remove_dir(directory):
        import shutil
        try:
            shutil.rmtree(directory)
        except FileNotFoundError:
            pass

    remove_dir(output_dir)
    reducer_rdd.saveAsTextFile(output_dir)
    sc.stop()

process_file(dataset_file, output_advanced_ocupacaoDistribuicao)