In [0]:
%sql
create or replace table  codigo.default.hired_employees
(
  id int,
  name String,
  datetime String,
  deparment_id int,
  job_id int
)
USING DELTA;


create or replace table  codigo.default.jobs
(
 id int ,
 job string  
)
USING DELTA;


create or replace table  codigo.default.departments
(
 id int ,
 department string  
)
USING DELTA;

In [0]:
spark.conf.set("fs.azure.account.key.globcontenedor.dfs.core.windows.net", "secret")


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import  col, date_format, to_timestamp, quarter, count, mean
from flask import Flask
 
def insertar_registros_final(tabla_final_df, nombre_tabla_final_df):
    columnas = []
 
    for columna in spark.table(nombre_tabla_final_df).schema:
        columnas.append(col(columna.name).cast(columna.dataType).alias(columna.name))
 
    historia_df = tabla_final_df.select(columnas)
 
    historia_df.write.format("delta").mode("overwrite").option(
        "partitionOverwriteMode", "dynamic"
    ).insertInto(nombre_tabla_final_df)

def dataframe_to_json(df):
    return [row.asDict() for row in df.collect()]

spark = SparkSession.builder.appName("API").getOrCreate()

app = Flask(__name__)


def API():
    schema_job = StructType([
        StructField("id", IntegerType(), True),
        StructField("job", StringType(), True)
    ])
 
    schema_hired_employees = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("datetime", StringType(), True),
        StructField("deparment_id", IntegerType(), True),
        StructField("job_id", IntegerType(), True)
    ])
   
    schema_departments = StructType([
        StructField("id", IntegerType(), True),
        StructField("department", StringType(), True)
    ])
   
 
    file_path_jobs="abfss://csv@globcontenedor.dfs.core.windows.net/jobs"
    file_path_hired_employees="abfss://csv@globcontenedor.dfs.core.windows.net/hired_employees"
    file_path_departments="abfss://csv@globcontenedor.dfs.core.windows.net/departments"
    df_jobs=spark.read.csv(file_path_jobs, header=False, sep=',',schema=schema_job)
    df_hired_employees=spark.read.csv(file_path_hired_employees, header=False, sep=',',schema=schema_hired_employees)
    df_hired_departments=spark.read.csv(file_path_departments, header=False, sep=',',schema=schema_departments)
 
    tabla_jobs = 'codigo.default.jobs'
    tabla_employees = 'codigo.default.hired_employees'
    tabla_departments = 'codigo.default.departments'
 
    insertar_registros_final(df_jobs,tabla_jobs)
    insertar_registros_final(df_hired_employees,tabla_employees)
    insertar_registros_final(df_hired_departments,tabla_departments)
    
@app.route('/api/metrics/employees_hired', methods=['GET'])
def Con1():
    tabla_jobs = 'codigo.default.jobs'
    tabla_employees = 'codigo.default.hired_employees'
    tabla_departments = 'codigo.default.departments'
 
    df_job = spark.table(tabla_jobs)
    df_employees = spark.table(tabla_employees)
    df_departments = spark.table(tabla_departments)
 
    df_result = (
        df_employees.alias('A').join(
            df_job.alias('B'), col('A.job_id') == col('B.id'),'left'
        ).join(
            df_departments.alias('C'), col('A.deparment_id') == col('C.id'), 'left'
        ).filter(
        date_format(to_timestamp(col('A.datetime'), "yyyy-MM-dd'T'HH:mm:ss'Z'"),'yyyy') == 2021
        ).select(
            col('C.department'),
            col('B.job'),
            quarter(to_timestamp(col('A.datetime'), "yyyy-MM-dd'T'HH:mm:ss'Z'")).alias('quarter')
        ).groupBy(
            col('department'),
            col('job')
        ).pivot(
            'quarter'
        ).agg(
            count('*')
        ).orderBy(
            col('department'),
            col('job')
        )
    )
    # result = dataframe_to_json(df_result)
    return df_result

