In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

# Start spark session

In [2]:
# Create a URL through you can access the Spark UI
#get_ipython().system_raw('./ngrok http 4050 &')

In [3]:
# Access the URL
#!curl -s http://localhost:4040/api/tunnels 

In [2]:
spark = SparkSession.builder \
    .appName("Collecting data - ENEM") \
    .master("local[*]") \
    .config("spark.jars", "./path/postgresql-42.7.2.jar") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

# Testing connection and visualizing schema

In [3]:
year = '2011'

url = "jdbc:postgresql://localhost:5433/ENEM_Data"

properties = {
    "user": "admin",
    "password": "*********",
    "driver": "org.postgresql.Driver"
}

In [71]:
test_connection = spark.read.jdbc(url, "\"Data_years\".\"2010\"", properties=properties)

In [5]:
df_raw = spark.read.jdbc(url, "\"Data_years\".\"" + year + "\"", properties=properties)
df_analysis = spark.read.jdbc(url, "\"Data_years\".\"Analise_" + year + "\"", properties=properties)

In [6]:
df_raw.printSchema()

root
 |-- NU_INSCRICAO: string (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- TP_FAIXA_ETARIA: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: integer (nullable = true)
 |-- CO_MUNICIPIO_ESC: string (nullable = true)
 |-- NO_MUNICIPIO_ESC: string (nullable = true)
 |-- CO_UF_ESC: string (nullable = true)
 |-- SG_UF_ESC: string (nullable = true)
 |-- TP_DEPENDENCIA_ADM_ESC: integer (nullable = true)
 |-- TP_LOCALIZACAO_ESC: integer (nullable = true)
 |-- TP_SIT_FUNC_ESC: integer (nullable = true)
 |-- IN_CERTIFICADO: boolean (nullable = true)
 |-- NO_ENTIDADE_CERTIFICACAO: string (nullable = true)
 |-- CO_UF_ENTIDADE_CERTIFICACAO: string (nullable = true)
 |-- SG_UF_ENTIDADE_CERTIFICACAO: string (nullable = true)


In [7]:
df_analysis.printSchema()

root
 |-- Ano: integer (nullable = true)
 |-- Faixa_Etaria: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- NU_NOTA_CN: float (nullable = true)
 |-- NU_NOTA_CH: float (nullable = true)
 |-- NU_NOTA_LC: float (nullable = true)
 |-- NU_NOTA_MT: float (nullable = true)
 |-- NU_NOTA_REDACAO: float (nullable = true)
 |-- Nota_final: float (nullable = true)
 |-- Escolaridade_pai: string (nullable = true)
 |-- Escolaridade_mae: string (nullable = true)
 |-- Renda_familiar: string (nullable = true)
 |-- Trabalha_ou_ja: string (nullable = true)
 |-- Ajudar_despesas_casa: string (nullable = true)
 |-- Carga_trabalho_semanal: string (nullable = true)
 |-- Idade_comecou_trabalhar: integer (nullable = true)



#### But here spark df is in only one partition (standard spark reading for JDBC connection). Let's optimize and speed up the reading from JDBC connection to database, using partitions and parallelizing processes.

In [8]:
df_raw = None
df_analysis = None

#### In raw dataframe, going to use column 'CO_PROVA_CN' to make 8 partitions, for there are 8 possible integer values (between 89 and 108), as in the dictionary of the dataset, available to read in the project files. This is the best way to split data and create partitions, because this variable is chosen randomly, and in equal parts, by the test administrators.

In [11]:
# Else, you can find min and maximum values for partition
#df_min_max = spark.read.jdbc(
#    url=url,
#    table="SELECT Min(\"NU_NOTA_REDACAO\"),Max(\"NU_NOTA_REDACAO\") FROM \"Data_years\".\"" + year +"\"",
#    properties=properties,
#).collect()
#min_value, max_value = df_min_max[0][0], df_min_max[0][1]
#
#print(f'{min_value}, {max_value}')

# Understanding dataset

## Analyzing minimum and maximum scores, look for corrupted data

### Essay scores (expected 0 - 1000)

In [9]:
# Analyze min and maximum values
df_raw = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "(select \"CO_PROVA_CN\", \"NU_NOTA_REDACAO\" from \"Data_years\".\"" + year + "\")  as subq") \
    .option("numPartitions","8") \
    .option("partitionColumn", "\"CO_PROVA_CN\"") \
    .option("lowerBound", "89") \
    .option("upperBound", "108") \
    .option("user", "admin") \
    .option("password", "*********")

