In [None]:
#from functions import get_file_name, impute_with_most_common, handle_missing_categorical_columns, impute_abs_negative_values, string_indexer_transform, inner_join, nombre_columnas, vector_assembler, scaler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import Row

In [None]:
# Test for get_file_name
def test_get_file_name():
    """Check that get_file_name maps "/ruta/al/archivo.csv" to "archivo"."""
    assert get_file_name("/ruta/al/archivo.csv") == "archivo"

In [None]:
# Test for impute_with_most_common
def test_impute_with_most_common(spark_session):
    """Check that impute_with_most_common fills the nulls of "value" with its mode.

    The fixture is built so that 1 is the single most frequent entry of
    "value" (three occurrences, against two nulls and one 6). If every
    non-null value appeared only once there would be no unique mode, and the
    expected fill value would depend on how ties happen to be broken.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.

    Raises:
        AssertionError: If the collected result rows differ from the expected rows.
    """
    columns = ["common_column", "value"]

    # Input frame: two null entries in "value", and 1 as the unambiguous mode.
    data = [("A", 1), ("B", None), ("C", 1), ("A", 1), ("B", None), ("C", 6)]
    mock_df = spark_session.createDataFrame(data, columns)

    # Call the function under test on the "value" column.
    result_df = impute_with_most_common(mock_df, "value")

    # Only the two null entries are expected to change; they take the value 1.
    expected_data = [("A", 1), ("B", 1), ("C", 1), ("A", 1), ("B", 1), ("C", 6)]
    expected_result_df = spark_session.createDataFrame(expected_data, columns)

    assert result_df.collect() == expected_result_df.collect()

In [None]:
# Test for handle_missing_categorical_columns
def test_handle_missing_categorical_columns(spark_session):
    """Check handle_missing_categorical_columns on a frame with nulls in "col2".

    The expected frame is the input frame with every None in "col2" replaced
    by the string "Unknown"; all other cells are identical.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.
    """
    schema = ["col1", "col2", "value"]
    rows_with_nulls = [
        ("A", "X", 1),
        ("B", None, 2),
        ("C", "Z", 3),
        ("A", None, 4),
        ("B", "X", 5),
        ("C", "Z", 6),
    ]
    rows_filled = [
        ("A", "X", 1),
        ("B", "Unknown", 2),
        ("C", "Z", 3),
        ("A", "Unknown", 4),
        ("B", "X", 5),
        ("C", "Z", 6),
    ]

    source_df = spark_session.createDataFrame(rows_with_nulls, schema)
    # Only "col2" is passed as a categorical column to handle.
    actual_df = handle_missing_categorical_columns(source_df, ["col2"])
    wanted_df = spark_session.createDataFrame(rows_filled, schema)

    actual_rows = actual_df.collect()
    wanted_rows = wanted_df.collect()
    assert actual_rows == wanted_rows

In [None]:
# Run the handle_missing_categorical_columns test. The test function is
# defined once, in the previous cell; defining an identical copy here would
# only shadow that definition.
# NOTE(review): `spark` is not created in the visible cells — it is expected
# to be the notebook's active SparkSession; confirm.
test_handle_missing_categorical_columns(spark)

In [None]:
# Test for impute_abs_negative_values
def test_impute_abs_negative_values(spark_session):
    """Check impute_abs_negative_values on columns holding negative numbers.

    The input has negative entries in both "col2" and "value"; the expected
    frame holds the same rows with each of those entries replaced by its
    absolute value.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.
    """
    schema = ["col1", "col2", "value"]
    signed_rows = [
        ("A", -1.0, 1),
        ("B", 2.0, -2),
        ("C", 3.0, 3),
        ("A", -4.0, 4),
        ("B", 5.0, -5),
        ("C", 6.0, 6),
    ]
    unsigned_rows = [
        ("A", 1.0, 1),
        ("B", 2.0, 2),
        ("C", 3.0, 3),
        ("A", 4.0, 4),
        ("B", 5.0, 5),
        ("C", 6.0, 6),
    ]

    source_df = spark_session.createDataFrame(signed_rows, schema)
    # Both numeric columns are handed to the function under test.
    actual_df = impute_abs_negative_values(source_df, ["col2", "value"])
    wanted_df = spark_session.createDataFrame(unsigned_rows, schema)

    actual_rows = actual_df.collect()
    wanted_rows = wanted_df.collect()
    assert actual_rows == wanted_rows

