In [1]:
from collections import defaultdict
from dotenv import load_dotenv
from github import Github
from io import StringIO
from itertools import zip_longest
from nbclient import NotebookClient
from pathlib import Path
import contextlib
import difflib
import json
import nbformat
import os
import pandas as pd
import pprint as pp
import re
import shutil
import time


### Funciones utiles para manejar archivos y directorios

In [2]:
def raiz():
    return Path.cwd().parent

def ubicar(comision, *camino):
    ''' Ubica una carpeta en la comisión indicada '''
    carpeta = f'lab4-C{comision}' if comision != 0 else 'lab4'  # Si la comisión es 0, se asume que es la cátedra
    comision_path = raiz() / carpeta
    salida = comision_path.joinpath(*camino)

    return salida

def carpeta_alumno(alumno):
    ''' Genera el nombre de la carpeta del alumno '''
    return f"{alumno['orden']:02} - {alumno['legajo']} - {alumno['apellido']}, {alumno['nombre']}".strip()

def listar_carpetas(origen):
    return sorted([c for c in Path(origen).iterdir() if c.is_dir() and not c.name.startswith('.')])

def ubicar_alumno(legajo, *camino, alumnos):
    ''' Dado el legajo de un alumno, devuelve la carpeta donde debería estar '''

    alumno = alumnos.loc[alumnos['legajo'] == str(legajo)].iloc[0]
    comision = int(alumno['comision'])
    return ubicar(comision, 'practicos', carpeta_alumno(alumno), *camino)

def copiar_archivos(origen, destino, forzar=False):
    ''' Copia los archivos de una carpeta a otra '''
    origen_path = Path(origen)
    destino_path = Path(destino)

    # Si la carpeta de destino ya existe y se debe forzar la copia, eliminarla primero
    if destino_path.exists() and forzar:
        shutil.rmtree(destino_path)

    # Copiar todo el contenido de la carpeta de origen a la carpeta de destino
    shutil.copytree(origen_path, destino_path, dirs_exist_ok=True)

def renombrar_carpeta(origen, destino):
    ''' Renombra una carpeta '''
    origen_path = Path(origen)
    destino_path = Path(destino)

    # Renombrar la carpeta
    origen_path.rename(destino_path)

@contextlib.contextmanager
def cambiar_carpeta(destino):
    """Context manager for changing the current working directory"""
    base = os.getcwd()
    os.chdir(destino)
    try:
        yield
    finally:
        os.chdir(base)
# print(f'Ubicar: {ubicar(0,'enunciados', 'tp5', 'ejercicio1')}')
# print(f'Ubicar: {ubicar(1,'practicos', '01 - 12345 - Apellido, Nombre', 'tp5', 'ejercicio1')}')

### Cargar alumnos

Toma los alumnos que estan en alumnos.md y lo carga en un datatable.

El formato del archivo debe ser
```
## Comision N
1. Legajo Apellido, Nombre
```

In [None]:
def cargar_alumnos(origen='alumnos.md'):
    ''' Carga los alumnos desde el archivo de texto '''
    
    origen_path = Path(origen)
    
    with origen_path.open('r') as file:
        lineas = file.readlines()

    # Inicializar listas para almacenar los datos
    comisiones, ordenes, legajos, apellidos, nombres = [], [], [], [], []
    comision = None
    
    for linea in lineas:
        linea = re.sub(r'\|.*|^\s*|\s*$', '', linea)  # Limpiar la línea
        linea = linea[:50]
        if linea.startswith('## Comisión'):  # Extraer el número de la comisión
            comision = int(linea.split(' ')[-1])
        elif linea and linea[0].isdigit():  # Extraer el número de orden, legajo, apellido y nombre
            linea = linea[0:50]
            match = re.match(r'(\d+)\.\s+(\d+)\s+([^,]+),\s+(.+)', linea)
            if match:
                comisiones.append(comision)
                ordenes.append(int(match.group(1)))
                legajos.append(match.group(2))
                apellidos.append(match.group(3))
                nombres.append(match.group(4))
    
    # Crear el DataFrame
    data = {
        'comision': comisiones,
        'orden':    ordenes,
        'legajo':   legajos,
        'apellido': apellidos,
        'nombre':   nombres
    }
    return pd.DataFrame(data)

def marcas(asistencias, legajo):
    asistio = ''
    if asistencias and legajo in asistencias:
        for _, resultado in asistencias[legajo].items():
            if resultado['ausente']:
                asistio += '🔴'
            elif resultado['revisar']:
                asistio += '🟡'
            else:
                asistio += '🟢'
    return asistio

