In [1]:
#import libraries
from selenium import webdriver
import requests
import re
from bs4 import BeautifulSoup
import json
import os
from multiprocessing import Pool
import numpy as np

In [8]:
#initialize constants

# Form fields posted to the login form by scrap().
# The username and password can be overridden through the SIMCE_USERNAME and
# SIMCE_PASSWORD environment variables so they need not live in the notebook;
# the previous hard-coded values remain as defaults for backward compatibility.
request_credentials = {
    "username": os.environ.get('SIMCE_USERNAME', 'profesor'),
    "password": os.environ.get('SIMCE_PASSWORD', 'profe2020'),
    "profile": 'docente'
}

# JavaScript prelude that set_script() places in front of every page script.
# It declares the `dataP` result object and a `dummy` variable, and defines
# getMaxValue() and designGraphics(). This designGraphics() draws nothing: it
# stores format, coordsX, dataPoints, sigDif, dif and mDif under
# dataP[idName]; the type, stepSize and max arguments are accepted but unused.
# NOTE(review): presumably this stands in for the site's own chart-drawing
# function of the same name - confirm against the page source.
base_script = """
const dataP = {};
var dummy = "";

function getMaxValue(dataPoints) {
    if (Array.isArray(dataPoints[0].data)) {
        return Math.max(...dataPoints.map(dataset => Math.max(...dataset.data)));
    } else {
        return Math.max(...dataPoints);
    }
}

function designGraphics(idName, format, type, coordsX, dataPoints, stepSize, max, sigDif = null, dif = null, mDif =
        null) {
        const data = {
          format: format,
          coordsX: coordsX,
          dataPoints: dataPoints,
          sigDif: sigDif,
          dif: dif,
          mDif: mDif
        }

        dataP[idName] = data;
    }
"""

# JavaScript epilogue that set_script() appends after the page script: it
# calls scrapFunction() and returns the populated `dataP` object.
end_script = """
scrapFunction();

return dataP
"""

# Name substituted for every matched JavaScript expression; base_script
# declares a variable with this name.
replacement = "dummy"

# JavaScript expressions that set_script() rewrites to `replacement` before
# the page script is executed.
instructions_to_replace = [
    "document.getElementById('div-' + i + '-2').style.marginTop",
    "document.getElementById('dif-' + i + '-2').innerHTML",
    "container.innerHTML",
    "padreCanvas.innerHTML",
    "document.Fdocument('dif-' + i + '-2').innerHTML",
    "canvasContainer.parentNode",
]

# One alternation of the literal (escaped) expressions, anchored on word
# boundaries at both ends.
pattern = r"\b(?:{})\b".format(
    "|".join(re.escape(expression) for expression in instructions_to_replace)
)

# Firefox options shared by every webdriver instance created in process_schools().
# NOTE(review): '--no-sandbox' and '--disable-dev-shm-usage' look like Chromium
# switches - confirm they are needed (or honoured) by Firefox.
opt = webdriver.FirefoxOptions()
for _browser_arg in ('--headless', '--no-sandbox', '--disable-dev-shm-usage'):
    opt.add_argument(_browser_arg)
# Number of worker processes: leave two CPUs free, but never go below 2.
# os.cpu_count() may return None when the count cannot be determined, so fall
# back to 1 instead of failing on `None - 2`.
n_workers = max((os.cpu_count() or 1) - 2, 2)

In [54]:
def authUrl(rbd):
    """Return the openid-connect authorization URL for the given school.

    Args:
        rbd: School identifier; it is interpolated into the redirect URI
            (http://www.simce.cl/validation/<rbd>).

    Returns:
        The full URL as a string: the fixed endpoint, the fixed client query
        parameters and the school-specific ``redirect_uri``.
    """
    endpoint = 'https://perfilador.agenciaeducacion.cl/auth/realms/Perfilador/protocol/openid-connect/'
    query = '&'.join([
        'auth?client_id=agencia&response_type=code&state=1',
        f'redirect_uri=http://www.simce.cl/validation/{rbd}',
    ])
    return endpoint + query

