<a href="https://colab.research.google.com/github/alexliqu09/Homologacion_Universidades/blob/main/Homologaci%C3%B3n_universidades.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark --no-cache-dir

In [None]:
import re
import os
import requests

import gdown
import pyspark

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, trim, \
                                  regexp_replace, collect_set,  collect_list, \
                                  struct, udf
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.types import BooleanType

In [None]:
#Descargar nuestro datos atravez de la función get_files que descargar tanto un archivo csv y json.
def get_files(input_string, output_name):
    is_url = re.match(r'^https?://', input_string) is not None
    if is_url:
        response = requests.get(input_string)
        with open(output_name, 'wb') as f:
            f.write(response.content)
    else:    
        url = f'https://drive.google.com/uc?id={input_string}'
        gdown.download(url, output_name, quiet=False)
    
get_files('1c8lsiC5LtkthwzZ3wmI7FUm1h_bbhrWU', 'instituciones_educativas.csv')
get_files(
        'https://krowdy.s3.us-east-1.amazonaws.com/ats/job/6434447e8e6c4c0008808420/opentest/2023-04-11T04-36-09-824ZUniversidades.json', 
        'universidades.json'
    )

In [None]:
#Se hace uso de rutas relativas 
csv_path = os.path.abspath("instituciones_educativas.csv")
json_path = os.path.abspath("universidades.json")

In [None]:
#se crea una sesión en spark
spark = SparkSession.builder.appName("Lectura de archivos").getOrCreate()

In [None]:
#leemos el archivo csv
df_csv = spark.read.format("csv") \
                   .option("header", "true") \
                   .load(csv_path)

df_csv = df_csv.withColumn("value", F.trim(df_csv["value"]))

df_csv.show(3, truncate=False)

In [None]:
#leemos el archivo json
schema = StructType([
    StructField("código INEI", StringType(), True),
    StructField("Nombre ", StringType(), True),
    StructField("Siglas ", StringType(), True)
])

df_json = spark.read.format("json") \
                    .option("multiline", "true") \
                    .schema(schema) \
                    .load(json_path)

df_json = df_json.withColumnRenamed("Siglas ", "Siglas")
df_json = df_json.withColumnRenamed("Nombre ", "Nombre")

df_json = df_json.withColumn("Siglas", F.trim(df_json["Siglas"]))
df_json = df_json.withColumn("Nombre", F.trim(df_json["Nombre"]))

df_json.show(4, truncate=False)

In [None]:
#Para la restricción me base en sig. criterio
# los usuarios por lo general escriben el nombre, las siglas
#las siglas con el nombre o una parte del nombre de la universdidad,
#Para normalizar los strgin se volvieron a minscula y se realiźó las
#comparaciones. Cabe mencionar para el criterio sobre palabras 
#como ceps,ciclo, instituto me base en la frequencia en la que
#aparecen estas palabras el código. No se esta mostrando pero 
#quiero resaltar que se tomo esta idea.
#sobre la última condición lo que se buscó ya que en su mayoria
#se tiene la palabra universidad se excluyó tal que 
#se aprovecharan las demás palabras ver si una frase como
#cesar vallejo se encuentre dentro la columna nombre del df_csv.

def check_similarity(text, nombre, siglas):
    text = str(text)
    text = re.sub(r'[^\w\s]', '', text).lower()
    nombre = nombre.lower()
    siglas = siglas.lower()   
    if any(x in text for x in ["ceps", "ingenieros", "idiomas", 
                               "inictel", "centro", "instituto",
                               "ciclo"]):
        return False

    if text == nombre or text == siglas:
        return True

    elif siglas in text.strip().split():
        return True
    
    elif 'universidad' in nombre:
        nombre_limpio = re.sub(r'universidad\s*', '', nombre).strip()
        if re.search(rf'\b{text}\b', nombre_limpio):
            return True

    return False

similarity_udf = udf(check_similarity, BooleanType())
#con este criterio se realizó un join para unir las tablas.
df_homologadas = df_csv.join(df_json, similarity_udf(df_csv.value, df_json.Nombre, df_json.Siglas))

In [None]:
df_homologadas = df_homologadas.withColumn("value", F.lower(trim(regexp_replace("value", "\s+", " ")))) \
                               .withColumn("Nombre", trim(regexp_replace("Nombre", "\s+", " ")))

In [None]:
df_homologadas.show(truncate=False)

In [None]:
#se guardó con el nombre solicitado. Cabe mencionar que se esta guardando
#el csv las columnas pedidas como value, candidateId y se renombre la columna nombre.
df_homologadas.select(col("candidateId"), col("value"), col("Nombre").alias("universidad homologada")) \
    .write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("universidades_homologadas.csv")

In [None]:
#se guardó el json con el nombre solicitado y se usó el collect_set sobre las columna de values
#ya que se busca no repertir dichos sinonimos.

df_sinonimos = df_homologadas.groupBy("Nombre") \
    .agg(collect_set("value").alias("sinonimos"))

df_sinonimos = df_sinonimos.select(col("Nombre").alias("nombre_universidad"), col("sinonimos"))

df_sinonimos.write.mode("overwrite").json("sinonimo_universidades.json")