def guardar_alumnos(alumnos, destino='alumnos.md', asistencias=None):
    salida = StringIO()
    salida.write('# Alumnos Lab4.2024\n')

    for comision in alumnos['comision'].unique():
        salida.write("\n")
        salida.write(f'## Comisión {comision}\n')
        salida.write("```\n")
        for _, alumno in alumnos[alumnos['comision'] == comision].iterrows():
            asistio = marcas(asistencias, alumno['legajo'])
            salida.write(f"{alumno['orden']:2}. {alumno['legajo']:5}  {f'{alumno['apellido']}, {alumno['nombre']}'.strip():40} {asistio}\n")
        salida.write("```\n")
        
    destino_path = Path(destino)
    with destino_path.open('w') as file:
        file.write(salida.getvalue())

def guardar_resumen(alumnos, destino='resumen.md', asistencias=None):
    salida = StringIO()
    salida.write('# Resumen Lab4.2024\n')

    for comision in alumnos['comision'].unique():
        salida.write("\n")
        salida.write(f'## Comisión {comision}\n')
        salida.write("```\n")
        for _, alumno in alumnos[alumnos['comision'] == comision].iterrows():
            asistio = marcas(asistencias, alumno['legajo'])
            if asistio.endswith('🟡') or asistio.endswith('🔴') and asistio.startswith('🟢'):
                salida.write(f" {asistio} {alumno['orden']:2}. {alumno['legajo']:5}  {f'{alumno['apellido']}, {alumno['nombre']}'.strip():40}\n")
        salida.write("```\n")
        
    destino_path = Path(destino)
    with destino_path.open('w') as file:
        file.write(salida.getvalue())


# print(cargar_alumnos())
a = cargar_alumnos()
# a[a['orden'] < 2]


### Detectar datos duplicados

Revisa si los legajos estan duplicados (error), y como curiosidad si hay nombre y apellidos repetidos.

In [4]:
def detectar_duplicados(data):
    def duplicados(columna):
        return data[data[columna].duplicated(keep=False)].sort_values(by=columna)
    
    def reportar_duplicados(columna):
        duplicados_columna = duplicados(columna)
        num_duplicados = len(duplicados_columna[columna].unique())
        if num_duplicados:
            print(f"\nHay {num_duplicados} {columna.title()} duplicados en {len(duplicados_columna)} registros")
            print(duplicados_columna)
        else:
            print(f"\nNo hay {columna.title()} duplicados")
    
    # Reportar registros con legajo duplicado
    print("Registros con legajo duplicado:")
    reportar_duplicados('legajo')
    
    # Reportar registros con nombre duplicado
    # reportar_duplicados('nombre')
    
    # Reportar registros con apellido duplicado
    # reportar_duplicados('apellido')

detectar_duplicados(cargar_alumnos())

Registros con legajo duplicado:

No hay Legajo duplicados


### Generar trabajo prácticos
Copia de la carpeta prácticos el tp a cada una de los repositorios 


In [5]:
def publicar(practico):
    alumnos = cargar_alumnos()

    # Copia los archivos del práctico a cada alumno
    for legajo in alumnos['legajo']:
        destino = f"{ubicar_alumno(legajo, alumnos=alumnos)}/{practico}" 
        copiar_archivos(f'./practicos/{practico}', destino)
        
    # Copia el enunciados del práctico a cada comisión
    for comision in [2,5,7,9]:
        destino = ubicar(comision)
        shutil.copy(f'./practicos/enunciados/{practico}.md', f"{destino}/README.md")

publicar('tp6')

### Normalizar nombres de carpetas.
Esta función permite revisar que los nombres estén bien escritos (tomando en cuenta el número de legajo).

In [6]:
def extraer_legajo(carpeta):
    ''' Extrae el legajo del nombre de la carpeta '''
    m = re.match(r'.*(\d{5}).*', str(carpeta))
    return m.group(1) if m else None