# Run the test
test_impute_abs_negative_values(spark)


In [None]:
# Test for string_indexer_transform
def test_string_indexer_transform(spark_session):
    """Check string_indexer_transform on two string columns.

    The input pairs "A"/"X", "B"/"Y" and "C"/"Z", each appearing twice. The
    expected frame has columns "col1_index", "col2_index" and "value", with
    indices 0.0, 1.0 and 2.0 for those three pairs respectively.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.
    """
    labelled_rows = [
        ("A", "X", 1),
        ("B", "Y", 2),
        ("C", "Z", 3),
        ("A", "X", 4),
        ("B", "Y", 5),
        ("C", "Z", 6),
    ]
    indexed_rows = [
        (0.0, 0.0, 1),
        (1.0, 1.0, 2),
        (2.0, 2.0, 3),
        (0.0, 0.0, 4),
        (1.0, 1.0, 5),
        (2.0, 2.0, 6),
    ]

    source_df = spark_session.createDataFrame(labelled_rows, ["col1", "col2", "value"])
    # Index both string columns.
    actual_df = string_indexer_transform(source_df, ["col1", "col2"])
    wanted_df = spark_session.createDataFrame(
        indexed_rows, ["col1_index", "col2_index", "value"]
    )

    actual_rows = actual_df.collect()
    wanted_rows = wanted_df.collect()
    assert actual_rows == wanted_rows


# Run the test
test_string_indexer_transform(spark)

In [None]:
# Test for inner_join
def test_inner_join(spark_session):
    """Check inner_join on two frames that share the keys "A" and "B".

    The left frame has keys A, B, C and the right frame has keys A, B, D in
    "common_column"; the expected result keeps only the rows for A and B,
    carrying both "value_1" and "value_2".

    Rows are compared after sorting, because Spark does not guarantee the row
    order of a join result and an order-sensitive comparison could fail on a
    correct result.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.

    Raises:
        AssertionError: If the joined rows differ from the expected rows.
    """
    # Two input frames for the join.
    data_1 = [("A", 1), ("B", 2), ("C", 3)]
    columns_1 = ["common_column", "value_1"]
    mock_df_1 = spark_session.createDataFrame(data_1, columns_1)

    data_2 = [("A", 10), ("B", 20), ("D", 30)]
    columns_2 = ["common_column", "value_2"]
    mock_df_2 = spark_session.createDataFrame(data_2, columns_2)

    # Call the function under test.
    result_df = inner_join(mock_df_1, mock_df_2)

    # Only the keys present on both sides are expected to survive.
    expected_data = [("A", 1, 10), ("B", 2, 20)]
    expected_columns = ["common_column", "value_1", "value_2"]
    expected_result_df = spark_session.createDataFrame(expected_data, expected_columns)

    # Collected rows are tuples, so sorting gives an order-independent comparison.
    assert sorted(result_df.collect()) == sorted(expected_result_df.collect())


# Run the test
test_inner_join(spark)

In [None]:
# Test for nombre_columnas
def test_nombre_columnas(spark_session):
    """Check that nombre_columnas renames the columns and leaves the data alone.

    The input columns "Ind_ID", "CHILDREN" and "Mobile_phone" are expected to
    come back as "ID", "X1" and "X2", with unchanged row values.

    Column names are asserted explicitly: collected Row objects compare as
    plain tuples of values, so comparing rows alone would pass even if no
    column had been renamed.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.

    Raises:
        AssertionError: If the column names or the row values differ from
            what is expected.
    """
    # Input frame with the columns that are to be renamed.
    data = [("A", 1, 10), ("B", 2, 20), ("C", 3, 30)]
    columns = ["Ind_ID", "CHILDREN", "Mobile_phone"]
    mock_df = spark_session.createDataFrame(data, columns)

    # Call the function under test.
    result_df = nombre_columnas(mock_df)

    expected_data = [("A", 1, 10), ("B", 2, 20), ("C", 3, 30)]
    expected_columns = ["ID", "X1", "X2"]
    expected_result_df = spark_session.createDataFrame(expected_data, expected_columns)

    # The rename itself: this is the check the row comparison cannot provide.
    assert result_df.columns == expected_columns
    # The row values must be untouched.
    assert result_df.collect() == expected_result_df.collect()