def set_script(script):
  """Turn a page's inline script into a self-contained script that returns data.

  The text from the DOMContentLoaded listener onwards is kept, the listener is
  rewritten as a plain ``function scrapFunction()``, everything from
  ``window.onload`` onwards is discarded, and every expression matched by
  ``pattern`` is replaced by ``replacement``. The result is wrapped between
  ``base_script`` and ``end_script``.

  Args:
    script: Source text of the page's inline <script>.

  Returns:
    The JavaScript source to execute, as a string.

  Raises:
    ValueError: if the DOMContentLoaded listener is not present in ``script``.
  """
  listener_header = "document.addEventListener('DOMContentLoaded', function()"
  index = script.find(listener_header)
  if index == -1:
    # str.find() returns -1 on a miss; slicing with it would keep only the last
    # character and produce a script in which scrapFunction is never defined.
    raise ValueError("DOMContentLoaded listener not found in page script")
  script = script[index:]\
    .replace(listener_header, "function scrapFunction()")\
    .replace("});\n\n    window.onload", "};\n\n    window.onload")
  end_index = script.find("window.onload")
  if end_index != -1:
    # Only truncate when the marker exists; slicing with -1 would silently
    # drop the final character of the script.
    script = script[:end_index]
  script = re.sub(pattern, replacement, script)
  return base_script + "\n\n" + script + "\n\n" + end_script

def scrap(rbd):
  """Log in and download the raw material needed for one school.

  Requests the authorization URL for ``rbd``, posts ``request_credentials`` to
  the action URL of the first form on that page, and then fetches the school's
  ``inicio``, ``indicador`` and ``dimension`` pages with the same session.

  Args:
    rbd: School identifier as a string.

  Returns:
    -1 when the last <script> of the landing page does not contain the literal
    ``(parseInt('2023')``. Otherwise a dict with the keys ``indicator_script``
    and ``dimension_script`` (text of the last <script> of each page),
    ``school_type`` (text of the element with id ``dependencia``) and
    ``municipality`` (text of the element with id ``comuna``).

  NOTE(review): process_schools in this file also looks up a 'GSE' entry that
  is not produced here - confirm where that value should come from.
  NOTE(review): no timeout is passed to the HTTP calls, so a stalled
  connection blocks the calling worker indefinitely.
  """
  # The context manager closes the session's connections once the pages have
  # been downloaded; the original never closed the session.
  with requests.Session() as s:
    res = s.get(authUrl(rbd))
    # Explicit parser, matching the other pages parsed below.
    log_url = BeautifulSoup(res.content, 'html.parser').find('form')['action']
    s.post(log_url, data=request_credentials)
    landing = BeautifulSoup(s.get(f'https://www.simce.cl/{rbd}/inicio').content, 'html.parser')
    if landing.find_all('script')[-1].text.find("(parseInt('2023')") == -1:
      return -1

    indicator_page = BeautifulSoup(s.get(f'https://www.simce.cl/{rbd}/indicador').content, 'html.parser')
    dimension_page = BeautifulSoup(s.get(f'https://www.simce.cl/{rbd}/dimension').content, 'html.parser')

  school_type = indicator_page.find(id='dependencia').text
  municipality = indicator_page.find(id='comuna').text

  indicator_script = indicator_page.find_all('script')[-1].get_text()
  dimension_script = dimension_page.find_all('script')[-1].get_text()
  return {
    'indicator_script': indicator_script,
    'dimension_script': dimension_script,
    'school_type': school_type,
    'municipality': municipality,
  }