def normalizar(*comisiones):
    alumnos = cargar_alumnos()
    if not comisiones: comisiones = [2, 5, 7, 9]

    for comision in comisiones:
        # listar todas las carpetas de alumnos que esten en la carpeta de la comisión

        carpetas = listar_carpetas(ubicar(comision, 'practicos'))
        primer = True 
        for alumno in carpetas:
            legajo = extraer_legajo(str(alumno))
            if not legajo: continue

            origen  = ubicar(comision, 'practicos', alumno)
            destino = ubicar_alumno(legajo, alumnos=alumnos)

            if str(origen) == str(destino): continue

            if primer:
                print(f"> Comisión {comision} - {len(carpetas)} alumnos")
                primer = False
            print(f" - [{origen}]\n   [{destino}]\n {origen == destino}")
            
            renombrar_carpeta(origen, destino)
            
normalizar()

### Funciones para comparar dos Jupiter Notebook 

In [7]:
def cargar_cuaderno(origen):
    '''Dado un archivo .ipynb, devuelve una lista con las líneas de código 
       de cada celda de código en el cuaderno.'''
    
    def limpiar(lineas): # Elimina líneas en blanco y comentarios
        return [linea.strip() for linea in lineas if linea.strip() and not linea.strip().startswith('#')]
    
    def codigo(cuaderno): # Extrae las líneas de código de las celdas de código
        celdas = cuaderno.get('cells', [])
        return [limpiar(celda.get('source', [])) for celda in celdas if celda.get('cell_type') == 'code']
    
    origen_path = Path(origen)
    with origen_path.open('r', encoding='utf-8') as f:
        cuaderno = json.load(f)
 
    return codigo(cuaderno)


def comparar_cuadernos(original, modificado):
    '''Dados dos cuadernos de Jupyter, devuelve la cantidad 
       de líneas de código que se agregaron en el segundo 
       respecto del primero.'''

    def igualar(a, b): 
        while len(a) < len(b): 
            a.append([])
        while len(b) < len(a): 
            b.append([])
        return zip(a, b)
    
    def cambios(a, b): 
        diferencias = difflib.unified_diff(a, b, lineterm='')
        diferencias = [l for l in diferencias if l.startswith('+') and not l.startswith('+++')]
        return len(diferencias)
    
    conteo = [cambios(o, m) for o, m in igualar(original, modificado)]
    
    # def cambios(origen, modificado):
    #     diferencias = difflib.unified_diff(origen, modificado, lineterm='')
    #     return sum(1 for l in diferencias if re.match(r'^\+[^+]', l))
    
    # conteo = [cambios(o, m) for o, m in zip_longest(original, modificado, fillvalue=[])]

    return {'diferencias': conteo, 'total': sum(conteo)}


### Verificar ejecución de notebook
Ejecuta el notebook y cuenta cuantas veces aparece un texto especifico en los resultados.

In [8]:
def verificar_ejecucion(origen, texto='Prueba pasada exitosamente'):

    camino = Path(origen)
    carpeta = camino.parent
    nombre  = camino.name

    try:
        with cambiar_carpeta(carpeta):
            # Cargar el notebook
            with open(nombre, 'r', encoding='utf-8') as f:
                nb = nbformat.read(f, as_version=4)

            # Crear un cliente para ejecutar el notebook
            client = NotebookClient(nb, timeout=600, kernel_name='python3')
            client.execute()

            # Contar las ocurrencias del texto en las salidas de las celdas de código
            contador = sum(
                output.text.count(texto)
                for celda in nb.cells if celda.cell_type == 'code' and 'outputs' in celda
                for output in celda.outputs if output.output_type == 'stream' and 'text' in output
            )
            return contador
    except:
        return 0


### Automatizar merge de repositorios
Revisa que pull requests están abiertos y los mergea.

Luego de mergearlos trae los cambios a la rama local.

In [9]:
load_dotenv("config.env")
token = os.getenv("GITHUB_TOKEN")
g = Github(token)

def esperar_merge(segundos=4):
    for _ in range(segundos):
        time.sleep(1)

def comisiones_pendientes(*comisiones):
    if not comisiones: comisiones = [2, 5, 7, 9]
    salida = []
    for comision in comisiones:
        repo = g.get_repo(f"AlejandroDiBattista/lab4-C{comision}")

        pulls = repo.get_pulls(state='open', sort='created', base='main')
        if pulls.totalCount == 0:
            print(f"- C{comision}: no hay PRs pendientes. 🟢")
        else:
            salida.append(comision)
            print(f"- C{comision}: hay {pulls.totalCount:2} PRs pendientes. 🟡")
    print()
    return salida


