In [None]:
pip install pyspark==3.3.2

In [None]:
# Smoke test: confirms the kernel executes cells and echoes output.
print("Hello")

# Para esta tarea se utilizó Google Colab, el cual permite ejecutar Apache Spark sin necesidad de una instalación local. PySpark se configura automáticamente dentro del entorno, facilitando el trabajo con grandes volúmenes de datos.

In [1]:
import os
import time

from pyspark.sql import SparkSession

# Measure how long it takes to obtain the SparkSession.
t0 = time.perf_counter()

# Builder chain written inside parentheses instead of backslash
# continuations: set the application name, run against a local master
# ("local[*]"), then reuse an existing session or create a new one.
spark = (
    SparkSession.builder
    .appName("Mi primer sesion clase datos masivos")
    .master("local[*]")
    .getOrCreate()
)

t1 = time.perf_counter()

# Report the elapsed wall-clock time in seconds.
print(f"Tiempo: {t1 - t0:.3f} s")


Tiempo: 11.390 s


In [2]:
# spark.stop()  -- left disabled; uncomment to shut the session down.
# Query the JVM-side SparkContext (reached through the underscore-prefixed,
# non-public `_jsc` attribute) for its stopped flag. The value is the cell's
# displayed result; False indicates the session is still active.
spark._jsc.sc().isStopped() # Check whether the session is still active

False

Se eligió el conjunto de datos Bank Loan Case Study Dataset porque contiene información financiera relevante de clientes bancarios, como ingresos, préstamos, estado laboral y comportamiento crediticio.

Este tipo de datos es ideal para practicar análisis de datos, filtrado, agregaciones y estadísticas descriptivas utilizando PySpark. Además, es un dataset realista que puede ser utilizado durante todo el tetramestre para análisis exploratorio y modelos predictivos.

La tabla application_data contiene información detallada de los clientes y de las solicitudes de crédito, incluyendo variables demográficas, laborales, financieras y del proceso de aprobación. Esta información permite realizar cálculos entre variables, análisis descriptivos y segmentaciones relevantes para el estudio del riesgo crediticio.

In [3]:
# Location of the application_data CSV. The default is the original local
# file; set the APPLICATION_DATA_CSV environment variable to point the
# notebook at another copy (different machine, Colab, remote storage)
# without editing this cell.
APPLICATION_DATA_CSV = os.environ.get(
    "APPLICATION_DATA_CSV",
    "/Users/Sonia/Desktop/MCD/DATOS MASIVOS/DATOS/application_data.csv",
)

t0 = time.perf_counter()

# header=True takes the column names from the first row of the file;
# inferSchema=True asks Spark to derive each column's type from the data
# rather than reading every column as a string.
df = spark.read.csv(APPLICATION_DATA_CSV, header=True, inferSchema=True)

t1 = time.perf_counter()

# The reported time covers only the read call above, not the preview below.
print(f"Tiempo: {t1 - t0:.3f} s")
df.show(20)

Tiempo: 19.032 s
+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+--------------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+-------------------+-------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+----------

In [4]:
df.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullable = true)
 |-- OWN_CAR_AG

### Uso de PySpark para filtrar datos, generar estadísticas descriptivas básicas y realizar algunas operaciones aritméticas entre registros y columnas.

In [5]:
# Keep only the rows whose CODE_GENDER equals "M" and preview the first 10.
df.filter(df["CODE_GENDER"] == "M").show(10)

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+-------------------+-------------------+--------------+----------------+---------------------------+---------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+-------------------

In [6]:
# Data summary: compute and print describe() statistics for the DataFrame.
t0 = time.perf_counter()

# describe() is called without arguments, so no column subset is selected here.
df.describe().show()

# The timer stops after show() returns, so the figure below covers both
# computing the summary and printing it.
t1 = time.perf_counter()
print(f"Tiempo: {t1 - t0:.3f} s")

+-------+------------------+-------------------+------------------+-----------+------------+---------------+------------------+------------------+-----------------+------------------+------------------+---------------+----------------+--------------------+------------------+-----------------+--------------------------+-------------------+------------------+------------------+-------------------+------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+------------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+-----------------+--------------------+--------------------+--------------------+-------------------+-------------------+---------------------------+-------------------+-----

In [None]:
pip install pandas==1.3.5

In [8]:
# Import pandas and print the version the kernel actually loaded
# (presumably to confirm that the pinned 1.3.5 install is the one in use).
import pandas as pd
print(pd.__version__)




1.3.5


In [9]:
from pyspark.sql import functions as F

# Start timing before the query is defined; the timer is stopped only after
# the result has been converted to pandas.
t0 = time.perf_counter()

# Income summary per CODE_GENDER value: total, mean and maximum of
# AMT_INCOME_TOTAL plus the number of rows in each group, sorted so the
# group with the largest total income comes first.
res_spark = df.groupBy("CODE_GENDER").agg(
    F.sum("AMT_INCOME_TOTAL").alias("sum_AMT_INCOME_TOTAL"),
    F.avg("AMT_INCOME_TOTAL").alias("mean_AMT_INCOME_TOTAL"),
    F.max("AMT_INCOME_TOTAL").alias("max_AMT_INCOME_TOTAL"),
    F.count("*").alias("n"),
).orderBy(F.desc("sum_AMT_INCOME_TOTAL"))

# Bring the aggregated result into a pandas DataFrame.
top_spark = res_spark.toPandas()

t1 = time.perf_counter()

# Rich display of the pandas result, followed by the elapsed time.
display(top_spark)
print(f"PySpark tiempo: {t1 - t0:.3f} s")

Unnamed: 0,CODE_GENDER,sum_AMT_INCOME_TOTAL,mean_AMT_INCOME_TOTAL,max_AMT_INCOME_TOTAL,n
0,F,31588430000.0,156032.309247,117000000.0,202448
1,M,20318040000.0,193396.482153,18000090.0,105059
2,XNA,747000.0,186750.0,247500.0,4


PySpark tiempo: 5.831 s


In [10]:
from pyspark.sql import functions as F

# Derive AGE_YEARS from DAYS_BIRTH: negate the day count, divide by 365.25
# (presumably the average year length including leap years) and cast to int,
# which drops the fractional part. Re-running this cell simply recomputes
# the same column from the unchanged DAYS_BIRTH values.
df = df.withColumn("AGE_YEARS", (-df["DAYS_BIRTH"] / 365.25).cast("int"))

# Preview the source column next to the derived one.
df.select(["DAYS_BIRTH", "AGE_YEARS"]).show(n=5)


+----------+---------+
|DAYS_BIRTH|AGE_YEARS|
+----------+---------+
|     -9461|       25|
|    -16765|       45|
|    -19046|       52|
|    -19005|       52|
|    -19932|       54|
+----------+---------+
only showing top 5 rows