In [10]:
df_part = df_raw.load()

In [11]:
df_part.rdd.getNumPartitions()

8

In [12]:
df_part.createOrReplaceTempView('NOTA_REDACAO')

In [13]:
spark.sql("SELECT Min(NU_NOTA_REDACAO) ,Max(NU_NOTA_REDACAO) FROM NOTA_REDACAO").show()

+--------------------+--------------------+
|min(NU_NOTA_REDACAO)|max(NU_NOTA_REDACAO)|
+--------------------+--------------------+
|                 0.0|              1000.0|
+--------------------+--------------------+



Everything right here.

### Ciências da Natureza (expected something in the 0 - 1000 range)

Let's create a function to get partitioned data.

In [14]:
def load_data_partitioned(subject, year, url):
    # Analyze min and maximum values
    df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "(select \"CO_PROVA_CN\", \""+ subject +"\" from \"Data_years\".\"" + year + "\")  as subq") \
    .option("numPartitions","8") \
    .option("partitionColumn", "\"CO_PROVA_CN\"") \
    .option("lowerBound", "89") \
    .option("upperBound", "108") \
    .option("user", "admin") \
    .option("password", "*********")
    df_part = df.load()
    return df_part

In [15]:
df_part = load_data_partitioned("NU_NOTA_CN", year, url)

In [16]:
df_part.createOrReplaceTempView('NOTA_CN')

In [17]:
spark.sql("SELECT Min(NU_NOTA_CN) ,Max(NU_NOTA_CN) FROM NOTA_CN").show()

+---------------+---------------+
|min(NU_NOTA_CN)|max(NU_NOTA_CN)|
+---------------+---------------+
|          265.0|          867.2|
+---------------+---------------+



Everything ok.

In [18]:
spark.sql("SELECT avg(NU_NOTA_CN) FROM NOTA_CN").show()

+-----------------+
|  avg(NU_NOTA_CN)|
+-----------------+
|486.5187887107547|
+-----------------+



### Ciências Humanas (expected something in the 0 - 1000 range)

In [19]:
df_part = load_data_partitioned("NU_NOTA_CH", year, url)

In [20]:
df_part.createOrReplaceTempView('NOTA_CH')
spark.sql("SELECT Min(NU_NOTA_CH) ,Max(NU_NOTA_CH) FROM NOTA_CH").show()

+---------------+---------------+
|min(NU_NOTA_CH)|max(NU_NOTA_CH)|
+---------------+---------------+
|          252.9|          793.1|
+---------------+---------------+



Everything right.

In [21]:
spark.sql("SELECT ROUND(avg(NU_NOTA_CH),2) FROM NOTA_CH").show()

+-----------------------------------------+
|round(avg(CAST(NU_NOTA_CH AS DOUBLE)), 2)|
+-----------------------------------------+
|                                   498.77|
+-----------------------------------------+



### Linguagens e Códigos (expected something in the 0 - 1000 range)

In [22]:
df_part = load_data_partitioned("NU_NOTA_LC", year, url)

In [23]:
df_part.createOrReplaceTempView('NOTA_LC')
spark.sql("SELECT Min(NU_NOTA_LC) ,Max(NU_NOTA_LC) FROM NOTA_LC").show()

+---------------+---------------+
|min(NU_NOTA_LC)|max(NU_NOTA_LC)|
+---------------+---------------+
|          301.2|          795.5|
+---------------+---------------+



All scores in range, no corrupted data in these columns.

In [24]:
spark.sql("SELECT ROUND(avg(NU_NOTA_LC),2) As Avg_LC FROM NOTA_LC").show()

+------+
|Avg_LC|
+------+
|541.18|
+------+



### Matemática (expected something in the 0 - 1000 range)