def process_schools(schools):
  """Scrape a batch of schools and write one JSON file per school.

  A single Firefox webdriver is created for the batch. For each school the raw
  pages are fetched with scrap(), the two rewritten page scripts are executed
  in the browser, and the result is written to ``jsons/<rbd>.json``. Any
  exception raised for a school is printed and the loop moves on to the next one.

  Args:
    schools: Iterable of dicts with at least the keys ``rbd`` and
      ``rbd_nombre``.

  Returns:
    List of RBDs (as strings) for which scrap() returned -1.
  """
  no_data = []
  driver = webdriver.Firefox(options=opt)
  try:
    for school in schools:
      try:
        print(f'beggining {school["rbd"]}')
        scrap_data = scrap(str(school['rbd']))
        if scrap_data == -1:
          print(f'no data for {school["rbd"]}')
          no_data.append(str(school['rbd']))
          continue
        print(f'scraped {school["rbd"]}')
        indicators = driver.execute_script(set_script(scrap_data['indicator_script']))
        dimensions = driver.execute_script(set_script(scrap_data['dimension_script']))
        data = {
          'nombre_colegio': school['rbd_nombre'],
          'rbd': school['rbd'],
          'dependencia': scrap_data['school_type'],
          'comuna': scrap_data['municipality'],
          # scrap() in this file does not return a 'GSE' entry; indexing it
          # directly raised KeyError for every school, so nothing was ever
          # written. Read it defensively and store null when it is absent.
          'GSE': scrap_data.get('GSE'),
          'indicadores': indicators,
          'dimensiones': dimensions,
        }
        with open(f"jsons/{data['rbd']}.json", "w", encoding='utf8') as output:
          json.dump(data, output, indent=2, ensure_ascii=False)
      except Exception as e:
        print(f"error {e} on {school['rbd']}")
  finally:
    # Always shut the browser down, even if something escapes the loop
    # (e.g. KeyboardInterrupt), so no Firefox process is left behind.
    driver.quit()
  return no_data

def main():
  """Scrape every school that has not been handled yet, using a process pool.

  Reads the school list from ``establecimientos.json``, skips schools that
  already have a file in ``jsons/`` or are listed in ``sin_datos.txt``, and
  hands the rest to process_schools() in a pool of ``n_workers`` processes.
  RBDs reported as having no data are recorded in ``sin_datos.txt``.
  """
  with open('establecimientos.json') as schools_file:
    schools = json.load(schools_file)
  os.makedirs('jsons', exist_ok=True)
  files = set(os.listdir('jsons'))
  sub_schools_list = [school for school in schools if f"{school['rbd']}.json" not in files]

  known_no_data = []
  if os.path.exists('sin_datos.txt'):
    # `with` closes the handle; the original left it open.
    with open('sin_datos.txt', 'r') as file:
      known_no_data = file.read().splitlines()
  already_skipped = set(known_no_data)
  # The file holds strings (process_schools records str(rbd)), so compare as
  # strings; comparing the raw value never matches when rbd is not a str.
  sub_schools_list = [school for school in sub_schools_list if str(school['rbd']) not in already_skipped]

  # Chunks of n_workers schools each; every chunk is one process_schools()
  # call, which creates its own webdriver.
  sub_schools_list = [sub_schools_list[i: i+n_workers] for i in range(0, len(sub_schools_list), n_workers)]
  with Pool(n_workers) as p:
    feedback = p.map(process_schools, sub_schools_list)

  # Flatten the per-chunk lists with plain Python instead of numpy, which is
  # not meant for ragged/empty lists of strings.
  new_no_data = [rbd for chunk in feedback for rbd in chunk]
  if len(new_no_data) > 0:
    # Keep the previously recorded RBDs: writing only the new ones would drop
    # the earlier entries and cause them to be scraped again on the next run.
    no_data = list(dict.fromkeys(known_no_data + new_no_data))
    with open("sin_datos.txt", "w") as output:
      output.write('\n'.join(no_data))

In [55]:
# Run the scrape. main() skips schools that already have a file in jsons/ or
# are listed in sin_datos.txt, so re-running this cell resumes the work.
# NOTE(review): main() uses multiprocessing.Pool with functions defined in this
# notebook - confirm this works with the start method of the target platform.
main()