In [12]:
import pandas as pd 
import numpy as np 
import scipy as sp 

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession

# Importar funciones necesarias
from pyspark.sql.functions import col, regexp_replace, dayofmonth, weekofyear, year, month
from pyspark.sql.functions import col, to_date, sum
from pyspark.sql.functions import col, sum as spark_sum
from pyspark.sql.functions import regexp_replace, col, when
from pyspark.sql.functions import format_number
from pyspark.sql.types import IntegerType,FloatType
# Puedes obtener estadísticas específicas para una columna
from pyspark.sql.functions import rand, mean, min, max, lit
from pyspark.sql.functions import approx_count_distinct
from pyspark.sql.window import Window
from pyspark.sql.functions import log1p
from pyspark.sql import functions as F

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [2]:

# Inicializar la sesión de Spark como una variable global
global spark
spark = SparkSession.builder.appName("Appetl").getOrCreate()

# Definir variables globales para las rutas de archivos
global accepted_r, rejected_r
accepted_r = 'hdfs://namenode:8020/datasets/raw/accepted_2007_to_2018Q4.csv.gz'
rejected_r = 'hdfs://namenode:8020/datasets/raw/rejected_2007_to_2018Q4.csv.gz'

# Función para cargar los datos
def load_data():
    global accepted_df, rejected_df, accepted_dfm, rejected_dfm
    # Leer los archivos CSV comprimidos usando PySpark
    accepted_df = spark.read.option("header", "true").option("inferSchema", "true").csv(accepted_r)
    rejected_df = spark.read.option("header", "true").option("inferSchema", "true").csv(rejected_r)

    # Haciendo una copia del DataFrame usando alias
    accepted_dfm = accepted_df.alias("accepted_dfm")
    rejected_dfm = rejected_df.alias("rejected_dfm")

# Llamar a la función para cargar los datos
load_data()

# Puedes acceder a accepted_df, rejected_df, accepted_dfm, rejected_dfm desde cualquier parte del notebook.


In [3]:
# Función para limpiar y transformar los datos
def clean_and_transform():
    global rejected_dfm
    # Convertir columnas a tipos de datos adecuados y realizar transformaciones 
    rejected_dfm = rejected_dfm.withColumn("Risk_Score", col("Risk_Score").cast("float"))
    rejected_dfm = rejected_dfm.withColumn("Debt-To-Income Ratio", regexp_replace(col("Debt-To-Income Ratio"), "%", "").cast("float") / 100.0)
    rejected_dfm = rejected_dfm.withColumn("Zip Code", regexp_replace(col("Zip Code"), "[^0-9]", ""))
    rejected_dfm = rejected_dfm.withColumn("Policy Code", col("Policy Code").cast("int"))
    # Convertir "Amount Requested" a tipo fecha
    #rejected_dfm = rejected_dfm.withColumn("Application Date", to_date(col("Application Date"), "MM/dd/yyyy"))
    
    # Crear nuevas columnas para día, semana y mes
    rejected_dfm = rejected_dfm.withColumn('Day', dayofmonth(col('Application Date')))
    rejected_dfm = rejected_dfm.withColumn('Week', weekofyear(col('Application Date')))
    rejected_dfm = rejected_dfm.withColumn('Month', month(col('Application Date')))

In [4]:
clean_and_transform()

In [8]:
rejected_dfm.printSchema()