In [25]:
df_part = load_data_partitioned("NU_NOTA_MT", year, url)

In [26]:
df_part.createOrReplaceTempView('NOTA_MT')
spark.sql("SELECT Min(NU_NOTA_MT) ,Max(NU_NOTA_MT) FROM NOTA_MT").show()

+---------------+---------------+
|min(NU_NOTA_MT)|max(NU_NOTA_MT)|
+---------------+---------------+
|          321.6|          953.0|
+---------------+---------------+



In [27]:
spark.sql("SELECT ROUND(avg(NU_NOTA_MT),2) As Avg_MT FROM NOTA_MT").show()

+------+
|Avg_MT|
+------+
|543.14|
+------+



All scores from all subjects are within range, which means the data seems very clean and reliable.

## Analyzing NULL values and count according to absent candidates

In [4]:
# general function for queries using partition
def load_query(query, url, partitionColumn = "CO_PROVA_CN", numPartitions = "8", lowerbound = "89", upperbound = "108"):
    # Analyze min and maximum values
    df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", query) \
    .option("numPartitions",numPartitions) \
    .option("partitionColumn", "\"" + partitionColumn + "\"") \
    .option("lowerBound", lowerbound) \
    .option("upperBound", upperbound) \
    .option("user", "admin") \
    .option("password", "*********")
    df_part = df.load()
    return df_part

In [5]:
df_raw = load_query("(select * from \"Data_years\".\"" + year + "\")  as subq", url)

Size of dataset

In [6]:
size = df_raw.count()

In [7]:
print(size)

631259


In [32]:
df_raw.describe('NU_NOTA_REDACAO').show()

+-------+-----------------+
|summary|  NU_NOTA_REDACAO|
+-------+-----------------+
|  count|           631259|
|   mean|433.0645627222008|
| stddev| 277.162355926838|
|    min|              0.0|
|    max|           1000.0|
+-------+-----------------+



There are no null values for essay.

Let's find how many missed the test for natural sciences and see if numbers match (0 - missed, 2 - eliminated)

In [33]:
df_raw.filter((col('TP_PRESENCA_CN') == 0) | (col('TP_PRESENCA_CN') == 2)).count()

132737

In [34]:
df_raw.filter("NU_NOTA_CN is NULL").count()

132737

So, there are only two null data for students who were present and atended to the test, not being eliminated.

All these analyzes until here indicates that data is very consistent.

So now we can continue to the statistical analyzes.

# Statistical analyzes and queries

## Average Total Score

Partition by column 'Faixa_etaria', values in range 1 - 20 according to data description.

In [8]:
df_analysis = load_query("(select * from \"Data_years\".\"Analise_" + year + "\")  as subq",
                          url,
                         "Faixa_Etaria",
                         "8",
                         "1",
                         "20"
                         )

In [36]:
df_analysis.select(mean(col('Nota_final'))).show()

+-----------------+
|  avg(Nota_final)|
+-----------------+
|526.6112369999335|
+-----------------+



## Average Essay Score

In [74]:
df_raw.filter(col('TP_STATUS_REDACAO') == 'P').select(mean(col('NU_NOTA_REDACAO'))).show()

+--------------------+
|avg(NU_NOTA_REDACAO)|
+--------------------+
|   573.8284760707835|
+--------------------+



## How many people registered for the exam

In [38]:
from IPython.display import Markdown as md
md(f"#### As obtained before, there were {size} candidates registered for the exam that year.")

#### As obtained before, there were 631259 candidates registered for the exam that year.

## How many people could be accepted in medicine in a regular university?

Medicine is the most competed and difficult course to enter in most universities in Brazil. Minimum score to enter medicine course in UFAM (Federal University of Amazonas) that year was 771,65, according to SISU (a regular minimum score for medicine, not so low, not so high).

In [39]:
medicine = df_analysis.filter(col('Nota_final') >= 771.65).count()

In [40]:
print(f"Only {medicine} students could enter a medicine course in a regular university that year, out of {size}.")

Only 673 students could enter a medicine course in a regular university that year, out of 631259.