@app.route('/api/metrics/top_hired_departments_2021', methods=['GET'])
def Con2():
    tabla_jobs = 'codigo.default.jobs'
    tabla_employees = 'codigo.default.hired_employees'
    tabla_departments = 'codigo.default.departments'
 
    df_job = spark.table(tabla_jobs)
    df_employees = spark.table(tabla_employees)
    df_departments = spark.table(tabla_departments)
 
    promedio = df_employees.alias('A').join(
                df_job.alias('B'), col('A.job_id') == col('B.id'),'left'
            ).join(
                df_departments.alias('C'), col('A.deparment_id') == col('C.id'), 'left'
            ).filter(
            date_format(to_timestamp(col('A.datetime'), "yyyy-MM-dd'T'HH:mm:ss'Z'"),'yyyy') == 2021
            ).groupBy(
                col('C.id'),
                col('C.department')
            ).agg(
                count('*').alias('count')
            ).select(mean(col('count'))).first()[0]
 
 
    df_result  =  df_employees.alias('A').join(
                df_job.alias('B'), col('A.job_id') == col('B.id'),'left'
            ).join(
                df_departments.alias('C'), col('A.deparment_id') == col('C.id'), 'left'
            ).groupBy(
                col('C.id'),
                col('C.department')
            ).agg(
                count('*').alias('hired')
            ).orderBy(
                col('hired').desc()
            ).filter(
                col('hired') >promedio
            )
    return df_result

In [0]:
display(Con1())

department,job,1,2,3,4
,Administrative Assistant I,,,1.0,
,Analog Circuit Design manager,,,,1.0
,Assistant Media Planner,,,,1.0
,Database Administrator III,,,,1.0
,Dental Hygienist,,1.0,,
,Junior Executive,,1.0,,
,Nurse,,,1.0,
,Occupational Therapist,1.0,,,
,Project Manager,,,1.0,
,Research Assistant II,,1.0,,


In [0]:
display(Con2())

id,department,hired
8,Support,256
6,Human Resources,249
5,Engineering,245
7,Services,240
4,Business Development,222
3,Research and Development,178
9,Marketing,166
10,Training,141


In [0]:
import pytest
# from app import app
import json

@pytest.fixture
def client():
    app.config['TESTING'] = True
    client = app.test_client()
    return client

def test_api_upload(client):
    response = client.post('/api/upload')
    assert response.status_code == 200  # Asegurar que responde correctamente

def test_hired_employees_2021(client):
    response = client.get('/api/metrics/hired_employees_2021')
    assert response.status_code == 200  # Verificar código de respuesta
    data = json.loads(response.data)
    assert isinstance(data, list)  # Verificar que el resultado es una lista

def test_hired_employees_above_average(client):
    response = client.get('/api/metrics/hired_employees_2021')
    assert response.status_code == 200
    data = json.loads(response.data)
    assert isinstance(data, list)  # Verificar que la respuesta sea una lista

def test_invalid_endpoint(client):
    response = client.get('/api/invalid_endpoint')
    assert response.status_code == 404  # Verificar que un endpoint inválido devuelve 404

def test_api_upload_method_not_allowed(client):
    response = client.get('/api/upload')
    assert response.status_code == 405  # Verificar que GET no está permitido en /api/upload

def test_hired_employees_2021_content(client):
    response = client.get('/api/metrics/hired_employees_2021')
    assert response.status_code == 200
    data = json.loads(response.data)
    if data:
        assert 'department' in data[0]  # Verificar que la respuesta contiene la clave esperada
        assert 'job' in data[0]

def test_hired_employees_above_average_empty(client):
    response = client.get('/api/metrics/hired_employees_2021')
    assert response.status_code == 200
    data = json.loads(response.data)
    assert isinstance(data, list)
    assert len(data) >= 0  # Verificar que la lista puede ser vacía pero no nula