root
 |-- Amount Requested: date (nullable = true)
 |-- Application Date: date (nullable = true)
 |-- Loan Title: string (nullable = true)
 |-- Risk_Score: float (nullable = true)
 |-- Debt-To-Income Ratio: double (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Employment Length: string (nullable = true)
 |-- Policy Code: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- Month: integer (nullable = true)



In [5]:
rejected_dfm.show()

+----------------+----------------+--------------------+----------+--------------------+--------+-----+-----------------+-----------+
|Amount Requested|Application Date|          Loan Title|Risk_Score|Debt-To-Income Ratio|Zip Code|State|Employment Length|Policy Code|
+----------------+----------------+--------------------+----------+--------------------+--------+-----+-----------------+-----------+
|          1000.0|      2007-05-26|Wedding Covered b...|     693.0|                 0.1|     481|   NM|          4 years|          0|
|          1000.0|      2007-05-26|  Consolidating Debt|     703.0|                 0.1|     010|   MA|         < 1 year|          0|
|         11000.0|      2007-05-27|Want to consolida...|     715.0|                 0.1|     212|   MD|           1 year|          0|
|          6000.0|      2007-05-27|             waksman|     698.0|  0.3863999938964844|     017|   MA|         < 1 year|          0|
|          1500.0|      2007-05-27|              mdrigo|     5

In [6]:
rejected_dfm.printSchema()

root
 |-- Amount Requested: double (nullable = true)
 |-- Application Date: date (nullable = true)
 |-- Loan Title: string (nullable = true)
 |-- Risk_Score: float (nullable = true)
 |-- Debt-To-Income Ratio: double (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Employment Length: string (nullable = true)
 |-- Policy Code: integer (nullable = true)



In [8]:
# Variables globales
mean_risk_score = None
mode_amount_requested = None

def handle_numeric_variables(df):
    global mean_risk_score
    global mode_amount_requested

    # Calcular la media de Risk_Score
    mean_risk_score = df.select(mean("Risk_Score")).collect()[0][0]

    # Calcular la moda de Amount Requested
    mode_amount_requested = df.groupBy("Amount Requested").count().orderBy(col("count").desc()).select("Amount Requested").first()[0]

    # Reemplazar nulos en Risk_Score por la media
    df = df.withColumn("Risk_Score", when(col("Risk_Score").isNull(), lit(mean_risk_score)).otherwise(col("Risk_Score")))

    # Reemplazar nulos en Amount Requested por la moda
    df = df.withColumn("Amount Requested", when(col("Amount Requested").isNull(), lit(mode_amount_requested)).otherwise(col("Amount Requested")))

    return df

In [9]:
def handle_categorical_variables(df):
    # Reemplazar los nulos en Loan Title con "Sin registro"
    df = df.withColumn("Loan Title", when(col("Loan Title").isNull(), "Sin registro").otherwise(col("Loan Title")))

    # Reemplazar los nulos en Zip Code con 999
    df = df.withColumn("Zip Code", when(col("Zip Code").isNull(), "999").otherwise(col("Zip Code")))

    # Reemplazar los nulos en State con "Sin registro"
    df = df.withColumn("State", when(col("State").isNull(), "Sin registro").otherwise(col("State")))

    # Reemplazar los nulos en Employment Length con "Sin registro"
    df = df.withColumn("Employment Length", when(col("Employment Length").isNull(), "Sin registro").otherwise(col("Employment Length")))

    return df

In [10]:
# Tratar variables numéricas
rejected_dfm = handle_numeric_variables(rejected_dfm)

# Tratar variables categóricas
rejected_dfm = handle_categorical_variables(rejected_dfm)


In [13]:
def split_train_test(df, train_percentage=0.8):
    """
    Divide un DataFrame en conjuntos de entrenamiento y prueba.

    Parámetros:
    - df: DataFrame de PySpark.
    - train_percentage: Porcentaje del DataFrame para el conjunto de entrenamiento.

    Retorna:
    - (train_df, test_df): Tupla de DataFrames para entrenamiento y prueba.
    """

    # Agrega una columna 'split' con valores aleatorios entre 0 y 1
    df_with_split = df.withColumn('split', rand())

    # Divide el DataFrame en train y test según el valor en la columna 'split'
    train_df = df_with_split.filter(col('split') <= train_percentage)
    test_df = df_with_split.filter(col('split') > train_percentage)

    # Elimina la columna 'split' ya que ya no es necesaria
    train_df = train_df.drop('split')
    test_df = test_df.drop('split')

    return train_df, test_df

# Llama a la función con tu DataFrame y porcentaje deseado
train_df, test_df = split_train_test(rejected_dfm, train_percentage=0.8)


In [None]:
# Ejemplo de cómo crear y almacenar un DataFrame en una tabla de Hive con formato Parquet
df.write.mode("overwrite").parquet("/user/hive/warehouse/lending.db/train_data_parquet")
spark.sql("CREATE TABLE IF NOT EXISTS lending.train_data_parquet USING PARQUET LOCATION '/user/hive/warehouse/lending.db/train_data_parquet'")


In [15]:
# Nombre de las tablas en Hive
hive_train_table = "train_data"
hive_test_table = "test_data"

# Crear bases de datos si no existen
spark.sql("CREATE DATABASE IF NOT EXISTS lending")

# Cambiar a la base de datos
spark.sql("USE lending")

# Guardar los DataFrames como tablas en Hive
train_df.write.mode("overwrite").saveAsTable(hive_train_table)
test_df.write.mode("overwrite").saveAsTable(hive_test_table)