In [41]:
# quantity of PRESENT students to do the exam
size_present = df_raw.filter('''TP_PRESENCA_CN = 1 and 
                 TP_PRESENCA_CH = 1 and 
                 TP_PRESENCA_LC = 1 and 
                 TP_PRESENCA_MT = 1 and 
                 TP_STATUS_REDACAO = \'P\'''').count()

In [42]:
size_present

475005

In [43]:
print("That correspond to {:.4f} % of candidates".format(medicine*100/size))

That correspond to 0.1066 % of candidates


In [44]:
print("And to {:.4f} % of present candidates at exam".format(medicine*100/size_present))

And to 0.1417 % of present candidates at exam


## How many people could be accepted in course with lowest accepting score?

Courses with lowest accepting score, on average, have passing score of around 400/1000. Considered this number to this study.

In [45]:
easiest_course = df_analysis.filter(col('Nota_final') >= 400).count()

In [46]:
print(f"{easiest_course} students could pass to the easiests courses that year.")

458434 students could pass to the easiests courses that year.


In [47]:
print("That correspond to {:.2f} % of candidates".format(easiest_course*100/size))

That correspond to 72.62 % of candidates


In [48]:
print("And to {:.4f} % of present candidates at exam".format(easiest_course*100/size_present))

And to 96.5114 % of present candidates at exam


Only a curiosity, highest score:

In [9]:
df_analysis.createOrReplaceTempView('df_analysis')

In [50]:
spark.sql("SELECT Max(Nota_final) FROM df_analysis").show()

+---------------+
|max(Nota_final)|
+---------------+
|         846.42|
+---------------+



## How many have a good knowledge of their native language (portuguese)?

This can be analyzed with competence no 1 score of essay, "Mastery of formal written Portuguese", in column 'NU_NOTA_COMP1'.

In [6]:
df_raw.createOrReplaceTempView('df_raw')

In [52]:
# This is for the other years, where competences are in range 0 - 200 instead of 0 - 1000
spark.sql("""select split.range as NU_NOTA_COMP1, count(*) as Count
 from (
  select case  
    when NU_NOTA_COMP1 between 0 and 40 then '0-40'
    when NU_NOTA_COMP1 between 40.1 and 80 then '40-80'
	   when NU_NOTA_COMP1 between 80.1 and 120 then '80-120'
	   when NU_NOTA_COMP1 between 120.1 and 160 then '120-160'
	   when NU_NOTA_COMP1 between 160.1 and 200 then '160-200'
    when NU_NOTA_COMP1 < 0 or NU_NOTA_COMP1 > 200 then 'out of range'
    when NU_NOTA_COMP1 is NULL then 'NULL'
	else '' end as range
  from df_raw
  where TP_STATUS_REDACAO = 'P') split
 group by split.range
 ORDER BY split.range """).show()

+-------------+------+
|NU_NOTA_COMP1| Count|
+-------------+------+
|         0-40|  2261|
|      120-160|189753|
|      160-200| 40091|
|        40-80| 37003|
|       80-120|207299|
+-------------+------+



## Income vs Score analysis

### Final/Total score

In [53]:
spark.sql('''SELECT ROUND(avg(Nota_final),2) As Avg_score_low_income
                FROM df_analysis 
             WHERE Renda_familiar = 'A' or Renda_familiar = 'B'
          '''
          ).show()

+--------------------+
|Avg_score_low_income|
+--------------------+
|              482.27|
+--------------------+



In [54]:
spark.sql('''SELECT ROUND(avg(Nota_final),2) As Avg_score_high_income
                FROM df_analysis 
             WHERE Renda_familiar != 'A' 
                     and Renda_familiar != 'B' 
                     and Renda_familiar != 'C'
                     and Renda_familiar != 'D'
                     and Renda_familiar != 'E'
          '''
          ).show()

+---------------------+
|Avg_score_high_income|
+---------------------+
|               601.12|
+---------------------+



### Essay Score

In [7]:
spark.sql('''SELECT ROUND(avg(NU_NOTA_REDACAO),2) As Avg_score_low_income
                FROM df_raw 
             WHERE (Q004 = 'A' or Q004 = 'B')
             and TP_STATUS_REDACAO = 'P'
          '''
          ).show()