def aceptar_pr(*comisiones):
    if not comisiones: comisiones = [2, 5, 7, 9]
    print(f"Comisiones a procesar: {comisiones}\n")
    resultados = []

    print("> Verificando PRs pendientes:")
   
    comisiones = comisiones_pendientes(*comisiones)
    for comision in comisiones:
        repo = g.get_repo(f"AlejandroDiBattista/lab4-C{comision}")

        pulls = repo.get_pulls(state='open', sort='created', base='main')
        print(f"> Comisión {comision}, {pulls.totalCount:2} PRs pendientes")

        fallas = 0
        for i, pr in enumerate(pulls):
            texto = f" {i+1:3}) PR #{pr.number:4}: {pr.title:50} de {pr.user.login:25}"
            
            pull_request = repo.get_pull(pr.number)
            esperar_merge()
            pull_request = repo.get_pull(pr.number)  # Volver a obtener el PR para actualizar el estado

            # Verificar si el PR está mergeable (si no tiene conflictos)
            if pull_request.mergeable:
                # Hacer merge del PR
                pull_request.merge()
                print(f"{texto} 🟢 Ok.")
            else:
                print(f"{texto} 🔴 :( Hay conflictos.")
                fallas += 1
        if fallas:
            print(f"~ {fallas} PRs no pudieron ser aceptados.")
            resultados.append((comision, fallas))
        else:
            print(f"~ Todos los PRs fueron aceptados.")
            resultados.append((comision, 0))
        print()
        try:
            origen = repo.remotes.origin 
            origen.fetch()
            print(f" >> Fetch realizado con éxito")
        except:
            pass

    print("\n> Resultados:")
    for comision, fallas in resultados:
        print(f"- C{comision}: {'🟢 Ok' if fallas==0 else f'🔴 {fallas}'}")

# comisiones_pendientes()
# aceptar_pr(7)

## Analiza los trabajos presentados

Verifica que los trabajos prácticos estén bien presentados

Revisa cuántas líneas de código han cambiado en cada notebook.

En el caso de los parciales, revisa que todos los tests hayan sido superados.

In [None]:
def listar_cuadernos(origen):
    '''Dada una carpeta, devuelve una lista con los cuadernos de Jupyter (.ipynb) que contiene.'''
    return [archivo for archivo in Path(origen).iterdir() if archivo.suffix == '.ipynb']

def analizar_cambios(practicos, comisiones=[2, 5, 7, 9], parciales=['tp5']):
    resultados = defaultdict(dict)
    if not isinstance(comisiones, list): comisiones = [comisiones]
    if not isinstance(practicos, list):  practicos  = [practicos]

    # normalizar() # Asegurarse de que las carpetas de los alumnos estén normalizadas

    for practico in practicos:

        archivos = listar_cuadernos(ubicar(0, 'practicos', practico))
        for archivo in archivos:
            archivo = Path(archivo).name
            ruta_original = ubicar(0, 'practicos', practico, archivo)

            print(f"Analizando {ruta_original}:")
            original = cargar_cuaderno(str(ruta_original))
            celdas   = len(original)

            for comision in comisiones:
                carpetas = listar_carpetas(ubicar(comision, 'practicos'))
                # print(f"Comisión {comision}: {len(carpetas)} alumnos")

                for alumno in carpetas:
                    alumno = str(Path(alumno).name)
                    # print(f'Alumno:  {alumno}')

                    destino = ubicar(comision, 'practicos', alumno, practico, archivo)
                    # print(f'Comision: {comision}\n>>{ruta_original}\n=={destino}')
                    # print(f"{comision=} {alumno=} {practico=} {archivo=}")
                    nuevo   = cargar_cuaderno(str(destino))
                    cambios = comparar_cuadernos(original, nuevo)
                    
                    # print(f"    {cambios['total']:3} líneas agregadas")
                    legajo = extraer_legajo(alumno)
                    cambios['celdas']  = celdas
                    cambios['ausente'] = cambios['total'] < 5
                    print(f"    {legajo} {cambios['total']:3} líneas agregadas {practico} {practico in parciales}")
                    if practico in parciales:
                        print(f"    Verificando ejecución... {destino}")
                        cambios['revisar'] = verificar_ejecucion(destino) == 0
                    else:
                        cambios['revisar'] = False
                    resultados[legajo][practico] = cambios

    return resultados

if False:
    alumnos = cargar_alumnos()
    r = analizar_cambios(['tp1','tp2','tp3', 'tp4', 'tp5'], [2, 5, 7, 9], ['tp5'])

    guardar_alumnos(alumnos, asistencias=r)
    guardar_resumen(alumnos, asistencias=r)