# Run the test
test_nombre_columnas(spark)

In [None]:
# Test for vector_assembler
def test_vector_assembler(spark_session):
    """Check that vector_assembler adds a "features" column built from the inputs.

    The five integer feature columns are expected to be kept, followed by a
    "features" column holding the same five values as floats.

    The expected "features" values are built with ``Vectors.dense`` rather
    than plain Python lists: a list would create an array column, and a
    collected ``DenseVector`` never compares equal to a list, so the
    assertion could not pass against a vector-typed column.
    NOTE(review): this assumes vector_assembler produces an ML vector column
    (as pyspark.ml.feature.VectorAssembler does) — confirm against its
    implementation.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.

    Raises:
        AssertionError: If the collected result rows differ from the expected rows.
    """
    # Imported locally so this cell does not depend on an edit to the import cell.
    from pyspark.ml.linalg import Vectors

    # Input frame with the columns to assemble into a vector.
    data = [(1, 2, 3, 4, 5), (6, 7, 8, 9, 10), (11, 12, 13, 14, 15)]
    columns = ["feature_1", "feature_2", "feature_3", "feature_4", "feature_5"]
    mock_df = spark_session.createDataFrame(data, columns)

    # Call the function under test.
    result_df = vector_assembler(mock_df, input_cols=["feature_1", "feature_2", "feature_3", "feature_4", "feature_5"])

    # Expected rows: the original columns plus the assembled dense vector.
    expected_data = [(1, 2, 3, 4, 5, Vectors.dense([1.0, 2.0, 3.0, 4.0, 5.0])),
                     (6, 7, 8, 9, 10, Vectors.dense([6.0, 7.0, 8.0, 9.0, 10.0])),
                     (11, 12, 13, 14, 15, Vectors.dense([11.0, 12.0, 13.0, 14.0, 15.0]))]
    expected_columns = ["feature_1", "feature_2", "feature_3", "feature_4", "feature_5", "features"]
    expected_result_df = spark_session.createDataFrame(expected_data, expected_columns)

    assert result_df.collect() == expected_result_df.collect()


# Run the test
test_vector_assembler(spark)


In [None]:
# Test for scaler
def test_scaler(spark_session):
    """Check scaler on the "feature_1" column of a three-row frame.

    "feature_1" holds 1.0, 4.0 and 7.0; the expected frame keeps the three
    original columns and adds "scaled_feature_1" with 0.0, 0.5 and 1.0.

    Args:
        spark_session: Session object used to build the input and expected
            DataFrames through ``createDataFrame``.
    """
    raw_rows = [(1.0, 2.0, 3.0), (4.0, 5.0, 6.0), (7.0, 8.0, 9.0)]
    scaled_rows = [
        (1.0, 2.0, 3.0, 0.0),
        (4.0, 5.0, 6.0, 0.5),
        (7.0, 8.0, 9.0, 1.0),
    ]

    source_df = spark_session.createDataFrame(
        raw_rows, ["feature_1", "feature_2", "feature_3"]
    )
    # Scale a single column into a new output column.
    actual_df = scaler(source_df, input_col="feature_1", output_col="scaled_feature_1")
    wanted_df = spark_session.createDataFrame(
        scaled_rows, ["feature_1", "feature_2", "feature_3", "scaled_feature_1"]
    )

    actual_rows = actual_df.collect()
    wanted_rows = wanted_df.collect()
    assert actual_rows == wanted_rows

# Run the test
test_scaler(spark)