+--------------------+
|Avg_score_low_income|
+--------------------+
|              533.56|
+--------------------+



In [8]:
spark.sql('''SELECT ROUND(avg(NU_NOTA_REDACAO),2) As Avg_score_high_income
                FROM df_raw
             WHERE Q004 != 'A' 
                     and Q004 != 'B' 
                     and Q004 != 'C'
                     and Q004 != 'D'
                     and Q004 != 'E'
                     and TP_STATUS_REDACAO = 'P'
          '''
          ).show()

+---------------------+
|Avg_score_high_income|
+---------------------+
|                643.9|
+---------------------+



### Knowledge of native language (portuguese)

In [7]:
spark.sql('''
    select count(*) As number_of_candidates
        from df_raw
    where NU_NOTA_COMP1 >= 120 and
      (Q004 = 'A' or Q004 = 'B')
''').show()

+--------------------+
|number_of_candidates|
+--------------------+
|               53518|
+--------------------+



In [1]:
print("That correspond to only {:.2f} % of candidates".format(53518*100/size))

That correspond to only 8.48 % of candidates


Only 53 thousand candidates with low income achieved 'reasonably good' score for portuguese knowledge.

In [59]:
spark.sql('''
    select count(*) As number_of_candidates
        from df_raw
    where NU_NOTA_COMP1 >= 120 and
          Q004 != 'A' and Q004 != 'B'
''').show()

+--------------------+
|number_of_candidates|
+--------------------+
|              317562|
+--------------------+



In [60]:
print("While {:.2f} % of present candidates at exam that had 'reasonably good' score for portuguese were not low income".format(317562*100/size_present))

While 66.85 % of present candidates at exam that had 'reasonably good' score for portuguese were not low income


### How many low income candidates could be accepted in medicine?

In [61]:
medicine_income = df_analysis.filter('''Nota_final >= 771.65 and 
                                (Renda_familiar = 'A' or Renda_familiar = 'B')''').count()

In [62]:
print(f"Only {medicine_income} low income students achieved score to enter a medicine course in a regular university that year.")

Only 12 low income students achieved score to enter a medicine course in a regular university that year.


## Sex vs Score analyzis

### Final/Total Score

In [63]:
spark.sql('''
select ROUND(avg(Nota_final),2) As avg_final_score, Sexo
from df_analysis
group by Sexo
'''
).show()

+---------------+----+
|avg_final_score|Sexo|
+---------------+----+
|         521.32|   F|
|         534.67|   M|
+---------------+----+



### Essay score

In [7]:
spark.sql('''
select ROUND(avg(NU_NOTA_REDACAO),2) As avg_essay_score, TP_SEXO as Sex
from df_raw
where TP_STATUS_REDACAO = 'P'
      or TP_STATUS_REDACAO != 'F'
group by TP_SEXO
'''
).show()

+---------------+---+
|avg_essay_score|Sex|
+---------------+---+
|          573.1|  F|
|         540.57|  M|
+---------------+---+



### How many have already worked or works?

In [12]:
spark.sql('''
select count(Trabalha_ou_ja) as had_or_have_job_responsabilities, Sexo as Sex
from df_analysis
where Trabalha_ou_ja = 'S'
group by Sexo
'''
).show()

+--------------------------------+---+
|had_or_have_job_responsabilities|Sex|
+--------------------------------+---+
|                          186133|  F|
|                          151513|  M|
+--------------------------------+---+



### Now, among high income students who have never worked, what is the sex gap in score?

In [66]:
spark.sql('''
select ROUND(avg(Nota_final),2) as Total_score, Sexo as Sex
from df_analysis
    WHERE Renda_familiar != 'A' 
            and Renda_familiar != 'B' 
            and Renda_familiar != 'C'
            and Renda_familiar != 'D'
            and Renda_familiar != 'E'
            and Trabalha_ou_ja = 'N'
group by Sexo
'''
).show()

+-----------+---+
|Total_score|Sex|
+-----------+---+
|     607.16|  F|
|      614.3|  M|
+-----------+---+



We can see that the gap between female and male score is musch smaller.

In [13]:
spark.stop()