<a href="https://colab.research.google.com/github/bruno-teider/BigData/blob/main/BigDataTDE3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [1]:
import pyspark
from pyspark.sql import SparkSession
from google.colab import drive

# Mount Google Drive so the dataset stored there can be read from Colab
drive.mount('/content/drive', force_remount=True)
path = "/content/drive/My Drive/Colab Notebooks/Big Data/"

# Spark session in local mode; its SparkContext drives the RDD API below
spark = (
    SparkSession.builder
    .appName("TDE2")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

# Load the CSV as an RDD of raw text lines and drop the header, i.e. the
# line whose first ";"-separated field is the column name NU_ANO_CENSO
rdd = sc.textFile(path + "censo_escolar_2021.csv").filter(
    lambda line: line.partition(";")[0] != "NU_ANO_CENSO"
)

# Zero-based positions of the columns used by the activities, as found in
# each line after splitting on ";"
NO_REGIAO = 1         # Region name
SG_UF = 4             # State (UF)
NO_MUNICIPIO = 6      # City name
NO_ENTIDADE = 14      # Entity name
TP_DEPENDENCIA = 15   # Administrative dependency
TP_LOCALIZACAO = 17   # Location type
QT_MAT_MED = 317      # Quantity of high school enrollments
QT_DOC_BAS = 338      # Quantity of basic education teachers
QT_DOC_FUND = 342     # Quantity of fundamental education teachers
QT_DOC_MED = 345      # Quantity of high school teachers

Mounted at /content/drive


In [2]:
# Activity 1: how many schools are located in Curitiba

# Keep only the rows whose city column is exactly "Curitiba"
curitibaRDD = rdd.filter(
    lambda row: row.split(";")[NO_MUNICIPIO] == "Curitiba"
)

# Emit one (city, 1) pair per remaining school
curitibaMap = curitibaRDD.map(
    lambda row: (row.split(";")[NO_MUNICIPIO], 1)
)

# Add up the ones per city to obtain the school count
curitibaReduce = curitibaMap.reduceByKey(
    lambda running_total, increment: running_total + increment
)

# Persist the result as a single partition
curitibaReduce.coalesce(1).saveAsTextFile("Atividade1.txt")

# Last expression of the cell: shows the result in the notebook
curitibaReduce.collect()

[('Curitiba', 1181)]

In [3]:
# Activity 2: school count per region (NO_REGIAO), ordered alphabetically
# by region name

# One (region, 1) pair per school
atv2Map = rdd.map(lambda row: (row.split(";")[NO_REGIAO], 1))

# Total the ones for every region
atv2Reduce = atv2Map.reduceByKey(
    lambda running_total, increment: running_total + increment
)

# Order by key; the key is the region name, so this is alphabetical
atv2Sorted = atv2Reduce.sortByKey()

# Persist the result as a single partition
atv2Sorted.coalesce(1).saveAsTextFile("Atividade2.txt")

# Last expression of the cell: shows the result in the notebook
atv2Sorted.collect()

[('Centro-Oeste', 11659),
 ('Nordeste', 79039),
 ('Norte', 26095),
 ('Sudeste', 75329),
 ('Sul', 29018)]

In [4]:
# Activity 3: NO_ENTIDADE, NO_MUNICIPIO and number of teachers of the school
# with the largest teacher total (QT_DOC_BAS + QT_DOC_FUND + QT_DOC_MED)

# Parse a count column, treating empty or malformed values as zero
def isValid(value):
    """Return ``value`` converted to int, or 0 when it is not a plain number.

    Despite its name, this function does not return a boolean: it yields
    the parsed integer so callers can sum the result directly.

    Args:
        value: Raw text of one CSV field.

    Returns:
        The non-negative integer held by ``value`` (surrounding whitespace
        is ignored), or 0 for empty, signed or non-numeric text.
    """
    text = value.strip()
    # isdecimal() only accepts characters that int() can parse; isdigit()
    # also accepts characters such as "²", for which int() raises ValueError.
    return int(text) if text.isdecimal() else 0

# Turn one CSV line into its key and teacher counts, splitting it only once
def _act3_key_and_counts(line):
    """Return ((school, city), (basic, fundamental, high school)) for a line.

    Each teacher count goes through isValid, so empty fields count as 0.
    """
    fields = line.split(";")
    return (
        (fields[NO_ENTIDADE], fields[NO_MUNICIPIO]),
        (
            isValid(fields[QT_DOC_BAS]),
            isValid(fields[QT_DOC_FUND]),
            isValid(fields[QT_DOC_MED]),
        ),
    )


# Key = (school, city), Value = (quantity of each type of teacher)
act3map = rdd.map(_act3_key_and_counts)

# Collapse the three counts into a single total per school
act3map_with_sum = act3map.map(lambda pair: (pair[0], sum(pair[1])))

# Keep the pair with the largest total; on a tie the right-hand pair is kept
max_teachers = act3map_with_sum.reduce(lambda x, y: x if x[1] > y[1] else y)

# saveAsTextFile is an RDD method, so wrap the single result in an RDD.
# coalesce(1) writes it as one part file, like the other activities do,
# instead of spreading one record over several mostly empty partitions.
max_teachersRDD = spark.sparkContext.parallelize([max_teachers])
max_teachersRDD.coalesce(1).saveAsTextFile("Atividade3.txt")

print(max_teachers)

(('EDUCACAO INFANTIL MUNICIPAL BALAO MAGICO', 'Bom Jardim de Minas'), 623)


In [5]:
# Activity 4: Average number of high school enrollments (QT_MAT_MED) in
# relation to the number of schools, per Geographic Region (NO_REGIAO).
# NOTE(review): schools whose QT_MAT_MED is empty or zero are filtered out
# below, so the average is taken over schools that have high school
# enrollments rather than over every school in the region - confirm that
# this is the intended denominator.


# Predicate: the enrollment field is non-empty and holds a positive integer
def _has_high_school_enrollments(row):
    enrollments = row.split(";")[QT_MAT_MED]
    if enrollments == '':
        return False
    return int(enrollments) > 0


# Only schools with high school enrollments
highSchools = rdd.filter(_has_high_school_enrollments)


# Pair builder: Key = Region, Value = (Enrollments, 1)
def _region_with_enrollments(row):
    fields = row.split(";")
    return (fields[NO_REGIAO], (int(fields[QT_MAT_MED]), 1))


actv4map = highSchools.map(_region_with_enrollments)

# Per region, accumulate (total enrollments, number of schools)
actv4reduce = actv4map.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# Average = total enrollments / number of schools
avg = actv4reduce.map(lambda entry: (entry[0], entry[1][0] / entry[1][1]))

# Persist the result as a single partition
avg.coalesce(1).saveAsTextFile("Atividade4.txt")

# Last expression of the cell: shows the result in the notebook
avg.collect()

[('Norte', 32.57775020678246),
 ('Sul', 155.18777275251924),
 ('Centro-Oeste', 84.54959316098466),
 ('Nordeste', 33.084404898584005),
 ('Sudeste', 135.2238581884565)]

In [6]:
# Activity 5: number of schools per (TP_LOCALIZACAO, TP_DEPENDENCIA) pair


# Predicate: both the location type and the dependency fields are non-empty
def _has_location_and_dependency(row):
    fields = row.split(";")
    return fields[TP_LOCALIZACAO] != "" and fields[TP_DEPENDENCIA] != ""


# Discard rows missing either of the two columns
atividade5RDD = rdd.filter(_has_location_and_dependency)


# Pair builder: Key = (TP_LOCALIZACAO, TP_DEPENDENCIA), Value = 1
def _location_dependency_pair(row):
    fields = row.split(";")
    return ((fields[TP_LOCALIZACAO], fields[TP_DEPENDENCIA]), 1)


atividade5Map = atividade5RDD.map(_location_dependency_pair)

# Add up the ones per key to count the schools in each combination
atividade5Reduce = atividade5Map.reduceByKey(
    lambda running_total, increment: running_total + increment
)

# Persist the result as a single partition
atividade5Reduce.coalesce(1).saveAsTextFile("Atividade5.txt")

# Last expression of the cell: shows the result in the notebook
atividade5Reduce.collect()

[(('1', '1'), 619),
 (('1', '2'), 26796),
 (('1', '3'), 64278),
 (('2', '4'), 934),
 (('2', '2'), 6768),
 (('2', '3'), 70091),
 (('1', '4'), 51559),
 (('2', '1'), 95)]

In [7]:
# Activity 6: number of schools per (SG_UF, TP_DEPENDENCIA) pair, largest
# counts first


# Predicate: both the state and the dependency fields are non-empty.
# The state is tested first and short-circuits the second lookup.
def _has_state_and_dependency(row):
    fields = row.split(";")
    return fields[SG_UF] != "" and fields[TP_DEPENDENCIA] != ""


# Discard rows missing either of the two columns
atividade6RDD = rdd.filter(_has_state_and_dependency)


# Pair builder: Key = (SG_UF, TP_DEPENDENCIA), Value = 1
def _state_dependency_pair(row):
    fields = row.split(";")
    return ((fields[SG_UF], fields[TP_DEPENDENCIA]), 1)


atividade6Map = atividade6RDD.map(_state_dependency_pair)

# Add up the ones per key to count the schools in each combination
atividade6Reduce = atividade6Map.reduceByKey(
    lambda running_total, increment: running_total + increment
)

# Order by the school count, descending
atividade6Sorted = atividade6Reduce.sortBy(
    lambda entry: entry[1], ascending=False
)

# Persist the result as a single partition
atividade6Sorted.coalesce(1).saveAsTextFile("Atividade6.txt")

# Last expression of the cell: shows the result in the notebook
atividade6Sorted.collect()

[(('BA', '3'), 15659),
 (('SP', '3'), 13822),
 (('SP', '4'), 13550),
 (('MG', '3'), 13363),
 (('MA', '3'), 11432),
 (('PA', '3'), 10371),
 (('CE', '3'), 7255),
 (('SP', '2'), 6572),
 (('RJ', '3'), 6368),
 (('MG', '4'), 6259),
 (('PE', '3'), 6184),
 (('RS', '3'), 5867),
 (('PR', '3'), 5487),
 (('RJ', '4'), 5301),
 (('PI', '3'), 5281),
 (('AM', '3'), 5081),
 (('MG', '2'), 4319),
 (('SC', '3'), 4290),
 (('PB', '3'), 3895),
 (('RN', '3'), 3435),
 (('BA', '4'), 3297),
 (('PE', '4'), 3216),
 (('RS', '4'), 3165),
 (('ES', '3'), 2752),
 (('GO', '3'), 2751),
 (('CE', '4'), 2604),
 (('RS', '2'), 2549),
 (('AL', '3'), 2474),
 (('PR', '4'), 2399),
 (('PR', '2'), 2164),
 (('MT', '3'), 1823),
 (('SC', '4'), 1608),
 (('SE', '3'), 1572),
 (('RJ', '2'), 1547),
 (('TO', '3'), 1534),
 (('MA', '4'), 1394),
 (('SC', '2'), 1371),
 (('GO', '4'), 1334),
 (('MA', '2'), 1305),
 (('BA', '2'), 1168),
 (('PE', '2'), 1083),
 (('PA', '4'), 1046),
 (('GO', '2'), 1040),
 (('PB', '4'), 1022),
 (('AC', '3'), 1009),
 (('