# ETL with Apache Spark and MongoDB
## Group 13


This notebook runs an ETL process with Apache Spark and MongoDB. For more details, see the project's [README.md](https://github.com/guilhermeomt/pmd2021-projeto-pratico-g13/blob/main/README.md).

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, when

# Start a Spark session that writes to the "universities" collection of the
# "PMD2021" database and loads the MongoDB Spark connector package.
spark = (
    SparkSession.builder
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/PMD2021.universities")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

## Data extraction

In [11]:
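# Read the CSV with a header row, letting Spark infer column types
df = spark.read.load("data/timesData.csv",
                     format="csv", sep=",", inferSchema="true", header="true")
# Keep only the columns used in this notebook
df = df.select("world_rank", "university_name", "country", "total_score",
               "num_students", "international_students", "female_male_ratio", "year")
# Drop rows whose total_score is the "-" placeholder
df = df.filter(df["total_score"] != "-")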


## Data transformation

In [12]:
# Split the "female:male" percentage string into two fraction columns
split_col = split(df['female_male_ratio'], ':')
df = df.withColumn('female_ratio', split_col.getItem(0) / 100)
df = df.withColumn('male_ratio', split_col.getItem(1) / 100)
df = df.drop('female_male_ratio')
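
As a rough illustration of what this cell computes per row, the sketch below mirrors the split in plain Python, without Spark. The helper name `split_ratio` is hypothetical, and the `"33 : 67"` input shape is an assumption about how `female_male_ratio` appears in the CSV.

```python
def split_ratio(ratio):
    """Split a 'female:male' percentage string into two fractions.

    Hypothetical helper, not part of the notebook. Returns (None, None)
    for missing or malformed values, roughly like the nulls Spark yields
    when a cast fails.
    """
    if not ratio:
        return (None, None)
    parts = ratio.split(":")
    if len(parts) != 2:
        return (None, None)
    try:
        # Strip surrounding spaces before converting, then scale to [0, 1]
        female = float(parts[0].strip()) / 100
        male = float(parts[1].strip()) / 100
    except ValueError:
        return (None, None)
    return (female, male)
```

For example, `split_ratio("33 : 67")` yields a female fraction of 0.33 and a male fraction of 0.67.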

In [13]:
# Average total_score per university across all of its rows
auxDf = df.groupBy(df["university_name"]) \
          .agg({"total_score":"avg"}) \
          .withColumnRenamed("avg(total_score)", "avg_score")
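
The grouping step can be pictured with a small plain-Python sketch, shown here only to make the aggregation explicit. The function name `average_scores` and the `(name, score)` pair layout are assumptions made for illustration, not part of the notebook.

```python
from collections import defaultdict


def average_scores(rows):
    """Return the mean total_score per university.

    rows: iterable of (university_name, total_score) pairs, where the
    score may be a number or a numeric string (hypothetical layout).
    """
    totals = defaultdict(lambda: [0.0, 0])  # name -> [sum, count]
    for name, score in rows:
        totals[name][0] += float(score)
        totals[name][1] += 1
    return {name: total / count for name, (total, count) in totals.items()}
```

A university that appears in several years contributes one score per year, and the result holds a single average for it.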


In [14]:
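# Assign a letter category from the average score; the first matching
# threshold wins, and anything below 51 falls into "E"
auxDf = auxDf.withColumn(
    "category",
    when(col("avg_score") >= 91, "A")
    .when(col("avg_score") >= 81, "B")
    .when(col("avg_score") >= 71, "C")
    .when(col("avg_score") >= 51, "D")
    .otherwise("E"),
)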
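
The threshold chain above reads as an ordered series of checks. A minimal plain-Python sketch of the same rule follows; the function name `categorize` is hypothetical, and treating a missing score as `"E"` is an assumption meant to mirror how the `otherwise` branch catches rows where no condition holds.

```python
def categorize(avg_score):
    """Map an average score to a letter category (hypothetical helper)."""
    if avg_score is None:
        # Assumption: a missing score matches no threshold, so it falls through
        return "E"
    if avg_score >= 91:
        return "A"
    if avg_score >= 81:
        return "B"
    if avg_score >= 71:
        return "C"
    if avg_score >= 51:
        return "D"
    return "E"
```

Note that the bands are not contiguous at integer boundaries only: a score of 90.5 is below 91 and therefore lands in `"B"`.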

In [7]:
# Rename the key in auxDf so the join condition is unambiguous
auxDf = auxDf.withColumnRenamed("university_name", "name")

# Attach each university's average score and category to its yearly rows
df = df.join(auxDf, df.university_name == auxDf.name, 'inner').select(
    df.world_rank, df.university_name, df.country, df.total_score,
    df.num_students, df.international_students, df.female_ratio, df.male_ratio,
    df.year, auxDf.avg_score, auxDf.category)
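
To make the effect of the inner join concrete, here is a small plain-Python sketch under assumed data shapes: rows as dictionaries and the per-university summary as a dictionary keyed by name. The name `attach_summary` and both layouts are hypothetical.

```python
def attach_summary(rows, summary):
    """Add avg_score and category to each row that has a matching university.

    rows: list of dicts containing a 'university_name' key.
    summary: dict mapping university name -> (avg_score, category).
    """
    enriched = []
    for row in rows:
        match = summary.get(row["university_name"])
        if match is None:
            continue  # inner join semantics: unmatched rows are dropped
        avg_score, category = match
        enriched.append({**row, "avg_score": avg_score, "category": category})
    return enriched
```

Every yearly row of a university receives the same `avg_score` and `category`, since both are computed once per university.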

## Data loading

In [9]:
# Append the rows to the collection set in spark.mongodb.output.uri
df.write.format("mongo").mode("append").save()