In [0]:
import json
import requests
from pyspark.sql.functions import *
from pyspark.sql.types import *

**Reading Mongo DB data**

In [0]:
uri = "mongodb+srv://<usuario>:<senha>@unicluster.ixhvw.mongodb.net/ibge.pnadc20203?retryWrites=true&w=majority"
 
ibge_mongoURL = spark.read.format("mongo")\
                  .option("uri", uri)\
                  .load()

In [0]:
ibge_mongoURL.printSchema()
# ibge_mongo.show(truncate=False)

**Defining the support functions that will parse the json object and transform it into a dataframe**

In [0]:
def convert_single_object_per_line(json_list):
    json_string = ""
    for line in json_list:
        json_string += json.dumps(line) + "\n"
    return json_string
 
def parse_dataframe(json_data):
    r = convert_single_object_per_line(json_data)
    mylist = []
    for line in r.splitlines():
        mylist.append(line)
    rdd = sc.parallelize(mylist)    
    df = spark.read.json(rdd)
    return df

**Creating the dataframe from the API**

In [0]:
url = 'https://servicodados.ibge.gov.br/api/v1/localidades/regioes/1|2|3|4|5/municipios'
r =  requests.get(url)
j = r.json()
ibge_apiURL = parse_dataframe(j)

In [0]:
ibge_mongoURL.write.parquet("/FileStore/ibge-mongo", mode="overwrite")
ibge_apiURL.write.parquet("/FileStore/ibge-api", mode="overwrite")

In [0]:
ibge_api = spark.read.parquet("/FileStore/ibge-api")

In [0]:
ibge_mongo = spark.read.parquet("/FileStore/ibge-mongo")\
                  .filter((col("sexo")=="Mulher")&(col("idade")>=20)&(col("idade")<=40))

In [0]:
ibge_api.printSchema()
#ibge_api.show(truncate=False)

In [0]:
ibge_mongo.printSchema()
#ibge_mongo.show(truncate=False)

#Questions to be answered after ingesting the data:

**What is the average income of the women between 20 and 40 years old in your database?**

In [0]:
display(ibge_mongo.agg({"renda": "avg"}).select(format_number('avg(renda)', 2).alias('Average income')).collect())


Average income
1809.28


**What is the average income of the women between 20 and 40 years in 'Distrito Federal'?**

In [0]:
display(ibge_mongo.filter("uf = 'Distrito Federal'").agg({"renda": "avg"}).select(format_number('avg(renda)', 2).alias('Average income in DF')).collect())

Average income in DF
2795.94


**What is the average income of women between 20 and 40 years old living in the "região sudeste" southeastern region of the country?**

In [0]:
RegiaoSudeste = ibge_api.select("microrregiao.mesorregiao.UF.nome").withColumnRenamed("nome","uf").filter(col("regiao-imediata.regiao-intermediaria.UF.regiao.sigla")=="SE").groupBy("uf").count()

display(ibge_mongo.join(RegiaoSudeste, ibge_mongo.uf == RegiaoSudeste.uf).groupBy().avg('renda').select(format_number('avg(renda)', 2).alias('Average income in Southeastern')).collect())


  


Average income in Southeastern
2034.06


**Which Brazilian state has the lowest average income with women between 20 and 40 years old?**

In [0]:
display(
  ibge_mongo.groupBy("uf").avg("renda").select("UF",format_number('avg(renda)', 2) \
  .alias('Average income in Southeastern')).orderBy("avg(renda)").head(1)
)

UF,Average income in Southeastern
Alagoas,1116.1


**Which Brazilian state has the highest average schooling with women between 20 and 40 years old?**

In [0]:
display(
  ibge_mongo.groupBy("uf").avg("anosesco").select("UF",format_number('avg(anosesco)', 2) \
  .alias('Highest average schooling')).orderBy(desc("avg(anosesco)")).head(1)
)

UF,Highest average schooling
Distrito Federal,13.11


**What is the average level of education among women between 20 and 40 years old in Mato Grosso?**

In [0]:
display(
  ibge_mongo.groupBy("uf").avg("anosesco").select("UF",format_number('avg(anosesco)', 2) \
  .alias('Average level of education in MT')).filter(ibge_mongo.uf == 'Mato Grosso')
)

UF,Average level of education in MT
Mato Grosso,12.02


**What is the average level of education among women who live in Paraná and are between 25 and 30 years old?**

In [0]:
display(
  ibge_mongo.filter("uf=='Paraná' and idade>=25 and idade<=30") \
  .groupBy("uf").avg("anosesco").select("UF",format_number('avg(anosesco)', 2).alias('Average level of education in PR')) 
)

UF,Average level of education in PR
Paraná,12.61


**What is the average income for people in the South of the country (Sul), who are in the workforce and are between 25 and 35 years old?**

In [0]:
RegiaoSul = ibge_api.select("microrregiao.mesorregiao.UF.nome").withColumnRenamed("nome","uf").filter(col("regiao-imediata.regiao-intermediaria.UF.regiao.sigla")=="S").groupBy("uf").count()

display(ibge_mongo.filter("idade>=25 and idade<=35 and trab=='Pessoas na força de trabalho'").join(RegiaoSul, ibge_mongo.uf == RegiaoSul.uf).groupBy().avg('renda').select(format_number('avg(renda)', 2).alias('Average income in South')).collect())

Average income in South
2231.89


**How many mesoregion are in the state of MG (Minas Gerais)?**

In [0]:
MesoMG = ibge_api.select("microrregiao.mesorregiao.nome") \
.filter(col("microrregiao.mesorregiao.UF.nome")=="Minas Gerais")\
.groupBy("nome").count().count() 

#MesoMG

MesoMG



**What is the average income of women residing in the North of Brazil, graduated, aged between 25 and 35 years and are black or brown?**

In [0]:
RegiaoNorte = ibge_api.select("microrregiao.mesorregiao.UF.nome").withColumnRenamed("nome","uf").filter(col("regiao-imediata.regiao-intermediaria.UF.regiao.sigla")=="N").groupBy("uf").count()

display(ibge_mongo.filter("idade>=25 and idade<=35 and (cor=='Preta' or cor=='Parda') and graduacao = 'Sim' and  trab = 'Pessoas na força de trabalho'").join(RegiaoNorte, ibge_mongo.uf == RegiaoNorte.uf).groupBy().avg('renda').select(format_number('avg(renda)', 2).alias('Average income in North')).collect())

Average income in North
1391.67
